Netty中序列化框架MessagePack的简单实现

时间:2022-06-20
本文章向大家介绍Netty中序列化框架MessagePack的简单实现,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

  MessagePack是一个高效的二进制序列化框架,它像JSON一样支持不同语言间的数据交换,但是它的性能更快,序列化之后的码流也更小。MessagePack的特点如下:

  1. 编解码高效,性能高;
  2. 序列化之后码流小
  3. 支持跨语言

MessagePack使用

1.依赖

  使用maven构建项目

<dependency>
	<groupId>org.msgpack</groupId>
	<artifactId>msgpack</artifactId>
	<version>0.6.12</version>
</dependency>

2.创建编码和解码器

编码器

/**
 * @param ctx 上下文
 * @param msg 需要编码的对象
 * @param out 编码后的数据
 */
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
	
	MessagePack msgpack = new MessagePack();
	// 对对象进行序列化
	byte[] raw = msgpack.write(msg);
	// 返回序列化的数据
	out.writeBytes(raw);
}

解码器

/**
 * @param ctx 上下文
 * @param msg 需要解码的数据
 * @param out 解码列表
 */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
	final byte[] array;
	final int length = msg.readableBytes();
	array = new byte[length];
	// 获取需要解码的字节数组
	msg.getBytes(msg.readerIndex(), array,0,length);
	MessagePack msgpack = new MessagePack();
	// 反序列化并将结果保存到了解码列表中
	out.add(msgpack.read(array));
}

3.客户端

EchoClient

/**
 * MsgPack 编解码器
 * 
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoClient {

	public static void main(String[] args) throws Exception {
		int port = 8080;
		if (args != null && args.length > 0) {
			try {
				port = Integer.valueOf(args[0]);
			} catch (NumberFormatException e) {
				// 采用默认值
			}
		}
		new EchoClient().connector(port, "127.0.0.1",10);
	}

	public void connector(int port, String host,final int sendNumber) throws Exception {
		// 配置客户端NIO线程组
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			b.group(group).channel(NioSocketChannel.class)
							.option(ChannelOption.TCP_NODELAY, true)
							.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
							.handler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					//这里设置通过增加包头表示报文长度来避免粘包
		            ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(1024, 0, 2,0,2));
		            //增加解码器
		            ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
		            //这里设置读取报文的包头长度来避免粘包
		            ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
		            //增加编码器
		            ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
					// 4.添加自定义的处理器
					ch.pipeline().addLast(new EchoClientHandler(sendNumber));
				}
			});

			// 发起异步连接操作
			ChannelFuture f = b.connect(host, port).sync();
			// 等待客户端链路关闭
			f.channel().closeFuture().sync();
		}catch(Exception e){
			e.printStackTrace();
		}  finally {
			// 优雅退出,释放NIO线程组
			group.shutdownGracefully();
		}
	}

}

EchoClientHandler

/**
 * DelimiterBasedFrameDecoder 案例
 *  	自定义处理器
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoServerHandler extends ChannelHandlerAdapter{


	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//UserInfo user = (UserInfo) msg;
		System.out.println("server receive the msgpack message :"+msg);
		//ctx.writeAndFlush(user);
		ctx.writeAndFlush(msg);
		
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close(); // 发生异常关闭链路
	}
}

4.服务端

EchoServer

/**
 * MsgPack 编解码器
 * 		服务端
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoServer {

	public void bind(int port) throws Exception {
		// 配置服务端的NIO线程组
		// 服务端接受客户端的连接
		NioEventLoopGroup bossGroup = new NioEventLoopGroup();
		// 进行SocketChannel的网络读写
		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
					.option(ChannelOption.SO_BACKLOG, 100)
					.handler(new LoggingHandler(LogLevel.INFO))
					.childHandler(new ChannelInitializer<SocketChannel>() {

						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535, 0, 2,0,2));
							// 添加msgpack的编码和解码器
							ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
							ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
							ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
							// 添加自定义的处理器
							ch.pipeline().addLast(new EchoServerHandler());

						}
					});

			// 绑定端口,同步等待成功
			ChannelFuture f = b.bind(port).sync();
			// 等待服务端监听端口关闭
			f.channel().closeFuture().sync();
		}catch(Exception e){
			e.printStackTrace();
		} finally {
			// 优雅退出,释放线程池资源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

	public static void main(String[] args) throws Exception {
		int port = 8080;
		if(args!=null && args.length > 0){
			try{
				port = Integer.valueOf(args[0]);
			}catch(NumberFormatException e){
				// 采用默认值
			}
		}
		new EchoServer().bind(port);
	}

}

EchoServerHandler

/**
 * DelimiterBasedFrameDecoder 案例
 *  	自定义处理器
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoServerHandler extends ChannelHandlerAdapter{


	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//UserInfo user = (UserInfo) msg;
		System.out.println("server receive the msgpack message :"+msg);
		//ctx.writeAndFlush(user);
		ctx.writeAndFlush(msg);
		
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close(); // 发生异常关闭链路
	}
}

5.注意点(POJO)

消息类上加上注解Message,还有就是必须要有默认的无参构造器

/**
 * Msgpack 中必须添加@Message注解 及 无参构造方法
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
@Message
public class UserInfo {


	private String name;
	
	private int age;

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}

	@Override
	public String toString() {
		return "UserInfo [name=" + name + ", age=" + age + "]";
	}
}

6.测试

服务端输出

server receive the msgpack message :["bobo烤鸭:0",0]
server receive the msgpack message :["bobo烤鸭:1",1]
server receive the msgpack message :["bobo烤鸭:2",2]
server receive the msgpack message :["bobo烤鸭:3",3]
server receive the msgpack message :["bobo烤鸭:4",4]
server receive the msgpack message :["bobo烤鸭:5",5]
server receive the msgpack message :["bobo烤鸭:6",6]
server receive the msgpack message :["bobo烤鸭:7",7]
server receive the msgpack message :["bobo烤鸭:8",8]
server receive the msgpack message :["bobo烤鸭:9",9]

客户端输出

Client receive the msgpack message :["bobo烤鸭:0",0]
Client receive the msgpack message :["bobo烤鸭:1",1]
Client receive the msgpack message :["bobo烤鸭:2",2]
Client receive the msgpack message :["bobo烤鸭:3",3]
Client receive the msgpack message :["bobo烤鸭:4",4]
Client receive the msgpack message :["bobo烤鸭:5",5]
Client receive the msgpack message :["bobo烤鸭:6",6]
Client receive the msgpack message :["bobo烤鸭:7",7]
Client receive the msgpack message :["bobo烤鸭:8",8]
Client receive the msgpack message :["bobo烤鸭:9",9]

至此Netty中就可以通过MessagePack来处理序列化的情况了~