Netty之TCP粘包/拆包

时间:2022-07-24
本文章向大家介绍Netty之TCP粘包/拆包,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

一、何为TCP粘包/拆包?

TCP会根据缓冲区的实际大小情况进行包的拆分和合并,所谓粘包,就是将多个小的包封装成一个大的包进行发送。拆包,即是将一个超过缓冲区可用大小的包拆分成多个包进行发送。

二、粘包/拆包产生的原因

1、写入的字节大小大于套接字的发送缓存区大小。

2、进行MSS大小的TCP分段

3、以太网帧的payload大于MTU进行IP分段

三、解决方法

1、消息定长,不够空格补

2、在包尾添加回车换行符(也可自定义分隔符)进行分割,例如FTP

3、将消息分成消息头和消息体两部分,消息头记录的消息的总长度

四、未考虑TCP粘包/拆包的案例

服务端:

public class Server {
    private int port;
    public Server(int port) {
        this.port = port;
    }
    public void start(){
        //配置服务端NIO线程
        //接收客户端连接,事件分发线程池
        NioEventLoopGroup boss = null;
        //处理读写事件线程池
        NioEventLoopGroup workder = null;
        try {
            boss = new NioEventLoopGroup();
            workder = new NioEventLoopGroup();

            //创建服务端,相当于NIO的ServerSocketChannel
            ServerBootstrap server = new ServerBootstrap();
            server.group(boss,workder)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ServerHandler());
                        }
                    })
                    //最大客户端连接数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //与客户端保持长链接
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            //绑定端口,同步等待
            ChannelFuture future = server.bind(this.port).sync();
            System.out.println("服务端已启动,端口为:"+port);
            //等待服务器监听端口关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {   
        } finally {
            //优雅退出,释放线程池资源
            boss.shutdownGracefully();
            workder.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        new Server(8080).start();
    }
}

服务端的IO事件处理器:

public class ServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String body = new String(bytes, "UTF-8");
        System.out.println(body);
    }
}

客户端:(启动之后发送一百条信息给服务端)

public class Client {
    public static void main(String[] args) {
        new Client().start();
    }
    private void start() {
        //用于IO读写的线程池
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            //相当于NIO的SocketChannel
            Bootstrap client = new Bootstrap();
            client.group(group)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //绑定IO读写事件的处理类
                           ch.pipeline().addLast(new ClientHandler());
                        }
                    });
            //发起连接
            ChannelFuture cf = client.connect(new InetSocketAddress(8080)).sync();
            //等待关闭
            cf.channel().closeFuture().sync();
        } catch (Exception e) {
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端的IO事件处理类:

public class ClientHandler extends ChannelHandlerAdapter{
    /**
     * 连接成功后触发此方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf byteBuf = null;
        for (int i = 0; i < 100; i++) {
            String msg = "这是第"+i+"个消息";
            byteBuf = Unpooled.buffer(msg.getBytes().length);
            byteBuf.writeBytes(msg.getBytes());
            ctx.writeAndFlush(byteBuf);
        }
    }
}

先后启动服务端与客户端,服务端接受到的信息如下所示:

会发现,多条消息会粘连在一起。

五、加入Netty的TCP粘包/拆包解决方案。

Netty解决TCP粘包/拆包相关类以及功能:

1、LineBasedFrameDecoder:以r或rn为分隔符

2、StringDecoder:将接收到的消息转换成字符串

3、DelimiterBasedFrameDecoder:自定义分隔符

4、FixedLengthFrameDecoder:定长解析

这边先用LineBasedFrameDecoder以“r”或“rn”去分割,然后用StringDecoder将消息转换成字符串。这边由于只是服务端接受消息,所以只在服务端改动,如果有互发信息的需求,请在客户端也加上相应的Decoder类。

服务端改动比较简单,只需在ChannelPipeline上添加对应的Decoder类:

ChannelPipeline pipeline = ch.pipeline();
//以r或rn分割字符串,1024是能接受分割后字符串的最大长度
pipeline.addLast(new LineBasedFrameDecoder(1024));
//将消息转换字符串
pipeline.addLast(new StringDecoder());
pipeline.addLast(new ServerHandler());

因为将消息转换成字符串,所以需要在自定义的ServerHandler更改消息处理逻辑,直接强转成String就行。

public class ServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println(body);
    }
}

客户端只需要在IO事件处理类中将发送的每一个消息后面添加“r”或“rn”即可 :

@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf byteBuf = null;
        for (int i = 0; i < 100; i++) {
            String msg = "这是第"+i+"个消息rn";
            byteBuf = Unpooled.buffer(msg.getBytes().length);
            byteBuf.writeBytes(msg.getBytes());
            ctx.writeAndFlush(byteBuf);
        }
    }

此时先后启动服务端、客户端,服务端接受到的消息格式如下:

可以看到,此时TCP粘包的问题已经解决。

如果不是以“r”或“rn”结尾的字符串,可以使用DelimiterBasedFrameDecoder:自定义分隔符。

例如 :以“&”符号分隔。

pipeline.addLast(new DelimiterBasedFrameDecoder(1024,
                Unpooled.copiedBuffer("&".getBytes())));

也可以使用定长解析器:FixedLengthFrameDecoder

pipeline.addLast(new FixedLengthFrameDecoder(1024));

上述就是Netty解决TCP粘包/拆包的方案。