Netty之Protobuf​编解码框架

时间:2022-07-24
本文章向大家介绍Netty之Protobuf​编解码框架,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Protobuf

Protobuf是一个灵活、高效、结构化的数据序列化框架,相比于传统的XML序列化工具,它更小、更快、更简单。支持结构化数据一次可以到处使用,包括跨语言,通过代码生成工具可以自动生成不同语言版本的源代码,可以在使用不同版本的数据结构进程间进行数据传递,实现数据结构的前向兼容。

其支持定义可选、必选字段。

使用案例

添加依赖

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.8.0</version>
</dependency>

下载Protobuf

网址:https://developers.google.com/protocol-buffers/

google由于国内不能打开,所以有需要可以找我

解压:需要的就是这个protoc.exe可执行文件,需要用它根据配置生成需传输对象的代码。

生成类配置文件

我这边需要生成两个类,所以需要两个配置文件。

SubscribeReq.proto

syntax = "proto2";  //协议版本
package netty;      //包名,其他 proto 可在此包下引用此 proto 的时候
option java_package = "com.liusy.protobuf"; //生成类的包名,注意:会在指定路径下按照该包名的定义来生成文件夹
option java_outer_classname = "SubscribeReqProto"; //生成类的类名

message SubscribeReq{
    required int32 subReqId = 1;  //required必须字段
    optional string userName = 2; //optional不必须字段
    required string productName = 3;
    repeated string address = 4; //数组字段,0或多个
}

SubscribeResp.proto

syntax = "proto2";
package netty;
option java_package = "com.liusy.protobuf";
option java_outer_classname = "SubscribeRespProto";

message SubscribeResp{
     required int32 subReqId = 1;
     required int32 respCode = 2;
     required string desc = 3;
}

此时需要在cmd中输入命令 proto.exe -I="proto文件所在路径" --java_out="输出路径" "proto文件详细路径" 命令生成类

我的目录是这样的

所以我这边的命令就是

 ./protoc.exe -I=./ --java_out=./ SubscribeReq.proto
 

执行命令之后,就会在自定义的目录生成在proto文件里定义的目录以及类

然后将代码复制到项目中

小测试

生成代码之后使用比较简单

public class TestSubscribeReqProto {

    //编码
    private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
        return req.toByteArray();
    }

    //解码
    private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws InvalidProtocolBufferException {
        return SubscribeReqProto.SubscribeReq.parseFrom(body);
    }

    private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqId(1);
        builder.setUserName("tom");
        builder.setProductName("表");
        return builder.build();
    }

    public static void main(String[] args) throws InvalidProtocolBufferException {
        SubscribeReqProto.SubscribeReq subscribeReq = createSubscribeReq();
        System.out.println("before:"+subscribeReq.toString());
        SubscribeReqProto.SubscribeReq decode = decode(encode(subscribeReq));
        System.out.println("afterDecoder:"+decode.toString());
        System.out.println(subscribeReq.equals(decode));

    }
}

运行结果:

结合Netty使用

客户端:

public class SubServer {

    private static final Log logger = LogFactory.getLog(SubServer.class);

    public static void main(String[] args) {
        new SubServer().start();
    }

    private void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //半包处理
                            pipeline.addLast(new ProtobufVarint32FrameDecoder());
                            //需要解码的目标类
                            pipeline.addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new SubServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true);

            ChannelFuture future = server.bind(8080).sync();
            logger.info("服务器端启动成功");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("服务器端启动失败",e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

ProtobufDecoder只负责解码,如果想要处理半包信息,Netty提供了ProtobufVarint32FrameDecoder类,以及添加长度域的类ProtobufVarint32LengthFieldPrepender。也可以使用上一篇说到的LengthFieldBasedFrameDecoder和LengthFieldPrepender。

服务端处理器:

public class SubServerHandler extends ChannelHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //由于ProtobufDecoder已经进行了解码,所以这边进行类型强转就行
        SubscribeReqProto.SubscribeReq subscribeReq = (SubscribeReqProto.SubscribeReq) msg;
        System.out.println("username:"+subscribeReq.getUserName()+"nproductName:"+subscribeReq.getProductName());
        ctx.writeAndFlush(resp(subscribeReq.getSubReqId()));
    }


    /**
    *构建response返回给客户端
    */
    public SubscribeRespProto.SubscribeResp resp(int id) {
        SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
        builder.setSubReqId(id);
        builder.setRespCode(200);
        builder.setDesc("服务端已接收到信息");
        return builder.build();
    }
}

客户端:

public class SubClient {
    private static final Log logger = LogFactory.getLog(SubClient.class);

    public static void main(String[] args) {
        new SubClient().start();
    }

    private void start() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(group)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ProtobufVarint32FrameDecoder());
                            pipeline.addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new SubClientHandler());
                        }
                    });
            ChannelFuture future = client.connect(new InetSocketAddress("localhost", 8080)).sync();
            logger.info("客户端已启动");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("客户端启动失败",e);
        } finally {
            group.shutdownGracefully();
        }

    }

}

客户端处理器:

public class SubClientHandler extends ChannelHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
    
    /**
    *客户单启动时构建请求消息发送给服务端
    */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqId(1);
        builder.setProductName("电脑");
        builder.setUserName("tom来自客户端");
        ctx.writeAndFlush(builder.build());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SubscribeRespProto.SubscribeResp subscribeResp = (SubscribeRespProto.SubscribeResp) msg;
        System.out.println(subscribeResp.getDesc());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

客户端在启动后发送一个消息给服务端

先后启动服务端,客户端,运行结果如下:

服务端:

客户端:

上述就是Google的Protobuf编解码框架的使用。更多内容敬请关注公众号。