ThreadPoolExecutor详解
ThreadPoolExecutor详解
一. 概述
ThreadPoolExecutor是JDK提供的线程池的基类,其中定义了线程池的核心框架,并且允许客户端通过继承的方式实现自定义的线程池。JDK提供默认的几种线程池都继承了ThreadPoolExecutor类,因此有必要对ThreadPoolExecutor进行详细的分析。
二. 核心参数
ThreadPoolExecutor主要有以下几个参数:
- corePoolSize:核心线程数。线程池初始化时,内部默认是没有可用线程的。当向线程池中提交一个新任务时,如果当前线程池中线程数小于corePoolSize,则会创建新的线程执行该任务。
- workQueue:任务队列。如果当前线程池中的线程数已经达到corePoolSize,此时再提交新任务,则会将任务放到workQueue中保存,等待后续空闲线程去执行。workQueue是JDK中BlockingQueue的实现类,可以使用ArrayBlockingQueue有界队列、LinkedBlockingQueue无界队列或其他类型的阻塞队列,这里不再展开。
- maximumPoolSize:最大线程数。当workQueue已被任务填满(有界队列的情况下),此时如果再向线程池提交新任务,则线程池会创建新的线程,但是线程池中线程总数不得超过maximumPoolSize。也即maximumPoolSize是线程数量的上限。
- RejectedExecutionHandler:拒绝策略。当线程池中线程数量已达到maximumPoolSize上限时,若再提交任务,则线程池不会再处理,而是直接,按照RejectedExecutionHandler定义的策略拒绝执行。JDK默认提供了几种拒绝策略:
- AbortPolicy:直接抛出异常
- CallerRunsPolicy:使用调用者所在的线程来执行
- DiscardPolicy:直接丢弃不处理
- DiscardOldestPolicy:丢弃任务队列中存在时间最长的任务,并执行当前任务。
- keepAliveTime:空闲线程存活时间。即线程池中的线程在空闲后,可以存活的最大时间。
三. 执行逻辑
上述的线程池参数,在一定程度上也说明了线程池执行任务的逻辑。下面进行总结:
- 如果当前线程池中运行的线程数小于corePoolSize,则创建新的线程执行任务,否则执行步骤2;
- 如果当前任务队列未满,则将任务加入到任务队列,否则执行步骤3;
- 如果线程数小于maximumPoolSize,则创建新的线程执行,否则执行拒绝策略。
四. 源码分析
ThreadPoolExecutor有一个内部类Worker,它是对Runnable进行了封装,主要功能是对待执行的任务进行中断处理和状态监控。此外,Worker还继承了AQS,在每个任务执行时进行了加锁的处理。可以将Worker简单理解为可中断的、可进行锁处理的Runnable,源码如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//当前Worker所处于的线程
final Thread thread;
//待执行的任务
Runnable firstTask;
//任务计数器
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
向线程池中提交任务的核心方法为execute(),下面进行详细分析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//通过位运算,获取当前线程数,如果线程数小于corePoolSize,则执行addWorker(),创建新线程执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
源码中的英文注释我没有删除,可以清楚地看出线程池执行任务的逻辑与前文所述一致。
首先,通过ctl这个原子变量。获取当前线程池内的线程数:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl是ThreadPoolExecutor内部一个用来进行技术和状态控制的控制变量,它使用了一个原子整形字段来实现两个方面的管理:
- 记录当前有效线程的数量;
- 记录当前线程池的状态,如允许、关闭等。
由于使用了一个整形(32字节)来表示两个状态,因此JDK进行了特殊的处理,只使用30个字节来表示线程数量,剩余2个字节表示状态,这就表示ThreadPoolExecutor最多可以创建(2^29)-1个线程,这个数量理论上足够用了。
回到execute方法,通过位运算,获取当前线程数,如果线程数小于corePoolSize,则执行addWorker(),创建新线程执行,addWorker方法如下:
private boolean addWorker(Runnable firstTask, boolean core) {
//自旋,判断线程池状态,并对线程数量执行原子+1操作
retry:
for (;;) {
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
//如果线程池已经关闭,则直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//判断线程数是否已达上限,根据传入参数core的不同,判断corePoolSize或者maximumPoolSize。
//如果线程数已达上限,直接返回false
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//执行原子操作,对线程数+1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
//ctl变量操作成功,执行Worker相关逻辑
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个新的Worker,传入待执行的任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//加锁后,再次判断线程池状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果创建了新的Worker,则调用其start方法立即执行
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker()方法的在首先在一个自旋中进行CAS操作,判断线程池状态和线程数,并尝试将线程数+1。如果操作失败直接返回false。CAS操作成功后,创建一个新的Worker,并立即调用其start()方法执行该任务。
Worker执行任务的方法为runWorker():
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//首先释放锁,允许中断
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
//执行任务前先加锁
w.lock();
//如果线程池已经终止,则中断该线程。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
//当前Worker计数器+1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker()的主要逻辑就是进行线程池的关闭检查,然后执行任务,并将计数器+1。
值得注意的是,Worker的firstTask可能为空,此时Worker并不会执行自己的任务,而是调用getTask()方法从任务队列中拉取任务执行。也就是说,Worker在执行完提交给自己的任务后,会执行任务队列中的任务。
至此,execute()方法的第一个分支执行完毕,即线程数量少于corePoolSize的情况。后面的分支逻辑也并不复杂:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//通过位运算,获取当前线程数,如果线程数小于corePoolSize,则执行addWorker(),创建新线程执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//针对线程数超过corePoolSize的情况,将任务放入workQueue中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果workQueue以满,则以maximumPoolSize为上限尝试创建新的线程
else if (!addWorker(command, false))
//如果线程数已达maximumPoolSize,则执行拒绝策略
reject(command);
}
- Python实现守护进程
- 初探Anaconda——最省心的Python版本和第三方库管理
- Linux环境下JDK/Eclipse一键安装脚本
- (31) 剖析Arrays / 计算机程序的思维逻辑
- 应用自然语言处理(NLP)解码电影
- 不引入新的数组,实现数组元素交换位置函数
- (30) 剖析StringBuilder / 计算机程序的思维逻辑
- Java初始化顺序
- ConcurrentHashMap使用示例
- (40) 剖析HashMap / 计算机程序的思维逻辑
- nginx配置https(亲测可用)
- linux中无 conio.h的解决办法
- 运用适配器模式应对项目中的变化
- 开车啦!小爬虫抓取今日头条街拍美女图
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 常用SQL语句
- Java中的时间和日期(三):java8中新的时间API介绍
- Java中的时间和日期(四):与java8时间API有关的一些总结和补充
- Head First设计模式——策略模式
- 可重用性的6个级别
- 您可能不需要使用Vue 3的Vuex
- 谈一谈若干的K-V NoSQL应用:LevelDB、Redis、Tair、RockesDB
- 高通量数据中批次效应的鉴定和处理(六)- 直接校正表达矩阵
- Go viper 配置文件读取工具
- spring security默认访问权限判定源码
- Springboot+mybatis最简单的增删改查写法
- Head First设计模式——观察者模式
- spring security oauth2 资源服务/客户端无法正确获取权限
- EXTJS grid.column.renderer绑定失效
- Head First设计模式——装饰者模式