ThreadPoolExecutor源码学习
我们之前温习了Thread类,明白了Runable接口才是多线程的任务核心。那么ThreadPoolExecutor就是用维护多线程的。作为工具类,ThreadPoolExecutor应该提供了很多操作线程的方法,按理说也是逐个去调用目标线程的方法。那么我们就详细了解一下ThreadPoolExecutor的实现过程吧。我们发现ThreadPoolExecutor类继承了AbstractExecutorService。而AbstractExecutorService实现了ExecutorService,ExecutorService继承了Executor,Executor主要提供execute方法。应该和真正的线程start方法挂钩。在AbstractExecutorService方法中实现了ExecutorService的接口。ExecutorService主要包含了线程的提交和线程的中断等方法。
作为线程管理的工具,那么ThreadPoolExecutor就是专门维护线程运行的,那么线程的容器也必然在这个类中,也就是我们新建的线程会提交到这个类中,然后通过这个类会将提交的Runnable任务按照他的安排进行执行。我们看到在ThreadPoolExecutor中有一个 private final HashSet<Worker> workers = new HashSet<Worker>();其中的Worker就是用来存储Runnable任务的工作任务。
Worker(Runnable firstTask) {
// inhibit interrupts until runWorker
setState(-1);
this.firstTask = firstTask;
//创建线程,并把任务提交
this.thread = getThreadFactory().newThread(this);
}
按照上边的逻辑,那么worker就是我们提交的任务的代理,也就是调用了start方法。然后调用runwroker的方法。而start方法是在addWorker中调用的
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
线程状态
int rs = runStateOf(c);
检测
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
自旋
for (;;) {
任务数目
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
对工作线程数添加1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
添加一个新的工作任务线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
加锁,所有创建线程的时候都加锁,防止创建多个相同的线程
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
加锁之后二次检测
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
加入到工作线程队列,然后通过对workers数组的操作就可以操作该线程。相当于记录一下线程信息
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
调用真正的线程
inal void runWorker(Worker w) {
获取当前线程
Thread wt = Thread.currentThread();
当前任务
Runnable task = w.firstTask;
移除
w.firstTask = null;
解除锁
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
这个while会一直执行,把队列中的任务都执行了。由于runWorker是另外一个线程。所以这个线程会抢着处理任务队列。指导任务处理完毕
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())
wt.interrupt();
try {
执行任务之前
beforeExecute(wt, task);
Throwable thrown = null;
try {
执行任务,正真的run方法内容。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
执行任务之后
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
后置处理
processWorkerExit(w, completedAbruptly);
}
}
获取任务队列
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
工作线程数量
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
当线程和合理范围内就获取任务
取出一个任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
后置处理
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
加锁,防止多线程操作创建多个核心线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
将当前线程停止
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
创建新的工作线程
addWorker(null, false);
}
}
经过上述源码分析,我们大概明白了ThreadPoolExecutor类的工作机理。尤其是工作线程worker使用代理的方式。以及在while方法在工作线程中的使用使得线程能够处理足够的任务。在线程处理的任务结束,没有任务可以处理也就是任务队列空的时候就线程自然销毁,如果核心线程不需要销毁就创建新线程。但是当没有任务的时候。核心线程其实是自旋的,所以感觉还是有消耗的。因此对于没有必要使用多线程的地方还是不要使用多线程了。当然通过分析我们发现ThreadPoolExecutor并没有在初始化的时候就创建核心线程,而是在逐步添加任务的时候创建的。根据我的分析和理解,我可能会尽可能少的设置核心线程。或者建议开启allowCoreThreadTimeOut,减少不必要的系统消耗。
- Struts2 S2-046, S2-045 Firewall(漏洞防火墙)
- 应用程序的通信成本
- Spring cloud 之 Feign Client
- Spring Cloud Netflix
- 重新整理AUTO_INCREMENT字段
- Spring Cloud Config
- Spring boot with Schedule (启用/禁用)
- DevOps Tools
- Docker 虚拟机之 Redis
- Spring boot with HTTPS SSL
- Spring boot with Git version
- Spring boot with Elasticsearch 5.5.1
- 怎样将 MySQL 数据表导入到 Elasticsearch
- Spring data 数据库建表(一对一,一对多,多对多)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- django-常见问题勘误
- 【tensorflow2.0】AutoGraph和tf.Module
- mybatis之第一个mybatis程序(二)
- mybatis-spring整合的三种(逐渐优化)方案
- 用C++跟你聊聊“命令模式”,跟我聊就够了
- 【tensorflow2.0】数据管道dataset
- 用C++跟你聊聊“桥接模式”
- 用C++跟你聊聊“单例模式”,类的“计划生育”
- django-HttpResponse,render,redirect
- django-ForeignKey,OneToOneField,ManyToManyField
- 用C++跟你聊聊“备忘录模式” ,如果能重来,我要···
- 数据库(一)--通过django创建数据库表并填充数据
- springmvc之异常处理中ExceptionHanderExceptionResolver
- 【tensorflow2.0】损失函数losses
- 如何在python文件中测试sql语句