Netty 笔记

时间:2019-11-27
本文章向大家介绍Netty 笔记,主要包括Netty 笔记使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Netty是一款异步的事件驱动的网络程序应用框架,支持快速的开发可维护性的高性能的面向协议的服务器和客户端。
Netty优雅的处理了网络编程,多线程处理和并发领域的问题。

包含的功能模块:

传输服务:Socket和数据报,Http隧道,VM管道
协议:Http和WebSocket,SSL安全套接字,压缩,大文件传输,Google Protopuf,RTSP实时流协议(流媒体),遗留的基于文本和二进制协议
核心:可扩展的事件模型,统一的通信API,零拷贝的ByteBuffer

Netty网络抽象组件

提示:ctrl+alt+b查看各接口的实现,得知netty都提供了哪些支持

  • Channel ~ Socket
  • EventLoop ~ 控制流,多线程处理,并发
  • ChannelFuture ~ 异步通知

Channel , EventLoop,Thread,EventLoopGroup的关系

    • EventLoopGroup:EventLoop = 1:n
    • EventLoop:Thread = 1:1 在EventLoop生命周期内
    • Channel:EventLoop = 1:1 在Channel生命周期内,只注册一个EventLoop

 Netty管理数据流和执行业务的组件

  • Channelandler ~ 业务处理容器 ;子类型【编码器,解码器,SimpleChannelInboundHandler】
  • ChannelPipeline ~ 将多个ChannelHandler串联起来
  • ChannelHandlerContext ~ ChannelHandler和ChannelPipeline之间的绑定,调用该对象的write导致消息跳过出站Handler在尾部直接写入

建议 :建立一个连接,客户端需要一个EventLoopGroup,服务端需要两个EventLoopGroup(建立连接和处理连接)

传输~Channel 

Java原生和Netty网络的API比较

Java原生阻塞IO

public class PlainOioServer {
    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);
        try {
            for(;;) {
                final Socket clientSocket = socket.accept();
                System.out.println(
                        "Accepted connection from " + clientSocket);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        OutputStream out;
                        try {
                            out = clientSocket.getOutputStream();
                            out.write("Hi!\r\n".getBytes(
                                    Charset.forName("UTF-8")));
                            out.flush();
                            clientSocket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                clientSocket.close();
                            } catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Java原生NIO

nio:选择并处理状态的变化

public class PlainNioServer {
    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (;;){
            try {
                selector.select();
            } catch (IOException ex) {
                ex.printStackTrace();
                //handle exception
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) {
                        ServerSocketChannel server =
                                (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE |
                                SelectionKey.OP_READ, msg.duplicate());
                        System.out.println(
                                "Accepted connection from " + client);
                    }
                    if (key.isWritable()) {
                        SocketChannel client =
                                (SocketChannel) key.channel();
                        ByteBuffer buffer =
                                (ByteBuffer) key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) {
                                break;
                            }
                        }
                        client.close();
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        // ignore on close
                    }
                }
            }
        }
    }
}

只有NIO和Epoll支持零拷贝

  • 针对于Linux,自Linux内核版本 2.5.44后,引入epoll——一个高度可扩展的I/O事件通知特性,提供了比旧的POSIX select和poll系统调用更好的性能,Linux JDK NIO API使用了这些epoll调用

用法简单,只需要将NioEventLoopGroup替换为EpollEventGroup,并且把NioServerSocketChannel.class替换为EpollServerSocketChannel.class即可。

String os = System.getProperty("os.name");  
if(os.toLowerCase().startsWith("win")){  
  System.out.println(os + " can't gunzip");  
}

Netty实现阻塞IO

OIO的处理逻辑

public class NettyOioServer {
    public void server(int port)
            throws Exception {
        final ByteBuf buf =
                Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                                ch.pipeline().addLast(
                                    new ChannelInboundHandlerAdapter() {
                                        @Override
                                        public void channelActive(
                                                ChannelHandlerContext ctx)
                                                throws Exception {
                                            ctx.writeAndFlush(buf.duplicate())
                                                    .addListener(
                                                            ChannelFutureListener.CLOSE);
                                        }
                                    });
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

Netty实现NIO

public class NettyNioServer {
    public void server(int port) throws Exception {
        final ByteBuf buf =
                Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n",
                        Charset.forName("UTF-8")));
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group).channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                                      @Override
                                      public void initChannel(SocketChannel ch)
                                              throws Exception {
                                              ch.pipeline().addLast(
                                                  new ChannelInboundHandlerAdapter() {
                                                      @Override
                                                      public void channelActive(
                                                              ChannelHandlerContext ctx) throws Exception {
                                                                ctx.writeAndFlush(buf.duplicate())
                                                                  .addListener(
                                                                          ChannelFutureListener.CLOSE);
                                                      }
                                                  });
                                      }
                                  }
                    );
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

原文地址:https://www.cnblogs.com/fubinhnust/p/11940115.html