ScheduledThreadPoolExecutor源码学习
ScheduledThreadPoolExecutor看样子也是ThreadPoolExecutor的一种,因为ThreadPoolExecutor也没有什么问题那么为什么还要有ScheduledThreadPoolExecutor?
从字面上理解的话ScheduledThreadPoolExecutor意思就是计划执行的线程执行器。那么我可以猜测一下ScheduledThreadPoolExecutor的实现机制,使用sleep吗?手动狗头哈哈,怀着迷惑的心情,我们一探究竟。
从图中可以看到ScheduledThreadPoolExecutor是继承的ThreadPoolExecutor的,而我们知道ThreadPoolExecutor是依靠的是worker线程。如果是计划线程,那么肯定是让工作线程按照规律去执行。既然工作线程在父类中,那么子类要修改父类的工作线程那么无疑就是重写了?带着疑问咋来看吧。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
以上的4个计划线程池和父类ThreadPoolExecutor并没有什么差别。唯一的区别是使用的是延迟队列来装载任务的。而我们之前学习延迟队列的时候我们就知道延迟队列是通过实现Delayed接口生成时间戳,然后延迟队列获取消息的时候总是获取的到期时间的消息。
而这里使用的延迟队列也不是我们之前学习的DelayQueue,而是自定义的延迟队列。而其中的内部类也正是延迟队列的定义。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
//转变为RunnableScheduledFuture
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
//扩容
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
//将任务设置到队列开头
setIndex(e, 0);
} else {
// 将任务设置到第i位
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
在添加任务到延迟队列的时候,调用的offer方法。
private void grow() {
int oldCapacity = queue.length;
//这里扩容的时候使用的是原来的容量+原来的1/2,扩容一半
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
//将队列拷贝过来,因为之前加锁了,所以这块是线程安全的
queue = Arrays.copyOf(queue, newCapacity);
}
在添加到延迟队列的时候。调用的shifUp方法,其中k代表队列的最后下标。key代表任务.
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
e为旧任务
RunnableScheduledFuture<?> e = queue[parent];
采用时间比较
if (key.compareTo(e) >= 0)
自己执行的时间是比较靠后的
break;
说明自己执行的时间是靠前的
queue[k] = e;
setIndex(e, k);
k = parent;
}
队列k的位置放上新任务key
queue[k] = key;
setIndex(key, k);
}
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
当前任务的执行时间与初步计算冲突任务执行时间的比
long diff = time - x.time;
如果当前任务执行的时间靠前返回-1
if (diff < 0)
return -1;
else if (diff > 0)
当前执行任务的时间靠后
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
通过上述代码的分析,我们知道延迟队列中的第一个任务就是即将要执行的。那么如果时间没到执行的时间又是如果处理的?我们还是来看看源码吧.
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//自旋
for (;;) {
//获取延迟队列第一个元素
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//如果时间到了执行的时间,就调用finishpoll
return finishPoll(first);
//否则重新来吧
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
可以看到这块的代码也挺暴力的,一个自旋而且还不是执行时间的话。就这么等着它到期了之后再执行。
通过上述分析,我们知道ScheduledFutureTask类是任务的容器。其中实现了Runnable接口,然后提供了时间戳作为延迟的基础。而DelayedWorkQueue实现了BlockingQueue的基本接口。并将ScheduledFutureTask作为容器模板存储添加的任务。按照到期时间来组织要执行的任务。
通过分析,我们发现这里的延迟队列的默认容量是16,扩容之后就是24。依次类推。
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
而DeyLayQueue的默认容量是16,扩容的时候
private void grow(int minCapacity) {
int oldCapacity = queue.length;
// 容量小于64的时候就进行2倍数扩容,小于64就50%扩容
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
// overflow-conscious code
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
queue = Arrays.copyOf(queue, newCapacity);
}
如果按照我们这种分析的话,那么计划任务线程池接下来基本和ThreadPoolExecutor没有什么差别了。唯一的差别就是任务队列了。通过源码我们发现有很多方法也是直接用用的父类。
在submit方法里
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
这里好奇的是delay都是0,解释一下就是我们在延迟任务也就是ScheduledFutureTask已经包含了执行时间戳等信息。也就是这里的task.
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
//执行延迟队列
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
任务添加到了任务队列,就开动机器了。
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
//启动工作线程了
addWorker(null, true);
else if (wc == 0)
//启动工作线程
addWorker(null, false);
}
通过以上分析,我们知道计划线程池执行器的底层是依靠延迟队列实现的。而延迟队列是通过继承接口delayed接口。除此之外延迟队列也采用自旋的方式来不断尝试到期的任务来执行。任务的执行也是在添加到任务队列之后启动worker线程进行处理的。
- Web 前端利器Emmet 的CSS 用法总结
- 响应式 HTML 邮件制作之三个实例
- java知识点归纳
- EaseMobile 主题导航菜单设置小图标的方法(图文+视频教程)
- NEC css规范
- DW Mobile Switcher:移动设备识别切换主题插件
- 揭秘技术大国以色列
- mix-blend-mode 混合模式 background-blend-mode 背景混合模式 isolation:isolate 隔离
- wp_nav_menu 函数经Walker_Nav_Menu 类自定义导航菜单HTML
- css3 RGBA 红色R+绿色G+蓝色B+Alpha通道
- css3 gradient 渐变
- TwentyTwenty:一个图片特效Jquery 插件
- 何为实在
- css3 动画应用 animations 和transtions transform在加上JavaScript 可以实现硬件加速动画。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 使用XtraBackup备份MySQL 8.0 Part 6 对数据库进行增量备份
- ArrayList源码阅读笔记
- 3分钟短文 | Laravel表单验证没规则可用?你试试自定义,真香!
- 【C#】DataGridView 数据绑定的一些细节
- 3分钟短文 | Laravel 查询结果检查是不是空,5个方法你别用错!
- 使用XtraBackup备份MySQL 8.0 Part 7 对增量备份进行恢复
- 3分钟短文 | Laravel 日志全程记录 SQL 查询语句,要改写底层?
- MySQL InnoDB表空间加密
- 微信小程序自动化测试最佳实践(附 Python 源码)
- 3分钟短文 | MySQL在分组时,把多列合并为一个字段!
- Redis Linux系统参数最佳配置
- 实现Promise其它API
- 使用sysbench进行压测 Part1 sysbench安装
- Java并发编程(07):Fork/Join框架机制详解
- PostgreSQL Pgbouncer 到底怎么使用,疗效有多大