Netty高级篇
时间:2022-07-23
本文章向大家介绍Netty高级篇,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
本文将用netty实现一个简单的RPC框架。
一、什么叫RPC?
RPC,远程调用,就是A程序部署在1号机器上,B程序部署在2号机器上,A可以像调本地方法一样地去调用B程序,而不需要程序员额外地编写这个交互过程,这就叫RPC远程调用。dubbo、Ribbon、openfeign都是RPC框架。
二、RPC调用过程
- 请求:服务消费者发送请求 ---> 编码 ---> 通过网络发送 ---> 服务提供者接收请求 ---> 解码 ---> 处理请求
- 响应:服务提供者处理完请求进行响应 ---> 编码 ---> 通过网络发送 ---> 消费者接收响应 ---> 解码 ---> 处理响应
这两个流程中,我们只需要关心发送请求和处理响应即可,中间那些过程对程序员来说都是透明的。所以,要实现RPC框架,要处理的就是封装中间那些步骤。
三、实现思路
假如现在我们需要调用服务提供者HelloServiceImpl的hello方法,调用流程如下:
- 客户端创建代理对象,通过Netty,远程连接服务端,将参数发送过去。
- 服务端收到参数,传给hello方法,接收到返回值,再发送给客户端。
四、代码实操
1、HelloService接口:
public interface HelloService {
String hello(String msg);
}
2、客户端:
- HelloServiceImpl:**
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String msg) {
if (StringUtils.isNotBlank(msg)) {
return"收到客户端消息:[" + msg + "]";
}
else return "收到客户端消息";
}
}
- NettyServerHandler:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg = " + msg);
if (msg.toString().startsWith("HelloService#hello#")){
String param = msg.toString().substring(msg.toString().lastIndexOf("#") + 1);
String result = new HelloServiceImpl().hello(param);
ctx.writeAndFlush(result);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
- NettyServer:
public class NettyServer {
public static void startService(String host, int port){
startServer0(host, port);
}
private static void startServer0(String host, int port){
EventLoopGroup bossgroup = new NioEventLoopGroup(1);
EventLoopGroup workergroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossgroup, workergroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
System.out.println("provider is in active");
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
bossgroup.shutdownGracefully();
workergroup.shutdownGracefully();
}
}
}
- ServerBootStrap
public class ServerBootStrap {
public static void main(String[] args){
NettyServer.startService("127.0.0.1", 8848);
}
}
3、客户端:
- NettyClientHandler:
/**
* 流程:业务代码远程调用某个方法,被代理对象拦截,进而执行call方法,
* call方法请求服务端,然后在waiting着,服务器处理完请求将响应返回给另外一个方法,
* 即read方法,read方法收到响应后,所以就应该唤醒call方法中等待的线程,继续往下执行。
* @author: zhu
* @date: 2020/8/22 16:09
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;
private String result;
private String param;
/**
* 被代理对象调用,发送数据给服务器,然后等待被唤醒
* @return
* @throws Exception
*/
@Override
public synchronized Object call() throws Exception {
// 将参数发送给服务端
context.writeAndFlush(param);
// 发送之后就等待被唤醒
wait();
// 被唤醒的时候read方法已经拿到result了,这里直接return即可
return result;
}
/**
* 与服务器连接创建成功后被调用
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
/**
* 收到服务器数据后被调用
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify(); // 唤醒call方法
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
public void setParam(String param) {
this.param = param;
}
}
- NettyClient :
public class NettyClient {
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler clientHandler;
/**
* 初始化
*/
private static void initClient(){
clientHandler = new NettyClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
}
});
try {
bootstrap.connect("127.0.0.1", 8848).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Object getBean(final Class<?> serviceClass, final String providerName){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[] {serviceClass}, (proxy, method, args) -> {
if (clientHandler == null){
initClient();
}
// 设置要发给服务器端的参数
clientHandler.setParam(providerName + args[0]);
return executorService.submit(clientHandler).get();
});
}
}
- ClientBootStrap:
public class ClientBootStrap {
// 定义协议头
private static final String providerName = "HelloService#hello#";
public static void main(String[] args){
// 创建消费者
NettyClient customer = new NettyClient();
HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
String result = service.hello("扣你吉瓦");
System.out.println(result);
}
}
先启动ServerBootStrap,然后启动ClientBootStrap去调用服务端的hello方法,最后可以成功拿到返回结果。
- 作业调度框架 Quartz.NET 2.0 beta 发布
- 系统捕获异常并发送到服务器
- 当调用GetAuthorizationGroups() 的错误-“试图访问卸载的应用程序域“(Exception from HRESULT: 0x80131014)解决方案
- WCF 4.0路由服务Routing Service
- ExpandableListView简单应用及listview模拟ExpandableListView
- 文件句柄与文件描述符
- android GifView分享
- VAE、GAN、Info-GAN:全解深度学习三大生成模型
- android获取设备唯一标示
- 如果正确读取SQL Server中的扩展事件?
- android自定义xmls文件属性
- 分布式系统中的RPC请求经常出现乱序的情况 写一个算法来将一个乱序的序列保序输出
- jsoup详解
- 用LogParser对IIS 日志进行分析
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法