Tomcat NIO(10)-IO线程-关键类
时间:2022-07-26
本文章向大家介绍Tomcat NIO(10)-IO线程-关键类,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
在上一篇文章里我们主要介绍了 tomcat io 线程的 overall 调用流程以及关键类SocketProcessor 和 ConnectionHandler 的核心逻辑总结,这里我们主要来介绍剩余其它的核心类 AbstractProcessorLight,Http11Processor,CoyoteAdapter。
AbstractProcessorLight核心逻辑如下:
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException {
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
// There may be pipe-lined data to read. If the data isn't
// processed now, execution will exit this loop and call
// release() which will recycle the processor (and input
// buffer) deleting any pipe-lined data. To avoid this,
// process it now.
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ){
state = service(socketWrapper);
} else {
// Default to closing the socket if the SocketEvent passed in
// is not consistent with the current state of the Processor
state = SocketState.CLOSED;
}
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + socketWrapper + "], Status in: [" + status + "], State out: [" + state + "]");
}
if (state != SocketState.CLOSED && isAsync()) {
state = asyncPostProcess();
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + socketWrapper + "], State after async post processing: [" + state + "]");
}
}
if (dispatches == null || !dispatches.hasNext()) {
dispatches = getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END || dispatches != null && state != SocketState.CLOSED);
return state;
}
- 该类的核心方法为 process() ,会被在上一篇文章之中介绍的 ConnectionHandler 对象实例的 process() 方法调所用。
- 该方法根据不同的 socket 事件和是否采用异步处理来进行不同的调用,返回期望的 SocketState 状态,这里我们只对非异步的正常调用介绍。
- 对于非异步的正常调用下,SocketEvent 为 OPEN_READ ,进入Http11Processor 实例的 service() 方法。
- 对于其他未知的 SocketEvent 事件来说,返回给 ConnectionHandler 实例的SocketState 为 CLOSED 。根据以前文章,这样的结果会被给 SocketProcessor 关闭原始 socket 。
Http11Processor的核心代码逻辑如下:
public AbstractProcessor(Adapter adapter) {
this(adapter, new Request(), new Response());
}
//Http11Processor
public Http11Processor(AbstractHttp11Protocol<?> protocol, Adapter adapter) {
super(adapter);
this.protocol = protocol;
httpParser = new HttpParser(protocol.getRelaxedPathChars(),
protocol.getRelaxedQueryChars());
inputBuffer = new Http11InputBuffer(request, protocol.getMaxHttpHeaderSize(),
protocol.getRejectIllegalHeaderName(), httpParser);
request.setInputBuffer(inputBuffer);
outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
response.setOutputBuffer(outputBuffer);
// Create and add the identity filters.
inputBuffer.addFilter(new IdentityInputFilter(protocol.getMaxSwallowSize()));
outputBuffer.addFilter(new IdentityOutputFilter());
// Create and add the chunked filters.
inputBuffer.addFilter(new ChunkedInputFilter(protocol.getMaxTrailerSize(),
protocol.getAllowedTrailerHeadersInternal(), protocol.getMaxExtensionSize(),
protocol.getMaxSwallowSize()));
outputBuffer.addFilter(new ChunkedOutputFilter());
// Create and add the void filters.
inputBuffer.addFilter(new VoidInputFilter());
outputBuffer.addFilter(new VoidOutputFilter());
// Create and add buffered input filter
inputBuffer.addFilter(new BufferedInputFilter());
// Create and add the chunked filters.
//inputBuffer.addFilter(new GzipInputFilter());
outputBuffer.addFilter(new GzipOutputFilter());
pluggableFilterIndex = inputBuffer.getFilters().length;
}
public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
setSocketWrapper(socketWrapper);
inputBuffer.init(socketWrapper);
outputBuffer.init(socketWrapper);
keepAlive = true;
openSocket = false;
readComplete = true;
boolean keptAlive = false;
SendfileState sendfileState = SendfileState.DONE;
while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null && sendfileState == SendfileState.DONE && !protocol.isPaused()) {
// Parsing the request header
try {
if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(), protocol.getKeepAliveTimeout())) {
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
} else if (handleIncompleteRequestLineRead()) {
break;
}
}
if (protocol.isPaused()) {
// 503 - Service unavailable
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
if (!inputBuffer.parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!protocol.getDisableUploadTimeout()) {
socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
}
}
} catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.header.parse"), e);
}
setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
break;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
UserDataHelper.Mode logMode = userDataHelper.getNextMode();
if (logMode != null) {
String message = sm.getString("http11processor.header.parse");
switch (logMode) {
case INFO_THEN_DEBUG:
message += sm.getString("http11processor.fallToDebug");
case INFO:
log.info(message, t);
break;
case DEBUG:
log.debug(message, t);
}
}
// 400 - Bad Request
response.setStatus(400);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}
// Has an upgrade been requested?
Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
boolean foundUpgrade = false;
while (connectionValues.hasMoreElements() && !foundUpgrade) {
foundUpgrade = connectionValues.nextElement().toLowerCase(
Locale.ENGLISH).contains("upgrade");
}
if (foundUpgrade) {
// Check the protocol
String requestedProtocol = request.getHeader("Upgrade");
UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
if (upgradeProtocol != null) {
if (upgradeProtocol.accept(request)) {
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
response.setHeader("Connection", "Upgrade");
response.setHeader("Upgrade", requestedProtocol);
action(ActionCode.CLOSE, null);
getAdapter().log(request, response, 0);
InternalHttpUpgradeHandler upgradeHandler =
upgradeProtocol.getInternalUpgradeHandler(
socketWrapper, getAdapter(), cloneRequest(request));
UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
action(ActionCode.UPGRADE, upgradeToken);
return SocketState.UPGRADING;
}
}
}
if (getErrorState().isIoAllowed()) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
prepareRequest();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}
}
int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
if (maxKeepAliveRequests == 1) {
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}
// Process the request in the adapter
if (getErrorState().isIoAllowed()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
getAdapter().service(request, response);
// Handle when the response was committed before a serious
// error occurred. Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if(keepAlive && !getErrorState().isError() && !isAsync() && statusDropsConnection(response.getStatus())) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
} catch (InterruptedIOException e) {
setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
} catch (HeadersTooLargeException e) {
log.error(sm.getString("http11processor.request.process"), e);
// The response should not have been committed but check it
// anyway to be safe
if (response.isCommitted()) {
setErrorState(ErrorState.CLOSE_NOW, e);
} else {
response.reset();
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, e);
response.setHeader("Connection", "close"); // TODO: Remove
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("http11processor.request.process"), t);
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
// Finish the handling of the request
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
if (!isAsync()) {
// If this is an async request then the request ends when it has
// been completed. The AsyncContext is responsible for calling
// endRequest() in that case.
endRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
// If there was an error, make sure the request is counted as
// and error, and update the statistics counter
if (getErrorState().isError()) {
response.setStatus(500);
}
if (!isAsync() || getErrorState().isError()) {
request.updateCounters();
if (getErrorState().isIoAllowed()) {
inputBuffer.nextRequest();
outputBuffer.nextRequest();
}
}
if (!protocol.getDisableUploadTimeout()) {
int connectionTimeout = protocol.getConnectionTimeout();
if(connectionTimeout > 0) {
socketWrapper.setReadTimeout(connectionTimeout);
} else {
socketWrapper.setReadTimeout(0);
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
sendfileState = processSendfile(socketWrapper);
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
if (getErrorState().isError() || protocol.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync()) {
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else {
if (sendfileState == SendfileState.PENDING) {
return SocketState.SENDFILE;
} else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
} else {
return SocketState.LONG;
}
} else {
return SocketState.CLOSED;
}
}
}
}
- Http11Processor 是 AbstractProcessor 的实现子类,构造函数会创建以前文章中介绍的 TomcatRequest/TomcatResponse/Http11InputBuffer/Http11OutBuffer 对象,设置它们的关联关系,以及关联 CoyoteAdapter 对象。
- Http11Processor 的核心方法是 service 方法,整体代码比较长,我们着重分析重点。
- Service 方法会去初始化以前文章中介绍的 Http11InputBuffer 和 Http11OutputBuffer 对象实例,用来解析请求行,请求头,写入数据等等。
- 首先会去利用 Http11InputBuffer 对象实例的 parseRequestLine() 方法解析请求行,如果请求行没有解析完(例如client没有发完数据),那么返回 SocketState.LONG 的状态。根据上一篇文章, ConnectionHanlder 如果发现返回 LONG 状态,会对 socket 包装对象去注册 OP_READ 事件,并添加到 poller 线程的事件队列里,让 poller 线程继续监听 client 端可读事件发送,从而等待 client 继续发送数据。同时并不会移除原始 socket 和处理类 Http11Processor 的关联关系,也不去回收 Http11Processor 实例,以便保持现有状态(已经解析的数据),当 client 再次发送数据的时候可以继续处理。
- 再利用 Http11InputBuffer.parseHeaders() 方法解析请求头,如果请求头没有没有解析完(client没有发完数据),则处理方式和上一步解析请求行一样。
- 该方法中会有一些协议 upgrade 的处理(例如websocket),我们不在这里详细展开。
- 当请求头和请求行完全解析完毕时,会调用 CoyoteAdapter.service() 方法,该方法会通过 servlet container 调用标准 servlet API 。
- Servlet API 正常调用完毕,对于非异步请求回去调用 endRequest() 方法表示结束。在其内部用 Http11InputBuffer.endRequest() 结束请求,用 Http11OutputBuffer.end() 将剩余 response 数据发送到 client 端。
- 同时对于非异步模式下的 servlet 请求,还会去调用 Http11InputBuffer.nextRequest() 方法和 Http11OutputBuffer.nextRequest() 方法来回收两个实例,以便后续重用,可以提高效率。
- 对于非异步请求正常结束后,返回的 socket 状态是 SocketState.OPEN 。根据上一篇文章介绍的 ConnectionHandler 对象, OPEN 则表示该连接为长连接,不关闭原始 socket 。所以在关联的 Map 中移除 socket 和 Http11Processor 的对应关系,释放当前 Http11Processor 实例以便后续重用。由于是长连接,所以和异步处理方式一样,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程事件队列中,让 poller 线程继续去监听 client 端可读事件,从而结束当前请求,为下一个请求做准备。
CoyoteAdapter的核心代码逻辑如下:
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
if (request == null) {
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);
request.setResponse(response);
response.setRequest(request);
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);
req.getParameters().setQueryStringCharset(connector.getURICharset());
}
if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}
boolean async = false;
boolean postParseSuccess = false;
req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
try {
// Parse and set Catalina and configuration specific
// request parameters
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
if (request.isAsync()) {
async = true;
ReadListener readListener = req.getReadListener();
if (readListener != null && request.isFinished()) {
// Possible the all data may have been read during service()
// method so this needs to be checked here
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
if (req.sendAllDataReadEvent()) {
req.getReadListener().onAllDataRead();
}
} finally {
request.getContext().unbind(false, oldCL);
}
}
Throwable throwable =
(Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
// If an async request was started, is not going to end once
// this container thread finishes and an error occurred, trigger
// the async error process
if (!request.isAsyncCompleting() && throwable != null) {
request.getAsyncContextInternal().setErrorState(throwable, true);
}
} else {
request.finishRequest();
response.finishResponse();
}
} catch (IOException e) {
// Ignore
} finally {
AtomicBoolean error = new AtomicBoolean(false);
res.action(ActionCode.IS_ERROR, error);
if (request.isAsyncCompleting() && error.get()) {
// Connection will be forcibly closed which will prevent
// completion happening at the usual point. Need to trigger
// call to onComplete() here.
res.action(ActionCode.ASYNC_POST_PROCESS, null);
async = false;
}
// Access log
if (!async && postParseSuccess) {
// Log only if processing was invoked.
// If postParseRequest() failed, it has already logged it.
Context context = request.getContext();
Host host = request.getHost();
// If the context is null, it is likely that the endpoint was
// shutdown, this connection closed and the request recycled in
// a different thread. That thread will have updated the access
// log so it is OK not to update the access log here in that
// case.
// The other possibility is that an error occurred early in
// processing and the request could not be mapped to a Context.
// Log via the host or engine in that case.
long time = System.currentTimeMillis() - req.getStartTime();
if (context != null) {
context.logAccess(request, response, time, false);
} else if (response.isError()) {
if (host != null) {
host.logAccess(request, response, time, false);
} else {
connector.getService().getContainer().logAccess(
request, response, time, false);
}
}
}
req.getRequestProcessor().setWorkerThreadName(null);
// Recycle the wrapper request and response
if (!async) {
updateWrapperErrorCount(request, response);
request.recycle();
response.recycle();
}
}
}
- CoyoteAdapter 的核心方法是 service 方法,整体代码比较长,我们着重分析重点。
- 该方法会去用 tomcat 的 request 和 response 创建 servlet 的标准 request 和 response ,并设置其关联关系,即把 tomcat request 关联到 servlet request ,把 tomcat response 关联到 servlet response 。
- 通过 servlet container 调用标准 servlet API,connector.getService().getContainer().getPipeline().getFirst().invoke(request, response)
- 如果不是异步请求,完成servlet API后,通过HttpServletRequest.finishRequest() 方法调用和HttpServletResponse.finishResponse() 方法调用结束当前请求和响应。
- 最后通过 HttpServletRequest.recycle() 调用和 HttpServletResponse.recycle() 调用来回收请求和响应,以便后面可以重用提高效率。
目前先写到这里,下一篇文章里我们继续介绍 tomcat io 线程中的读写。
- Golang 值得注意的地方
- MySQL数值类型在binlog中需要注意的细节(r12笔记第69天)
- WordPress评论滑动/拉链解锁myQaptcha修改为自动提交的方法
- MySQL root用户登录的几个小问题(r12笔记第67天)
- Java实现生产者消费者的两种方式(r12笔记第66天)
- Golang语言的函数调用信息
- mysqldump的一点使用总结(r12笔记第81天)
- 转-Golang语言Interface漫谈
- WordPress导航菜单图标字体插件font awesome 4 menus纯代码版
- Oracle 12c远程克隆PDB的问题及修复(r12笔记第78天)
- Oracle表中含有255列以上时需要注意的(r12笔记第77天)
- Golang语言--资源自动回收技术
- Oracle 12.2中的一个参数说明(r12笔记第76天)
- Golang语言社区--【游戏服务器知识】多线程并发
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- python pyecharts数据可视化 玫瑰图、柱形图、饼图、环图
- Python opencv图像处理基础总结(三) 图像直方图 直方图应用 直方图反向投影
- Python opencv图像处理基础总结(四) 模板匹配 图像二值化
- python pyecharts数据可视化 词云图 仪表盘 水球图
- python jupyter notebook配置 更改默认工作目录 更换皮肤主题 代码字体 大小
- 关于直播卖货系统平台在微信浏览器中音视频播放的问题
- python爬虫 scrapy爬虫框架的基本使用
- Python opencv图像处理基础总结(五) 图像金字塔 图像梯度 Canny算法边缘提取
- python scrapy爬虫练习(1) 爬取豆瓣电影top250信息
- python爬虫 senlenium爬取拉勾网招聘数据
- Python opencv图像处理基础总结(六) 直线检测 圆检测 轮廓发现
- 简单又强大的pandas爬虫 利用pandas库的read_html()方法爬取网页表格型数据
- python pyecharts数据可视化 折线图 箱形图
- Python爬虫 selenium自动化 利用搜狗搜索爬取微信公众号文章信息
- python 办公自动化系列 (1) 从22053条数据中统计断网次数并计算平均断网时间