Tomcat NIO(7)-Poller
在上一篇文章里我们主要介绍了 tomcat NIO 中的 acceptor 线程,其中包括了server 监听 socket 的初始化,端口绑定,acceptor 线程的启动,接受连接请求,将请求事件注册到 poller 线程。在这里我们主要介绍 poller 线程。
tomcat NIO 架构中会有 poller 线程,每一个 poller 实例都有一个 NIO selector对象,主要用于监测注册在原始 scoket 上的事件是否发生。每一个 poller 实例同时也是会拥有一个事件队列实例 SynchronizedQueue<PollerEvent>,用来存放发生的事件,一般该事件队列的 items 由前一篇文章介绍的 acceport 线程放入。对于 poller thread,主要包括如下 items:
- 启动 poller 线程
- 添加事件到事件队列
- 对原始 socket 注册事件
- poller 线程核心逻辑
- poller 线程的等待与唤醒
启动poller线程:
poller 线程的启动主要在以前文章中介绍的架构类 NioEndpoint 的 startInternal 方法中完成,相关核心源代码如下:
public void startInternal() throws Exception {
//here we ignore other code section which not related with poller in this method to save space
// Start poller thread
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
- 本代码基于 tomcat v 9.0.21,发现其中 poller thread 数目始终为 1。
- 在tomcat 8 及以前的版本中,可以通过 pollerThreadCount 配置 poller thread 的数目。
添加事件到事件队列
该工作主要由 Poller 类的 register 方法和 addEvent 方法完成,核心源码如下:
private Selector selector
private AtomicLong wakeupCounter = new AtomicLong(0);
private final SynchronizedQueue<PollerEvent> events =new SynchronizedQueue<>();
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
PollerEvent r = null;
if (eventCache != null) {
r = eventCache.pop();
}
if (r == null) {
r = new PollerEvent(socket, OP_REGISTER);
} else {
r.reset(socket, OP_REGISTER);
}
addEvent(r);
}
private void addEvent(PollerEvent event) {
events.offer(event);
if (wakeupCounter.incrementAndGet() == 0) {
selector.wakeup();
}
}
- Poller 类有 SynchronizedQueue<PollerEvent> 队列,用来存放发生的事件。
- register() 方法一般由以前文章介绍的 acceptor 线程调用,对原始 socket 注册 open_read 事件,然后创建 OP_REGISTER 类型的 poller 事件,交由方法 addEvent() 处理。
- addEvent() 方法直接调用队列对象 offer() 方法,将 poller 事件加入队列之中。同时根据 wakeupCounter 增加之后的值是否为 0 来决定是否唤醒 selector(详细会在 poller 线程的等待与唤醒部分讲解)。
对原始socket注册事件
该工作主要由 Poller 对象的 events() 方法以及 PollerEvent 对象的 run() 方法完成,核心源码如下:
//Poller
public boolean events() {
boolean result = false;
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
try {
pe.run();
pe.reset();
if (running && !paused && eventCache != null) {
eventCache.push(pe);
}
} catch ( Throwable x ) {
log.error(sm.getString("endpoint.nio.pollerEventError"), x);
}
}
return result;
}
//PollerEvent
public void run() {
if (interestOps == OP_REGISTER) {
try {
socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper());
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
final SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
try {
if (key == null) {
try {
socket.socketWrapper.close();
} catch (Exception ignore) {
}
} else {
final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
key.interestOps(ops);
} else {
socket.getSocketWrapper().getPoller().cancelledKey(key, socket.getSocketWrapper());
}
}
} catch (CancelledKeyException ckx) {
try {
socket.getSocketWrapper().getPoller().cancelledKey(key, socket.getSocketWrapper());
} catch (Exception ignore) {}
}
}
}
- Poller 对象实例的 event() 方法会遍历实例中 SynchronizedQueue<PollerEvent> 事件队列中的所有 PollerEvent 事件对象,然后依次调用 PollerEvent 的 run() 方法。同时 reset 事件,以方便后续重用事件对象。
- PollerEvent 的 run() 方法会根据不同的 event 类型对原始 socket 去注册事件,当 event 为 OP_REGISTER 时,会利用 java NIO API 对原始 socket 去注册 OP_READ 事件,如果不是 OP_REGISTER 则追加当前的 event 事件。根据以前文章,在 acceptor 线程中建立连接之后注册到 Poller 线程中的事件就是OP_REGISTER , 所以是 acceptor 线程建立原始 scoket 连接,然后 Poller 线程对原始 socket 注册 OP_READ 事件。
Poller线程核心逻辑
该工作主要由 Poller 类的 run() 方法和 processKey() 方法以及之前文章介绍的AbstractEndpoint 的 processSocket() 方法完成,核心源码如下:
public void run() {
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
if (socketWrapper == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, socketWrapper);
}
}
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
try {
if (close) {
cancelledKey(sk, socketWrapper);
} else if (sk.isValid() && socketWrapper != null) {
if (sk.isReadable() || sk.isWritable()) {
if (socketWrapper.getSendfileData() != null) {
processSendfile(sk, socketWrapper, false);
} else {
unreg(sk, socketWrapper, sk.readyOps());
boolean closeSocket = false;
if (sk.isReadable()) {
if (socketWrapper.readOperation != null) {
if (!socketWrapper.readOperation.process()) {
closeSocket = true;
}
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (socketWrapper.writeOperation != null) {
if (!socketWrapper.writeOperation.process()) {
closeSocket = true;
}
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk, socketWrapper);
}
}
}
} else {
cancelledKey(sk, socketWrapper);
}
} catch (CancelledKeyException ckx) {
cancelledKey(sk, socketWrapper);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
}
}
//AbstractEndpoint
public boolean processSocket(SocketWrapperBase<S> socketWrapper,SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
- run() 方法是一个循环来处理整个 poller 线程的逻辑。
- run() 方法会先调用前面介绍的 event() 方法来对队列中所有原始 socket 去注册事件。
- 然后再调用 NIO API selector.selectNow() 和 selector.select() 方法来监听是否有 event() 方法注册的事件发生(这里会涉及到 Poller 线程的阻塞与唤醒这种比较优雅的设计,详细在后面文章单独讲解)。
- 如果整个 poller 是关闭了,则做超时处理(关闭原始 socket),关闭 selector,跳出整个循环。
- 如果 selector 有检测到事件发生,则对所有发生的事件调用 poller 的 processKey() 方法。
- 在每次循环的最后调用 poller 的 timeout() 方法处理是否超时。
- 对于 poller 实例的 processKey() 方法会去检查 SelectionKey 的合理性,在可读可写的基础上去调用 AbstractEndpoint 的 processSocket() 方法。
- 对于 AbstractEndpoint 具体实现类对象之中的 processSocket() 方法,会利用 tomcat 的 io 线程池(通过调用getExecutor方法得到线程池)来运行 SocketProcessor对象实例的 run() 方法。也就是说把SocketWrapper 对象实例委托给 SocketProcessor实例的 run() 方法,在 tomcat io 线程池中运行。
目前先写到这里,下一篇文章里我们继续介绍 tomcat NIO 中 poller 线程的阻塞与唤醒。
- 《Python自然语言处理》答案第一、二章
- 【 关关的刷题日记49】 Leetcode 434. Number of Segments in a String
- 自然语言处理构建文本向量空间1.百科2.源代码3.参考:
- 小爬虫之爬取豆瓣电影排行榜1.技术路线2.任务3.分析4.运行结果5.源码
- Numpy 修炼之道 (5)—— 索引和切片
- 深入理解final关键字
- Numpy 修炼之道 (4)—— 基本运算操作
- 一些APT攻击案例分享
- 浅谈命令查询职责分离(CQRS)模式
- Numpy 修炼之道 (3)—— 数据类型
- 熔断器设计模式
- 树链剖分详解
- 洛谷P3379 【模板】最近公共祖先(LCA)(树链剖分)
- 学习使用Jieba1.Jieba2. 特点3.功能4.安装5.使用6.其他中文分词工具
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- CPU有个禁区,内核权限也无法进入!
- 分布式 | DBLE 3.20.07.0 来啦!
- C语言三剑客之《C陷阱与缺陷》一书精华提炼
- Linux进程间通信(上)之管道、消息队列实践
- FPGA上电时序
- 更新Kubernetes APIServer证书
- R语言连续时间马尔科夫链模拟案例 Markov Chains
- 如何用R语言在机器学习中建立集成模型?
- 从零开始Kubernetes Operator
- TiKV源码解析系列文章(二十)Region Split源码解析
- scrapy爬虫框架和selenium的使用:对优惠券推荐网站数据LDA文本挖掘
- 单性状动物模型矩阵形式计算BLUP值
- 如何计算一般配合力和特殊配合力
- 【29期】Java集合框架 10 连问,你有被问过吗?
- 学徒数据挖掘之谁说生存分析一定要按照表达量中位值或者平均值分组呢?