Tomcat NIO(9)-IO线程-Overall流程和关键类
时间:2022-07-25
本文章向大家介绍Tomcat NIO(9)-IO线程-Overall流程和关键类,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 tomcat io 线程池中处理,包括解析请求行,请求头,调用 servlet API,处理异步等等,在这里我们主要介绍 tomcat io 线程。
对于tomcat io线程我们主要介绍:
- IO 线程 overall 流程
- IO 线程主要涉及的类以及作用
IO线程overall流程
对于 tomcat io 线程来说,overall 调用序列图如下 :
- SocketProcessor.run()--> ConnectionHandler.process()--> AbstractProcessorLight.process()--> Http11Processor.service--> CoyoteAdapter.service() 一直到 container 调用标准 servlet API。
- 涉及的关键类有 SocketProcessor,ConnectionHandler AbstractProcessorLight, Http11Processor,CoyoteAdapter
SocketProcessor的核心代码逻辑如下
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
Poller poller = NioEndpoint.this.poller;
if (poller == null) {
socketWrapper.close();
return;
}
try {
int handshake = -1;
try {
if (key != null) {
if (socket.isHandshakeComplete()) {
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||event == SocketEvent.ERROR) {
handshake = -1;
} else {
handshake = socket.handshake(key.isReadable(), key.isWritable());
event = SocketEvent.OPEN_READ;
}
}
} catch (IOException x) {
handshake = -1;
if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
} catch (CancelledKeyException ckx) {
handshake = -1;
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
poller.cancelledKey(key, socketWrapper);
}
} else if (handshake == -1 ) {
poller.cancelledKey(key, socketWrapper);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
} catch (CancelledKeyException cx) {
poller.cancelledKey(key, socketWrapper);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error(sm.getString("endpoint.processing.fail"), t);
poller.cancelledKey(key, socketWrapper);
} finally {
socketWrapper = null;
event = null;
if (running && !paused && processorCache != null) {
processorCache.push(this);
}
}
}
- 首先会处理 handshake,如果 handshake 没有问题则返回 handshake 的结果为 0。
- 如果 handshake 过程处理正常没问题,则会通过调用 getHandler().process(socketWrapper, event) 方法从而来间接触发 ConnectionHandler 实例的 process() 方法,并返回期望原始 socket 的状态 SocketState 枚举。
- 如果返回的 SocketState 为 CLOSED ,则调用 poller.cancelledKey() 方法,会把原始 sockte 关闭。
- 最后会把 SocketProcessor 实例回收到缓存 processorCache 中,以便下次使用不需要重新创建对象,从而提高效率。
- 另外 ConnectionHandler 是 global 对象,也就是说所有的连接处理均由这个对象处理。根据以前文章,该实例中有一个 Map 对象,key 为SocketWrapper 对象类型,对应的 value 为 Http11Processor 类型。也就是说为连接中的每一个请求(request)都去分配了相应处理类 Http11Processor 实例,可以保存连接上请求的状态信息(例如解析请求行,请求头等数据)。
ConnectionHandler的核心代码逻辑如下
private final Map<S,Processor> connections = new ConcurrentHashMap<>();
private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.process",
wrapper.getSocket(), status));
}
if (wrapper == null) {
return SocketState.CLOSED;
}
S socket = wrapper.getSocket();
Processor processor = connections.get(socket);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
processor, socket));
}
if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {
return SocketState.OPEN;
}
if (processor != null) {
getProtocol().removeWaitingProcessor(processor);
} else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
return SocketState.CLOSED;
}
ContainerThreadMarker.set();
try {
if (processor == null) {
String negotiatedProtocol = wrapper.getNegotiatedProtocol();
if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {
UpgradeProtocol upgradeProtocol =
getProtocol().getNegotiatedProtocol(negotiatedProtocol);
if (upgradeProtocol != null) {
processor = upgradeProtocol.getProcessor(
wrapper, getProtocol().getAdapter());
} else if (negotiatedProtocol.equals("http/1.1")) {
// Explicitly negotiated the default protocol.
// Obtain a processor below.
} else {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
"abstractConnectionHandler.negotiatedProcessor.fail",
negotiatedProtocol));
}
return SocketState.CLOSED;
}
}
}
if (processor == null) {
processor = recycledProcessors.pop();
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
processor));
}
}
if (processor == null) {
processor = getProtocol().createProcessor();
register(processor);
}
processor.setSslSupport(
wrapper.getSslSupport(getProtocol().getClientCertProvider()));
connections.put(socket, processor);
SocketState state = SocketState.CLOSED;
do {
state = processor.process(wrapper, status);
if (state == SocketState.UPGRADING) {
UpgradeToken upgradeToken = processor.getUpgradeToken();
ByteBuffer leftOverInput = processor.getLeftoverInput();
if (upgradeToken == null) {
UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
if (upgradeProtocol != null) {
processor = upgradeProtocol.getProcessor(
wrapper, getProtocol().getAdapter());
wrapper.unRead(leftOverInput);
connections.put(socket, processor);
} else {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail","h2c"));
}
return SocketState.CLOSED;
}
} else {
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
release(processor);
processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",processor, wrapper));
}
wrapper.unRead(leftOverInput);
wrapper.setUpgraded(true);
connections.put(socket, processor);
if (upgradeToken.getInstanceManager() == null) {
httpUpgradeHandler.init((WebConnection) processor);
} else {
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
try {
httpUpgradeHandler.init((WebConnection) processor);
} finally {
upgradeToken.getContextBind().unbind(false, oldCL);
}
}
if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {
if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) {
state = SocketState.LONG;
}
}
}
}
} while ( state == SocketState.UPGRADING);
if (state == SocketState.LONG) {
longPoll(wrapper, processor);
if (processor.isAsync()) {
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.OPEN) {
connections.remove(socket);
release(processor);
wrapper.registerReadInterest();
} else if (state == SocketState.SENDFILE) {
// Sendfile in progress. If it fails, the socket will be
// closed. If it works, the socket either be added to the
// poller (or equivalent) to await more data or processed
// if there are any pipe-lined requests remaining.
} else if (state == SocketState.UPGRADED) {
// Don't add sockets back to the poller if this was a
// non-blocking write otherwise the poller may trigger
// multiple read events which may lead to thread starvation
// in the connector. The write() method will add this socket
// to the poller if necessary.
if (status != SocketEvent.OPEN_WRITE) {
longPoll(wrapper, processor);
}
} else if (state == SocketState.SUSPENDED) {
// Don't add sockets back to the poller.
// The resumeProcessing() method will add this socket
// to the poller.
} else {
connections.remove(socket);
if (processor.isUpgrade()) {
UpgradeToken upgradeToken = processor.getUpgradeToken();
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
InstanceManager instanceManager = upgradeToken.getInstanceManager();
if (instanceManager == null) {
httpUpgradeHandler.destroy();
} else {
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
try {
httpUpgradeHandler.destroy();
} finally {
try {
instanceManager.destroyInstance(httpUpgradeHandler);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
getLog().error(sm.getString("abstractConnectionHandler.error"), e);
}
upgradeToken.getContextBind().unbind(false, oldCL);
}
}
} else {
release(processor);
}
}
return state;
} catch(java.net.SocketException e) {
getLog().debug(sm.getString("abstractConnectionHandler.socketexception.debug"), e);
} catch (java.io.IOException e) {
getLog().debug(sm.getString("abstractConnectionHandler.ioexception.debug"), e);
} catch (ProtocolException e) {
getLog().debug(sm.getString("abstractConnectionHandler.protocolexception.debug"), e);
}
catch (OutOfMemoryError oome) {
getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
getLog().error(sm.getString("abstractConnectionHandler.error"), e);
} finally {
ContainerThreadMarker.clear();
}
connections.remove(socket);
release(processor);
return SocketState.CLOSED;
}
- 该实例中有一个 Map 对象,key 为原始 scoket 的包装对象 SocketWrapper 类型,value 为 Http11Processor 类型。为每个连接中的请求都分配处理类 Http11Processor 实例,可以保存连接上的请求状态信息(例如解析请求行,请求头等数据)。
- 该实例有 recycledProcessors 对象,用来保持已经回收的 Http11Processor 实例,避免下次使用重新创建对象,提高效率。
- 该实例核心方法为 process() 方法,代码比较多,这里总结关键点。
- 该实例寻找 Map 中原始 socket 的包装对象关联的连接处理类 Http11Processor 实例,如果没有则创建新实例,并将 socket 包装对象与其关联到 Map 里,下次同一个连接中有数据的时候可以重用,而且也可以保存连接状态信息在 Http11Processor 实例中。
- 该实例调用 Http11Processor 实例的 process 方法并返回 SocketState。
- 如果返回的状态是代表 upgrade 协议(例如websocket连接等),则处理 upgrade 协议,这里不对 upgrade 协议详细展开。
- 如果回的状态为 SocketState.LONG ,则代表要么是数据(请求行/请求头)没有解析完(因为 client 端没有发送完请求行/请求头数据),要么是执行了 servlet 的异步请求。
- 对于 SocketState.LONG 的返回值调用如下: protected void longPoll(SocketWrapperBase<?> socket, Processor processor) { if (!processor.isAsync()) { // This is currently only used with HTTP // Either: // - this is an upgraded connection // - the request line/headers have not been completely // read socket.registerReadInterest(); } } //SocketWrapper public void registerReadInterest() { getPoller().add(this, SelectionKey.OP_READ); }
- longPoll() 方法在 servlet 非异步请求(请求行/请求头数据没有解析完)的情况,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程的事件队列里,让 poller 线程继续监听 client 端可读事件。
- 对于 SocketState.OPEN 的返回值调用: connections.remove(socket); release(processor); wrapper.registerReadInterest();
- SocketState.OPEN 一般代表 servlet API 调用正常,返回 OPEN 表示该连接为长连接,不关闭原始 socket 。所以在 Map中会去移除 socket 和Http11Processor 的对应关系,来释放当前 Http11Processor 实例以便后续重用。由于是长连接,所以和异步处理方式一样,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程事件队列中,让 poller 线程继续监听 client 端可读事件。
- 在最后的 else 分支中代表返回的期望状态为 CLOSED ,表示该连接需要关闭,则在 Map 中移除 socket 和 Http11Processor 的对应关系,然后会释放当前 Http11Processor 实例以便后续重用。根据上面 ConnectionHanlder 的分析,如果返回的 SocketState 枚举的结果为 CLOSED,则会去调用 poller.cancelledKey() 方法,从而把原始 socket 关闭。
目前先写到这里,下一篇文章里我们继续介绍 tomcat io 线程涉及的其他关键类。
- ChIP-seq实战分析
- PHP中9大缓存技术总结
- servlet中request等中文乱码问题
- Ofbiz模块加载机制即创建独立模块(脱离热部署)
- ofbiz连接mysql并创建独立数据库
- Angular+servlet java实现前后端数据交互
- servlet容器tomcat和jetty的简单使用
- activiti学习笔记(一) 获取流程配置实例
- ofbiz 服务引擎(一) controller中服务的调用解析
- ofbiz实体引擎(九) 多租户
- SparkStreaming入门
- 拒绝重复造轮子,用composer搞自己的框架(2)
- 拒绝重复造轮子,用composer搞自己的框架(1)
- 我的第一次ChIP-seq实践
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Linux xargs grep zgrep命令
- r语言空间可视化绘制道路交通安全事故地图
- 小知识:Oracle RAC添加服务名实现单节点访问
- Python去掉图片四周纯色边框
- R语言空间可视化:绘制英国脱欧投票地图
- CC攻击防御的一种方法
- 机器学习笔记之梯度下降算法原理讲解
- 使用OpenCV在Python中进行图像处理
- R语言对布丰投针(蒲丰投针)实验进行模拟和动态可视化生成GIF动画
- Shiny-R语言轻松开发交互式web应用
- SPSS中的等级线性模型Multilevel linear models研究整容手术数据
- Python花式表白
- Python中基于网格搜索算法优化的深度学习模型分析糖尿病数据
- WordPress运行内存不足的解决方式
- ggplot2 |legend参数设置,图形精雕细琢