tomcat对AQS的扩展:使用LimitLatch控制连接数
LimitLatch是一个共享性质的锁,这里的共享概念来自于AQS,指的是不同的线程可以同时获取该锁。本文开始之前,首先我要纠正之前的文章《面试官:谈一谈java中基于AQS的并发锁原理》的一个错误,LimitLatch并不是JDK实现的,而是tomcat实现的。
LimitLatch简介
jdk中对AQS的扩展有一个CountDownLatch,Latch是一个阀门的意思,CountDownLatch创建了一个阀门,之后阻塞,等待所有线程都执行结束并且countdown之后,才会继续执行。而本文要介绍的LimitLatch则更像是java中的Semaphore,用于控制资源的使用。
下面我们看一下LimitLatch中锁的实现:
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
@Override
protected int tryAcquireShared(int ignored) {
long newCount = count.incrementAndGet();//定义了AtomicLong类型的count数量,每次获取锁之后会加1
if (!released && newCount > limit) {//是否超过limit的限制
// Limit exceeded
count.decrementAndGet();//获取失败后减1
return -1;//返回-1代表获取锁失败,这是就只能进入队列了
} else {
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();//释放锁的时候count数量减1
return true;
}
}
从上面的代码中我们可以看到,LimitLatch首先定义了一个limit,每次获取锁时都会累计获取成功的线程数量,如果大于limit,则获取成功等待入队,释放锁的时候线程数量会减1。
下面我将以tomcat的NIO2模式为例,看一下tomcat是如何使用LimitLatch来控制连接数的。
tomcat初始化
tomcat的Nio2EndPoint启动的时候,会创建LimitLatch,而LimitLatch中的limit,正是我们tomcat中配置的最大连接数。代码如下:
@Override
public void startInternal() throws Exception {
if (!running) {
allClosed = false;
running = true;
paused = false;
//省略部分代码
initializeConnectionLatch();//初始化LimitLatch
startAcceptorThread();
}
}
protected LimitLatch initializeConnectionLatch() {
if (maxConnections==-1) return null;
if (connectionLimitLatch==null) {
connectionLimitLatch = new LimitLatch(getMaxConnections());//根据配置的最大连接数初始化LimitLatch
}
return connectionLimitLatch;
}
public LimitLatch(long limit) {
this.limit = limit;
this.count = new AtomicLong(0);
this.sync = new Sync();
}
获取和释放连接
LimitLatch初始化后,就可以对连接的获取和释放进行管理了。下面我们看一下Nio2Endpoint中的内部类Nio2Acceptor
protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel> implements CompletionHandler<AsynchronousSocketChannel, Void> {
protected int errorDelay = 0;
public Nio2Acceptor(AbstractEndpoint<?, AsynchronousSocketChannel> endpoint) {
super(endpoint);
}
@Override
public void run() {
// The initial accept will be called in a separate utility thread
if (!isPaused()) {
try {
countUpOrAwaitConnection();//已经达到了最大的连接数,则入队等待通知
} catch (InterruptedException e) {
// Ignore
}
//省略部分代码
} else {
state = AcceptorState.PAUSED;
}
}
@Override
public void completed(AsynchronousSocketChannel socket,
Void attachment) {
// Successful accept, reset the error delay
errorDelay = 0;
// Continue processing the socket on the current thread
// Configure the socket
if (isRunning() && !isPaused()) {
if (getMaxConnections() == -1) {
serverSock.accept(null, this);
} else {
// Accept again on a new thread since countUpOrAwaitConnection may block
getExecutor().execute(this);
}
if (!setSocketOptions(socket)) {//处理socket失败,关闭
closeSocket(socket);
}
} else {
if (isRunning()) {
state = AcceptorState.PAUSED;
}
destroySocket(socket);//调用closeSocket,关闭socket
}
}
@Override
public void failed(Throwable t, Void attachment) {
if (isRunning()) {
if (!isPaused()) {
if (getMaxConnections() == -1) {
serverSock.accept(null, this);
} else {
// Accept again on a new thread since countUpOrAwaitConnection may block
getExecutor().execute(this);
}
} else {
state = AcceptorState.PAUSED;
}
// We didn't get a socket
countDownConnection();
//省略部分代码
} else {
// We didn't get a socket
countDownConnection();
}
}
}
上面代码中,有几点说明:
1.如果连接数已经达到最大连接,则会调用countUpOrAwaitConnection方法,代码如下:
protected void countUpOrAwaitConnection() throws InterruptedException {
if (maxConnections==-1) return;
LimitLatch latch = connectionLimitLatch;
if (latch!=null) latch.countUpOrAwait();//入队等待
}
可以看到,达到最大连接数之后,就会入队等待
2.连接初始化失败,会调用countDownConnection方法,而连接处理结束后会调用closeSocket(destroySocket也调用closeSocket),最终调用countDownConnection,代码如下:
protected long countDownConnection() {
if (maxConnections==-1) return -1;
LimitLatch latch = connectionLimitLatch;
if (latch!=null) {
long result = latch.countDown();//最终调用LimitLatch的countDown方法,见下面代码
if (result<0) {
getLog().warn(sm.getString("endpoint.warn.incorrectConnectionCount"));
}
return result;
} else return -1;
}
public long countDown() {
sync.releaseShared(0);//调用AQS中的releaseShared释放锁
long result = getCount();
if (log.isDebugEnabled()) {
log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result);
}
return result;//返回count数量
}
public final boolean releaseShared(int arg) {//AQS中的代码
if (tryReleaseShared(arg)) {//见文中开头的LimitLatch的中的锁代码
doReleaseShared();
return true;
}
return false;
}
可见,socket获取失败或者处理结束后,都会调用LimitLatch中的释放锁流程。
总结
LimitLatch的使用跟Semaphore有点类似,像是一个限流器,tomcat使用它进行了最大连接数的控制,看了这篇文章,是不是对tomcat的参数server.tomcat.max-threads参数的使用原理有了一定了解呢?
- 关于 hadoop reduce 阶段遍历 Iterable 的 2 个“坑”
- Hadoop Mapper 阶段将数据直接从 HDFS 导入 Hbase
- 译文 | 量化投资教程:投资组合优化与R实践
- 浅谈 java 中构建可执行 jar 包的几种方式
- python 日志模块 logging 详解
- 基于堆实现的优先级队列:PriorityQueue 解决 Top K 问题
- explain 深入剖析 MySQL 索引及其性能优化指南
- 图文并茂详解 SQL JOIN
- 自定义 hadoop MapReduce InputFormat 切分输入文件
- Hadoop MapReduce 二次排序原理及其应用
- MySQL Tips【Updating】
- Meltdown、Spectre攻击---CPU乱序执行和预测执行导致的安全问题
- WordPress 4.6远程代码执行漏洞(CVE-2016-10033)复现环境搭建指南
- 相似文档查找算法之 simHash 简介及其 java 实现
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 二叉树最小深度
- 一日一技:限定Python函数只能被特定函数调用
- 四种ABAP单元测试隔离(test isolation)技术
- Python使用对象方式获取字典的值
- Hive整合HBase实现数据同步
- [数据结构与算法] 盘点工作中常用的算法
- MyBatis_resultMap 的关联方式实现多表查询(多对一)
- MyBatis_resultMap的N+1方式实现多表查询(多对 一)
- LeetCode 63. 不同路径 II
- 那些年遇到的刁钻JavaScript面试题(可防踩坑)
- JWT登录鉴权操作笔记 原
- c/c++补完计划(二-改): c字符串复制
- 来个鹅厂C语言面试题试试手?
- -1大于1,-1乘3不等于-3,C语言这个规则你必须得会!
- SQL注入攻击之sqlmap