Netty之Protobuf编解码框架
Protobuf
Protobuf是一个灵活、高效、结构化的数据序列化框架,相比于传统的XML序列化工具,它更小、更快、更简单。支持结构化数据一次可以到处使用,包括跨语言,通过代码生成工具可以自动生成不同语言版本的源代码,可以在使用不同版本的数据结构进程间进行数据传递,实现数据结构的前向兼容。
其支持定义可选、必选字段。
使用案例
添加依赖
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
</dependency>
下载Protobuf
网址:https://developers.google.com/protocol-buffers/
google由于国内不能打开,所以有需要可以找我
解压:需要的就是这个protoc.exe可执行文件,需要用它根据配置生成需传输对象的代码。
生成类配置文件
我这边需要生成两个类,所以需要两个配置文件。
SubscribeReq.proto
syntax = "proto2"; //协议版本
package netty; //包名,其他 proto 可在此包下引用此 proto 的时候
option java_package = "com.liusy.protobuf"; //生成类的包名,注意:会在指定路径下按照该包名的定义来生成文件夹
option java_outer_classname = "SubscribeReqProto"; //生成类的类名
message SubscribeReq{
required int32 subReqId = 1; //required必须字段
optional string userName = 2; //optional不必须字段
required string productName = 3;
repeated string address = 4; //数组字段,0或多个
}
SubscribeResp.proto
syntax = "proto2";
package netty;
option java_package = "com.liusy.protobuf";
option java_outer_classname = "SubscribeRespProto";
message SubscribeResp{
required int32 subReqId = 1;
required int32 respCode = 2;
required string desc = 3;
}
此时需要在cmd中输入命令 proto.exe -I="proto文件所在路径" --java_out="输出路径" "proto文件详细路径" 命令生成类
我的目录是这样的
所以我这边的命令就是
./protoc.exe -I=./ --java_out=./ SubscribeReq.proto
执行命令之后,就会在自定义的目录生成在proto文件里定义的目录以及类
然后将代码复制到项目中
小测试
生成代码之后使用比较简单
public class TestSubscribeReqProto {
//编码
private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
return req.toByteArray();
}
//解码
private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws InvalidProtocolBufferException {
return SubscribeReqProto.SubscribeReq.parseFrom(body);
}
private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
builder.setSubReqId(1);
builder.setUserName("tom");
builder.setProductName("表");
return builder.build();
}
public static void main(String[] args) throws InvalidProtocolBufferException {
SubscribeReqProto.SubscribeReq subscribeReq = createSubscribeReq();
System.out.println("before:"+subscribeReq.toString());
SubscribeReqProto.SubscribeReq decode = decode(encode(subscribeReq));
System.out.println("afterDecoder:"+decode.toString());
System.out.println(subscribeReq.equals(decode));
}
}
运行结果:
结合Netty使用
客户端:
public class SubServer {
private static final Log logger = LogFactory.getLog(SubServer.class);
public static void main(String[] args) {
new SubServer().start();
}
private void start() {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//半包处理
pipeline.addLast(new ProtobufVarint32FrameDecoder());
//需要解码的目标类
pipeline.addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new SubServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture future = server.bind(8080).sync();
logger.info("服务器端启动成功");
future.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("服务器端启动失败",e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
ProtobufDecoder只负责解码,如果想要处理半包信息,Netty提供了ProtobufVarint32FrameDecoder类,以及添加长度域的类ProtobufVarint32LengthFieldPrepender。也可以使用上一篇说到的LengthFieldBasedFrameDecoder和LengthFieldPrepender。
服务端处理器:
public class SubServerHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//由于ProtobufDecoder已经进行了解码,所以这边进行类型强转就行
SubscribeReqProto.SubscribeReq subscribeReq = (SubscribeReqProto.SubscribeReq) msg;
System.out.println("username:"+subscribeReq.getUserName()+"nproductName:"+subscribeReq.getProductName());
ctx.writeAndFlush(resp(subscribeReq.getSubReqId()));
}
/**
*构建response返回给客户端
*/
public SubscribeRespProto.SubscribeResp resp(int id) {
SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
builder.setSubReqId(id);
builder.setRespCode(200);
builder.setDesc("服务端已接收到信息");
return builder.build();
}
}
客户端:
public class SubClient {
private static final Log logger = LogFactory.getLog(SubClient.class);
public static void main(String[] args) {
new SubClient().start();
}
private void start() {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap client = new Bootstrap();
client.group(group)
.option(ChannelOption.TCP_NODELAY,true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new SubClientHandler());
}
});
ChannelFuture future = client.connect(new InetSocketAddress("localhost", 8080)).sync();
logger.info("客户端已启动");
future.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("客户端启动失败",e);
} finally {
group.shutdownGracefully();
}
}
}
客户端处理器:
public class SubClientHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
*客户单启动时构建请求消息发送给服务端
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
builder.setSubReqId(1);
builder.setProductName("电脑");
builder.setUserName("tom来自客户端");
ctx.writeAndFlush(builder.build());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
SubscribeRespProto.SubscribeResp subscribeResp = (SubscribeRespProto.SubscribeResp) msg;
System.out.println(subscribeResp.getDesc());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
客户端在启动后发送一个消息给服务端
先后启动服务端,客户端,运行结果如下:
服务端:
客户端:
上述就是Google的Protobuf编解码框架的使用。更多内容敬请关注公众号。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- springmvc之异常处理ResponseStatusExceptionResolver
- Java矩阵快速幂实现
- 走近STL -- 你好,List
- 我能看懂的MakeFile(自命名,多文件,多目标)
- Posix信号量与cond条件变量,到底该选谁?
- 信号量--System V信号量 与 Posix信号量
- 文件空间映射mmap()函数(是什么,为什么,怎么用)
- C++下shm共享内存模块
- 基于TypeScript封装Axios笔记(九)
- springmvc之SessionAttributes注解所引发的异常
- 【tensorflow2.0】处理文本数据-imdb数据
- springmvc之异常处理DefaultHandlerExceptionResolver
- springmvc之返回json类型的数据给前端
- springmvc之mvc:view-controller标签设置可以直接访问的视图
- exec族