JDK1.8源码分析:Executors线程池创建工厂的实现原理
概述
- 在Executor线程执行器框架中,提供了Executors这个工具类来创建指定的Executor实现类。在Executors工具类中提供了ThreadPoolExecutor创建时所需的默认参数,通过方法名称来表明指定的实现,从而简化了ThreadPoolExecutor线程池的创建。
固定线程池,无界队列
-
线程池的核心线程数和最大线程数固定且相同,任务等待队列无界:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // 指定线程创建工厂,如自定义线程名称,方便问题排查 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
-
线程池只有一个线程,任务等待队列无界:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // 指定线程创建工厂,如自定义线程名称,方便问题排查 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
无界线程池
-
线程池无界,corePoolSize为0,maxiumPoolSize为Integer.MAX_VALUE,keepAliveTime为60秒,等待队列使用同步队列SynchronousQueue,该队列内部不存放数据,每次追加一个元素,如果存在线程等待获取,则交给该等待线程,否则对于非阻塞版本则直接返回false,对于阻塞版本则阻塞等待,在ThreadPoolExecutor中使用的是offer非阻塞版本,这里使用该队列主要作用是实现线程池线程的复用,即空闲线程即为等待线程,实现Cached的语义。
-
这个版本通常用于任务实时性要求高,CPU资源充足的场景。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
所以这个实现的特点为:结合ThreadPoolExecutor的execute实现原理,具体可以先看我的另外一篇文章:JDK1.8源码分析:Executor和ThreadPoolExecutor线程池的设计和源码实现
。对于每提交一个新任务:- 由于corePoolSize为0,故先放入等待队列,由于使用的是同步队列SynchronousQueue且使用offer非阻塞追加到队列,故如果此时线程池存在空闲线程,则交给其中一个空闲线程执行,否则追加队列失败;
- 追加队列失败,则如果当前线程池数量没有超过maxiumPoolSize,则创建一个新工作线程来执行这个任务。
- 由于keepAliveTime为60秒,corePoolSize为0,所以如果某个工作线程超过60秒没有处理任务,则被销毁回收。
周期性任务执行线程池
-
周期性任务执行线程池主要通过ScheduledThreadPoolExecutor来实现,构造函数如下,通常可以指定corePoolSize,而maximumPoolSize通常为Integer.MAX_VALUE。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
-
其中使用的队列为DelayedWorkQueue,为无界队列。该队列内部使用了一个初始容量为16的数组,之后每次拓容为原来的1.5倍,最大容量为Integer.MAX_VALUE。
Executors提供的周期性线程池创建方法
单线程版本
-
即corePoolSize为1,新提交的任务放到队列中,直到队列满了(由于是无界队列,通常不会满),则创建新线程。
// 周期性任务执行线程池 // 线程数为1的调度线程池,如果该线程挂了,则会创建一个新的线程继续执行接下来的任务 public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); }
-
注意周期性任务的每次执行可以在同一个线程,也可以在不同的线程,但是周期性频率是保持有序的。如果某次执行任务抛了异常,则之后该任务不会再继续执行,所以一般需要在任务中进行异常捕获,否则会悄悄停止执行。如果是工作线程自身问题线程挂掉,则会新建一个工作线程继续执行任务。关于周期性任务执行如果出现异常。
指定线程版本
-
指定corePoolSize的数量,通常用于一个周期性线程池需要执行多个周期性任务的场景。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
- 碎片化 | 第四阶段-37-sturts2-登录功能实现-视频
- Spring Cloud Edgware新特性之六:Artifact ID变更
- 用 RNN 训练语言模型生成文本
- 碎片化 | 第四阶段-38-Struts2登录session对象封装-视频
- LeetCode实战:子问题分析
- 5分钟构建一个自己的无人驾驶车
- 碎片化 | 第四阶段-39-Struts2中session对象梳理-视频
- 用深度神经网络处理NER命名实体识别问题
- 碎片化 | 第四阶段-40-Struts组件分类讲解-视频
- nginx location配置
- 碎片化 | 第四阶段-41-struts2字节流生成验证码-视频
- Python|模块,包,标准模板
- 人脸识别应用之“变脸”
- 碎片化 | 第四阶段-32-Struts2列表展示-视频
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法