Netty Demo

Getting to Know Netty

Netty is a client/server framework with an easy-to-use API.

  • High concurrency
  • NIO (non-blocking I/O)
  • Fast transfer, zero-copy


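Blocking vs. Non-Blocking

This describes how a thread behaves when the resource it wants to access may not be ready.

  • Blocking: if the resource is busy, the thread waits until it becomes available

  • Non-blocking: if the resource is busy, the call returns a result immediately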
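
The two behaviours above can be sketched with a lock standing in for the "resource". This is only an illustration under assumptions: the class name BlockingVsNonBlocking and its methods are made up for this example, and a ReentrantLock is used in place of real I/O. tryLock() plays the non-blocking role (it reports at once whether the resource was free), while lock() plays the blocking role.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

class BlockingVsNonBlocking {

    // Non-blocking access: ask once and return immediately with a result.
    static boolean tryAccess(ReentrantLock resource) {
        if (resource.tryLock()) {
            resource.unlock();
            return true;
        }
        return false;
    }

    // Blocking access: wait until the resource becomes free.
    static void waitForAccess(ReentrantLock resource) {
        resource.lock();
        resource.unlock();
    }

    // Runs both styles against a resource that another thread holds for a while.
    // Returns {result while busy, result after release}.
    static boolean[] demo() {
        ReentrantLock resource = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            resource.lock();
            try {
                held.countDown();      // tell the main thread the resource is now busy
                release.await();       // keep it busy until told to let go
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                resource.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            boolean whileBusy = tryAccess(resource); // returns at once, resource is busy
            release.countDown();
            waitForAccess(resource);                 // blocks until the holder unlocks
            holder.join();
            boolean whenFree = tryAccess(resource);
            return new boolean[] {whileBusy, whenFree};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("non-blocking while busy: " + result[0]);
        System.out.println("non-blocking when free:  " + result[1]);
    }
}
```

The non-blocking call leaves the decision about what to do next (retry, do other work, give up) to the caller, which is the property NIO builds on.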

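Synchronous vs. Asynchronous

This describes how the caller obtains the data: in synchronous access the caller itself waits for or checks on the result, while in asynchronous access the caller is notified once the result is ready.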
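
A minimal sketch of the difference, using CompletableFuture from the JDK. The names SyncVsAsync, loadData, fetchSync and fetchAsync are hypothetical, and loadData just sleeps to imitate a slow data source.

```java
import java.util.concurrent.CompletableFuture;

class SyncVsAsync {

    // Stand-in for a slow data source (hypothetical; sleeps to simulate latency).
    static String loadData() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "data";
    }

    // Synchronous: the caller receives the result as the return value of the call.
    static String fetchSync() {
        return loadData();
    }

    // Asynchronous: the call returns immediately; the result arrives later via the future.
    static CompletableFuture<String> fetchAsync() {
        return CompletableFuture.supplyAsync(SyncVsAsync::loadData);
    }

    public static void main(String[] args) {
        System.out.println("sync result: " + fetchSync());

        CompletableFuture<Void> done = fetchAsync()
                .thenAccept(result -> System.out.println("async callback: " + result));
        System.out.println("caller continues while the data loads");
        done.join(); // only so the demo does not exit before the callback runs
    }
}
```

In the asynchronous case the callback passed to thenAccept is the "notification"; the caller never has to ask whether the data is ready.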


I/O Models

  • BIO

    Synchronous blocking I/O (Blocking I/O)

    Poor concurrency; throughput depends on network speed and bandwidth


Each connection gets its own thread in a request/response pattern; the improved version introduces a thread pool.
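
The thread-pool variant of BIO might look roughly like the following. It is a sketch, not production code: the class name BioEchoServer, the pool size of 4, the "echo: " prefix and the roundTrip helper are all assumptions made for this example. Note that both accept() and readLine() block the calling thread.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class BioEchoServer implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    BioEchoServer() throws IOException {
        // Port 0 lets the OS pick a free port on the loopback interface.
        serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        Thread acceptor = new Thread(this::acceptLoop, "bio-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    private void acceptLoop() {
        try {
            while (true) {
                Socket client = serverSocket.accept(); // blocks until a client connects
                pool.execute(() -> serve(client));     // a pooled thread serves this connection
            }
        } catch (IOException | RejectedExecutionException e) {
            // the server socket was closed or the pool was shut down: stop accepting
        }
    }

    private static void serve(Socket client) {
        try (Socket s = client;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line;
            while ((line = in.readLine()) != null) {   // blocks until a full line arrives
                out.println("echo: " + line);
            }
        } catch (IOException ignored) {
            // connection dropped; nothing more to do for this client
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
        pool.shutdownNow();
    }

    // Starts a server, sends one line from a client, and returns the reply.
    static String roundTrip(String line) {
        try (BioEchoServer server = new BioEchoServer();
             Socket socket = new Socket(InetAddress.getLoopbackAddress(), server.port());
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            out.println(line);
            return in.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

With a pool of 4 threads, a fifth client that connects while four are still talking has to wait, which is the concurrency limit the BIO description refers to.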

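  • NIO

    Synchronous non-blocking I/O (Non-blocking I/O)

    The thread does other work and periodically checks whether the resource is ready

    Introduces the Selector (a multiplexer), Buffer, and related abstractions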


A growing number of clients has little impact on performance, because one thread can watch many connections.
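
To make the Selector idea concrete, here is a small single-threaded echo server built on java.nio. It is a simplified sketch: the class name NioEchoServer and the roundTrip helper are invented for this example, and the write loop would spin if a socket buffer filled up, which a real server must handle with OP_WRITE.

```java
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class NioEchoServer implements AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;
    private final Thread loop;
    private volatile boolean running = true;

    NioEchoServer() throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
        loop = new Thread(this::eventLoop, "nio-event-loop");
        loop.start();
    }

    int port() {
        return server.socket().getLocalPort();
    }

    // One thread serves every connection: it only touches channels that are ready.
    private void eventLoop() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            while (running) {
                selector.select(); // waits until at least one channel is ready (or wakeup)
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        echo((SocketChannel) key.channel(), buffer);
                    }
                }
            }
        } catch (IOException e) {
            // fall through to cleanup
        } finally {
            for (SelectionKey key : selector.keys()) {
                closeQuietly(key.channel());
            }
            closeQuietly(selector);
        }
    }

    private static void echo(SocketChannel client, ByteBuffer buffer) {
        try {
            buffer.clear();
            if (client.read(buffer) < 0) { // -1 means the peer closed the connection
                client.close();
                return;
            }
            buffer.flip();
            while (buffer.hasRemaining()) {
                client.write(buffer);      // sketch only: spins if the socket buffer is full
            }
        } catch (IOException e) {
            closeQuietly(client);
        }
    }

    private static void closeQuietly(Closeable c) {
        try { c.close(); } catch (IOException ignored) { }
    }

    @Override
    public void close() {
        running = false;
        selector.wakeup();
        try { loop.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    // Starts a server, sends the message from a plain blocking client, returns the echo.
    static String roundTrip(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (NioEchoServer server = new NioEchoServer();
             Socket socket = new Socket(InetAddress.getLoopbackAddress(), server.port())) {
            socket.getOutputStream().write(payload);
            byte[] reply = new byte[payload.length];
            new DataInputStream(socket.getInputStream()).readFully(reply);
            return new String(reply, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello selector"));
    }
}
```

Adding clients only adds keys to the selector; no extra threads are created, which is why the thread count stays flat as connections grow.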

  • AIO

    Asynchronous non-blocking I/O

    The thread does other work and is notified when the resource becomes available
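
The JDK exposes this style through AsynchronousServerSocketChannel and CompletionHandler. The sketch below is an assumption-laden illustration (AioEchoServer, EchoSession and roundTrip are names invented here): every completed(...) call is the notification that an accept, read or write has finished, and no thread sits waiting on the socket.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

class AioEchoServer {

    // Echoes data for one connection; only one read or write is pending at a time.
    static final class EchoSession implements CompletionHandler<Integer, Void> {
        private final AsynchronousSocketChannel client;
        private final ByteBuffer buffer = ByteBuffer.allocate(1024);
        private volatile boolean writing;

        EchoSession(AsynchronousSocketChannel client) {
            this.client = client;
        }

        void start() {
            client.read(buffer, null, this); // returns immediately
        }

        @Override
        public void completed(Integer count, Void unused) {
            if (!writing) {                       // a read has finished
                if (count < 0) { closeQuietly(); return; }
                buffer.flip();
                writing = true;
                client.write(buffer, null, this);
            } else if (buffer.hasRemaining()) {   // partial write: send the rest
                client.write(buffer, null, this);
            } else {                              // write finished: read again
                buffer.clear();
                writing = false;
                client.read(buffer, null, this);
            }
        }

        @Override
        public void failed(Throwable cause, Void unused) {
            closeQuietly();
        }

        private void closeQuietly() {
            try { client.close(); } catch (IOException ignored) { }
        }
    }

    static AsynchronousServerSocketChannel start() throws IOException {
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void unused) {
                server.accept(null, this);        // keep accepting further clients
                new EchoSession(client).start();
            }

            @Override
            public void failed(Throwable cause, Void unused) {
                // called when the server channel is closed; nothing to do in this sketch
            }
        });
        return server;
    }

    // Starts a server, sends the message from a plain blocking client, returns the echo.
    static String roundTrip(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (AsynchronousServerSocketChannel server = start();
             Socket socket = new Socket(InetAddress.getLoopbackAddress(),
                     ((InetSocketAddress) server.getLocalAddress()).getPort())) {
            socket.getOutputStream().write(payload);
            byte[] reply = new byte[payload.length];
            new DataInputStream(socket.getInputStream()).readFully(reply);
            return new String(reply, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```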


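Netty's Threading Models

  • Single-threaded model: all operations are handled by one NIO thread

  • Multi-threaded model: a pool of NIO threads handles the I/O operations

  • Master-slave model: one thread pool accepts connections, and another thread pool handles the I/O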
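
The three models differ only in which threads accept connections and which threads do the I/O. The sketch below imitates that split with plain JDK executors rather than real NIO event loops, so it is an analogy and not Netty's implementation; ThreadModels, the pool sizes and the thread-name prefixes are all assumptions chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadModels {

    // Creates a pool whose threads are named "<prefix>-<n>" so we can see who did what.
    static ExecutorService pool(String prefix, int threads) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(threads, task -> {
            Thread t = new Thread(task, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
    }

    // Simulates one connection: the "accept" step runs on the acceptor pool,
    // the "I/O" step on the worker pool. Returns {accepting thread, I/O thread}.
    static String[] handleOneConnection(ExecutorService acceptors, ExecutorService ioWorkers) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), acceptors)
                .thenApplyAsync(
                        acceptedBy -> new String[] {acceptedBy, Thread.currentThread().getName()},
                        ioWorkers)
                .join();
    }

    // Single-threaded model: one thread accepts and does all I/O.
    static String[] singleThreaded() {
        ExecutorService only = pool("reactor", 1);
        try {
            return handleOneConnection(only, only);
        } finally {
            only.shutdown();
        }
    }

    // Multi-threaded model: one acceptor thread, a pool of I/O threads.
    static String[] multiThreaded() {
        ExecutorService acceptor = pool("acceptor", 1);
        ExecutorService io = pool("io", 4);
        try {
            return handleOneConnection(acceptor, io);
        } finally {
            acceptor.shutdown();
            io.shutdown();
        }
    }

    // Master-slave model: a pool of acceptor threads and a separate pool of I/O threads.
    static String[] masterSlave() {
        ExecutorService boss = pool("boss", 2);
        ExecutorService worker = pool("worker", 4);
        try {
            return handleOneConnection(boss, worker);
        } finally {
            boss.shutdown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(" / ", singleThreaded()));
        System.out.println(String.join(" / ", multiThreaded()));
        System.out.println(String.join(" / ", masterSlave()));
    }
}
```

In Netty the same split is expressed by the EventLoopGroup instances passed to ServerBootstrap.group(...), as the server code in the next section does with its boss and worker groups.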


Writing a Simple Server

Add the dependency

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.25.Final</version>
</dependency>

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class HelloServer {
    public static void main(String[] args) throws Exception {

        // Boss group: accepts incoming client connections
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Worker group: handles I/O for the connections accepted by the boss group
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // Create and configure the Netty server
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)   // use NIO channels
                    .childHandler(new HelloServerIni());     // initializer for each accepted channel

            // Bind to port 8088 and wait until the server has started
            ChannelFuture channelFuture = serverBootstrap.bind(8088).sync();

            // Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Release the threads of both groups
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Each channel has a pipeline made up of multiple handlers.
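
As a mental model only, a pipeline can be pictured as an ordered list of handlers where each one transforms the message and passes it on. The toy class below is not Netty's ChannelPipeline (which also has outbound handlers, contexts and event propagation); MiniPipeline, its method names and the two handlers are hypothetical and exist just to show the chaining.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class MiniPipeline {
    private final List<String> names = new ArrayList<>();
    private final List<Function<Object, Object>> handlers = new ArrayList<>();

    // Appends a named handler to the end of the chain.
    MiniPipeline addLast(String name, Function<Object, Object> handler) {
        names.add(name);
        handlers.add(handler);
        return this;
    }

    // Passes an inbound message through every handler in insertion order.
    Object fireRead(Object message) {
        Object current = message;
        for (int i = 0; i < handlers.size(); i++) {
            current = handlers.get(i).apply(current);
            System.out.println(names.get(i) + " -> " + current);
        }
        return current;
    }

    // Raw bytes are first decoded to text, then handled by "business" logic.
    static Object demo() {
        return new MiniPipeline()
                .addLast("decoder", msg -> new String((byte[]) msg, StandardCharsets.UTF_8))
                .addLast("greeter", msg -> "Hello " + msg)
                .fireRead("Netty".getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println("result: " + demo());
    }
}
```

The initializer shown next does the same thing with real Netty handlers: a codec is added first so that the custom handler after it receives decoded HTTP objects instead of raw bytes.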


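import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class HelloServerIni extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // Get the pipeline of the newly accepted channel
        ChannelPipeline channelPipeline = channel.pipeline();
        // HTTP codec: decodes requests and encodes responses
        channelPipeline.addLast("HttpServerCodec", new HttpServerCodec());
        // Custom handler that writes the response
        channelPipeline.addLast("customHandler", new CustomHandler());
    }
}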
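
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;

public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {

        Channel channel = channelHandlerContext.channel();

        // Respond only to the request line/headers, not to the content chunks that follow
        if (httpObject instanceof HttpRequest) {

            // Log the client's address
            System.out.println(channel.remoteAddress());

            // Response body
            ByteBuf content = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8);

            FullHttpResponse fullHttpResponse =
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_0,
                            HttpResponseStatus.OK,
                            content);

            // Tell the client what is being sent and how long it is
            fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

            channel.writeAndFlush(fullHttpResponse);
        }
    }
}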

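Netty Lifecycle

Over the life of a channel, an inbound handler receives its callbacks in this order: handlerAdded, channelRegistered, channelActive, channelRead, channelReadComplete, channelInactive, channelUnregistered, handlerRemoved.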

Real-Time Chat Demo

Three ways to implement real-time communication:

  • Ajax polling
  • Long polling
  • WebSocket

Server side

ChatServer

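import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ChatServer {
    public static void main(String[] args) throws Exception {
        // Main group accepts connections; sub group handles their I/O
        EventLoopGroup maingroup = new NioEventLoopGroup();
        EventLoopGroup subgroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(maingroup, subgroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChatServerIni());

            // Bind to port 8088, then block until the server channel is closed
            ChannelFuture channelFuture = serverBootstrap.bind(8088).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            maingroup.shutdownGracefully();
            subgroup.shutdownGracefully();
        }
    }
}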

ChatServerIni

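import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ChatServerIni extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline channelPipeline = channel.pipeline();

        // WebSocket starts as an HTTP request, so an HTTP codec is needed first
        channelPipeline.addLast(new HttpServerCodec());

        // Supports writing large data streams
        channelPipeline.addLast(new ChunkedWriteHandler());

        // Aggregates the parts of an HTTP message into one full request/response;
        // the argument is the maximum content length (64 KB)
        channelPipeline.addLast(new HttpObjectAggregator(1024 * 64));

        // -------- the handlers above provide HTTP support --------

        // Sets the WebSocket route to "/ws" and takes care of the handshake
        // and the control frames (close, ping, pong)
        channelPipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

        // Custom handler
        channelPipeline.addLast(new ChatHandler());
    }
}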

ChatHandler

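import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.time.LocalDateTime;

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // Holds the channels of all connected clients
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // A client connected: add its channel to the group
        clients.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);

        // The ChannelGroup drops closed channels on its own, so only log the ids here
        System.out.println(ctx.channel().id().asLongText());
        System.out.println(ctx.channel().id().asShortText());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

        String content = msg.text();
        System.out.println(content);

        // Broadcast the message to every connected client
        for (Channel channel : clients) {
            channel.writeAndFlush(
                    new TextWebSocketFrame("[server get message:]"
                            + LocalDateTime.now() + content));
        }

        // Alternative: let the ChannelGroup do the broadcast in one call
//        clients.writeAndFlush(
//                new TextWebSocketFrame("[server get message:]"
//                + LocalDateTime.now() + content));
    }
}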

Client side

var socket = new WebSocket("ws://[ip]:[port]");

Lifecycle callbacks:

  • onopen()
  • onmessage()
  • onerror()
  • onclose()

Methods the client calls:

  • socket.send()
  • socket.close()

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8" />
        <title></title>
    </head>
    <body>

        <div>send message:</div>
        <input type="text" id="msgContent" />
        <input type="button" value="send" onclick="CHAT.chat()"/>

        <div>get message:</div>
        <div id="receiveMsg" style="background-color: antiquewhite;"></div>

        <script type="application/javascript">

            window.CHAT = {
                socket: null,
                init: function(){
                    if(window.WebSocket){
                        CHAT.socket = new WebSocket("ws://127.0.0.1:8088/ws");
                        CHAT.socket.onopen = function(){
                            console.log("on open");
                        };
                        CHAT.socket.onclose = function(){
                            console.log("on close");
                        };
                        CHAT.socket.onerror = function(){
                            console.log("on error");
                        };
                        CHAT.socket.onmessage = function(e){
                            console.log("on message" + e.data);
                            var receiveMsg = document.getElementById("receiveMsg");
                            var html = receiveMsg.innerHTML;
                            receiveMsg.innerHTML = html + "<br/>" + e.data;
                        };

                    }else{
                        alert("no support for websocket");
                    }
                },
                chat: function(){
                    var msg = document.getElementById("msgContent");
                    CHAT.socket.send(msg.value);
                }
            }
            CHAT.init();

        </script>
    </body>
</html>


Integrating with Spring Boot

Modify ChatServer

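import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.stereotype.Component;

// Note: @Component makes Spring create its own instance as well;
// NettyBooter starts the singleton returned by getInstance().
@Component
public class ChatServer {

    // Lazy-initialized singleton (initialization-on-demand holder idiom)
    private static class SingletonChatServer {
        static final ChatServer instance = new ChatServer();
    }

    public static ChatServer getInstance() {
        return SingletonChatServer.instance;
    }

    private EventLoopGroup maingroup;
    private EventLoopGroup subgroup;
    private ChannelFuture channelFuture;
    private ServerBootstrap serverBootstrap;

    public ChatServer() {
        maingroup = new NioEventLoopGroup();
        subgroup = new NioEventLoopGroup();
        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(maingroup, subgroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChatServerIni());
    }

    public void start() {
        // bind() is asynchronous; it is not awaited here so Spring startup is not blocked
        this.channelFuture = serverBootstrap.bind(8088);
        System.err.println("netty started");
    }
}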

Create a NettyBooter class in the same package as the Spring Boot application class

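import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

@Component
public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> {
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        // Start Netty only for the root application context, so it is started once
        if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
            try {
                ChatServer.getInstance().start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}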

Part of the project code

package hku.knowname.weasel.chatmanagement.chatserver;

import hku.knowname.weasel.chatmanagement.ChatService.ChatService;
import hku.knowname.weasel.chatmanagement.SpringUtil;
import hku.knowname.weasel.chatmanagement.enums.MsgActionEnum;
import hku.knowname.weasel.chatmanagement.utils.JsonUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.commons.lang3.StringUtils;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    //manage the clients channels
    public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

        //add client channel to channel group
        clients.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);

        System.out.println(ctx.channel().id().asLongText());
        System.out.println(ctx.channel().id().asShortText());

    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

        String content = msg.text();
//        System.out.println(content);

        Channel currentChannel = ctx.channel();

        //1. get the message from client
        DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
        Integer action = dataContent.getAction();

        //2. judge the message type
        if (action == MsgActionEnum.CONNECT.type){
            // 2.1 link the channel and user id
            int sendID = dataContent.getChatMsg().getSendId();
            UserChannelRel.put(sendID, currentChannel);

            //test
            for (Channel c : clients){
                System.out.println(c.id().asLongText());
            }
            UserChannelRel.output();

        }else if (action == MsgActionEnum.CHAT.type){
            ChatMsg chatMsg = dataContent.getChatMsg();
            String msgText = chatMsg.getMsg();
            int receivedid = chatMsg.getReceiveId();
            int sendid = chatMsg.getSendId();

            // 2.2 chat type: save the chat message to the database, marked as unread
            ChatService chatService = (ChatService) SpringUtil.getBean("chatServiceImp");

            String msgId = chatService.saveMsg(chatMsg);
            chatMsg.setChatId(msgId);

            //send msg
            Channel receiverChannel = UserChannelRel.get(receivedid);
            if (receiverChannel == null){
                // receiver user offline , send msg TODO
            }else {
                // find this receiver in channel group
                Channel findChannel = clients.find(receiverChannel.id());
                if (findChannel != null){
                    //user online
                    receiverChannel.writeAndFlush(
                            new TextWebSocketFrame(JsonUtils.objectToJson(chatMsg)));

                }else {
                    // user offline TODO

                }
            }




        }else if (action == MsgActionEnum.SIGNED.type){
            // 2.3 signed type: change the status of the messages to read
            ChatService chatService = (ChatService) SpringUtil.getBean("chatServiceImp");
            // for the signed type, extand holds the chat ids, separated by commas
            String chatidstr = dataContent.getExtand();
            String[] chatids = chatidstr.split(",");
            List<String> chatidList = new ArrayList<>();
            for (String mid: chatids){
                if (StringUtils.isNotBlank(mid)){
                    chatidList.add(mid);
                }
            }
            System.out.println(chatidList.toString());

            if (!chatidList.isEmpty()){
                // mark the messages as read in one batch
                chatService.updateMsgSigned(chatidList);

            }
        }else if (action == MsgActionEnum.KEEPALIVE.type){
            // 2.4 heartbeat
        }

//        for (Channel channel: clients){
//            channel.writeAndFlush(
//                    new TextWebSocketFrame("[server get message:]"
//                            + LocalDateTime.now() + content));
//        }

        clients.writeAndFlush (
                new TextWebSocketFrame("[server get message:]"
                + LocalDateTime.now() + content));


    }
}

  • Copyrights © 2019-2020 Rex