高性能NIO框架Netty-对象传输
上篇文章高性能NIO框架Netty入门篇我们对Netty做了一个简单的介绍,并且写了一个入门的Demo,客户端往服务端发送一个字符串的消息,服务端回复一个字符串的消息,今天我们来学习下在Netty中怎么使用对象来传输数据。
上篇文章中传输字符串我们用的是框架自带的StringEncoder,StringDecoder编解码器,现在想要通过对象来传输数据,该怎么弄呢?
既然StringEncoder和StringDecoder可以传输字符串,我们来看看这2个类的源码不就知道它们到底做了一些什么工作。
StringEncoder
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {
// TODO Use CharsetEncoder instead.
private final Charset charset;
/**
* Creates a new instance with the current system character set.
*/
public StringEncoder() {
this(Charset.defaultCharset());
}
/**
* Creates a new instance with the specified character set.
*/
public StringEncoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}
@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
if (msg.length() == 0) {
return;
}
out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}
}
通过继承MessageToMessageEncoder,重写encode方法来进行编码操作,就是将字符串进行输出即可。
StringDecoder
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
// TODO Use CharsetDecoder instead.
private final Charset charset;
/**
* Creates a new instance with the current system character set.
*/
public StringDecoder() {
this(Charset.defaultCharset());
}
/**
* Creates a new instance with the specified character set.
*/
public StringDecoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(msg.toString(charset));
}
}
继承MessageToMessageDecoder,重写decode方法,将ByteBuf数据直接转成字符串进行输出,解码完成。
通过上面的源码分析,我们发现编解码的原理无非就是在数据传输前进行一次处理,接收后进行一次处理,在网络中传输的数据都是字节,我们现在想要传PO对象,那么必然需要进行编码和解码2个步骤,我们可以自定义编解码器来对对象进行序列化,然后通过ByteBuf的形式进行传输, 传输对象需要实现java.io.Serializable接口。
首先我们定义一个传输对象,实现序列化接口,暂时先定义2个字段,一个ID,用来标识客户端,一个内容字段,代码如下:
public class Message implements Serializable {
private static final long serialVersionUID = -7543514952950971498L;
private String id;
private String content;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
传输对象定好后,定义对象的编解码器。
对象编码器
将对象序列化成字节,通过ByteBuf形式进行传输,ByteBuf是一个byte存放的缓冲区,提供了读写操作。
public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception {
byte[] datas = ByteUtils.objectToByte(message);
out.writeBytes(datas);
ctx.flush();
}
}
对象解码器
接收ByteBuf数据,将ByteBuf反序列化成对象
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object obj = ByteUtils.byteToObject(ByteUtils.read(in));
out.add(obj);
}
}
将上篇文章中服务端的编解码器改成对象编解码器:
public class ImServer {
public void run(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//实体类传输数据,jdk序列化
ch.pipeline().addLast("decoder", new MessageDecoder());
ch.pipeline().addLast("encoder", new MessageEncoder());
ch.pipeline().addLast(new ServerPoHandler());
//字符串传输数据
/*ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new ServerStringHandler());*/
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
ChannelFuture f = bootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
接下来编写服务端的消息处理类:
public class ServerPoHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Message message = (Message) msg;
System.err.println("server:" + message.getId());
ctx.writeAndFlush(message);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
服务端改造好了之后,就要改造客户端了,同样的道理,客户端和服务端的编解码器都要一致才行。
客户端连接时指定对象编解码器和对象消息处理类,代码如下:
public class ImConnection {
private Channel channel;
public Channel connect(String host, int port) {
doConnect(host, port);
return this.channel;
}
private void doConnect(String host, int port) {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//实体类传输数据,jdk序列化
ch.pipeline().addLast("decoder", new MessageDecoder());
ch.pipeline().addLast("encoder", new MessageEncoder());
ch.pipeline().addLast(new ClientPoHandler());
//字符串传输数据
/*ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new ClientStringHandler());*/
}
});
ChannelFuture f = b.connect(host, port).sync();
channel = f.channel();
} catch(Exception e) {
e.printStackTrace();
}
}
}
客户端消息处理类:
/**
* 当编解码器为实体对象时时用来接收数据
* @author yinjihuan
*
*/
public class ClientPoHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Message message = (Message) msg;
System.out.println("client:" + message.getContent());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
客户端启动类也需要改造,将发送字符串的消息变成对象消息
public class ImClientApp {
public static void main(String[] args) {
String host = "127.0.0.1";
int port = 2222;
Channel channel = new ImConnection().connect(host, port);
//对象传输数据
Message message = new Message();
message.setId(UUID.randomUUID().toString().replaceAll("-", ""));
message.setContent("hello yinjihuan");
channel.writeAndFlush(message);
//字符串传输数据
//channel.writeAndFlush("yinjihuan");
}
}
源码参考:https://github.com/yinjihuan/netty-im
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 树义带你学 Prometheus(五):Prometheus 的关键概念
- 树义带你学 Prometheus(六):Spring Boot Actuator 实现应用监控
- 树义带你学 Prometheus(七):SpringBoot 实现自定义指标监控
- 布尔型盲注的PY交易
- 一文详解「队列」,手撸队列的3种方法!
- 代理模式
- Swift:Lable 高度计算误差
- 基于python检查SSL证书到期情况代码实例
- Python搭建Keras CNN模型破解网站验证码的实现
- PyCharm 在Windows的有用快捷键详解
- Python 自动化测试(三): pytest 参数化测试用例构建
- 基于Android平台实现拼图小游戏
- kotlin项目加入Glide图片加载库并使用GlideApp的方法
- Android实现百分比下载进度条效果
- 实验2 OpenGL交互