Netty 主从多线程
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. https://netty.io
Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
Netty 是一个 NIO 客户端服务器框架,它支持快速轻松地开发网络应用程序,如协议服务器和客户端。它极大地简化了网络编程,如 TCP 和 UDP 套接字服务器。
Nonblocking I/O
NIO,非阻塞 IO。对比于BIO(Blocking I/O,阻塞IO),NIO 的并发性能得到了很大提高。
常见的五种 IO 模型对比
- 同步阻塞 IO(BIO)阻塞整个步骤。适用于少连接且延迟低的场景。
- 同步非阻塞 IO(NIO),阻塞业务处理但不阻塞数据接收。适用于高并发且处理简单的场景。
- 多路复用 IO,数据请求和业务处理是两个分开进行处理。
- 信号驱动 IO,主要用在嵌入式开发,不参与讨论。
- 异步 IO,数据请求和业务处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。
主从多线程
Netty 是典型的 Reator 模型结构。
Reactor 模式是基于事件驱动开发的,其核心组成部分包括 Reactor 和线程池。其中 Reactor 负责监听和分配事件,而线程池负责处理事件。
根据Reactor的数量和线程池的数量,又将Reactor分为三种模型:
- 单线程模型 (单 Reactor 单线程)
- 多线程模型 (单 Reactor 多线程)
- 主从多线程模型 (多 Reactor 多线程)
什么是主从多线程
从一个主线程 NIO 线程池中选择一个线程(boss)作为 Acceptor 线程,绑定监听端口,接收客户端连接的连接,其他线程(worker)负责后续的业务处理工作。
示例代码
从开源项目中截取了一段 Netty 初始化代码片段。
private void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY
// 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
// 当客户端第一次进行请求的时候才会进行初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 30 秒之内没有收到客户端请求的话就关闭连接
ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
}
})
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY
// 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
// 是否开启 TCP 底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 128);
// 绑定端口,同步等待绑定成功
ChannelFuture f = b.bind(host, port).sync();
ChannelFuture f2 = b.bind(host, port2).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println(String.format("occur exception when start server:", e));
} finally {
System.out.println(String.format("shutdown bossGroup and workerGroup"));
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
b.bind(host, port).sync() 和 b.bind(host, port2).sync() 同时绑定了两个 ip 和 端口。
运行结果
源代码知识点
NioEventLoopGroup
特殊的 EventExecutorGroup 接口类,它允许注册已处理的通道,以便在事件循环期间进行后续选择。
ServerBootstrap
Bootstrap 子类,可轻松引导 ServerChannel。
NioServerSocketChannel
一个 ServerSocketChannel 接口的实现类,它使用基于NIO选择器的实现来接受新连接。
ChannelFuture
异步 Channel I / O操作的结果,未完成或已完成。
代码调试
new NioEventLoopGroup()
MultithreadEventExecutorGroup.java 初始化实例。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
传入参数:
- nThreads 此实例将使用的线程数。
- executor 需要执行的 Runable 任务对象 。
- choicerFactory 创建 EventExecutorChooser 对象的工厂类。
- args 参数将传递给每个 newChild 调用。
new ServerBootstrap()
ServerBootstrap 是 Netty 服务端应用开发的入口。
ServerBootstrap 的配置:
- group 方法,设置初始化的主从"线程池"。
- channel 方法,设置通道类型。服务端:NioServerSocketChannel。
- ...
b.bind(host, port).sync()
绑定并侦听某个端口
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
小结
突如其来的三天小长假,彻底打乱了生活节奏 。
一天搬家、一天休息、一天加班。眼见着明天周日应该好好学习知识了,迎来的却是正常班。
周六熬夜写文章,然后明天早起上班去。就这样。
- WCF技术剖析之二:再谈IIS与ASP.NET管道
- ASP.NET MVC5+EF6+EasyUI 后台管理系统(61)-如何使用框架来开发
- (保存)C#基础概念二十五问
- flash留言本
- 这或许是2017年最有设计感的“无人车”推文
- 世界首条光伏高速公路:能发电,能充电,能化雪
- WCF技术剖析之四:基于IIS的WCF服务寄宿(Hosting)实现揭秘
- 简练的视图模型 ViewModel
- 提供智能服务?先迈过数字信任这个坎
- 《天弋夺宝》—01飞船的控制
- ASP.NET MVC5+EF6+EasyUI 后台管理系统(63)-Excel导入和导出
- 核心代码(未注释)
- 从科研角度谈“如何实现基于机器学习的智能运维”
- 用后台代码创建Storyboard
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 自己动手实现4大免费聊天机器人:小冰、图灵、腾讯、青云客
- Android Spinner下拉框的基本使用
- hadoop本地运行的两个案例。官方Grep案例、官方WordCount案例。
- 腾讯智能闲聊机器人详细开发教程
- 用PyTorch实现MNIST手写数字识别(非常详细)
- 手把手教你从零开始用Java写爬虫
- STM32 cjson的GBK/UTF-8/UNICODE转换、显示中文、GBK字库
- 都说Linux很重要,你会几个Linux命令?来看看这道面试题目。
- 使用VisualGDB将Keil项目导入VisualStudio
- 小白学图像 | Group Normalization详解+PyTorch代码
- 使用VisualGDB开发Keil MDK-ARM项目
- 保姆级教程:还愁不会搭建伪分布式吗?(其实很简单)
- 如何使用OpenCV RTMP直播推流
- Scrapy框架新手入门教程
- STM32 F4串口接收中断