使用 Executors,ThreadPoolExecutor,创建线程池,源码分析理解
之前创建线程的时候都是用的 newCachedThreadPoo
,newFixedThreadPool
,newScheduledThreadPool
,newSingleThreadExecutor
这四个方法。
当然 Executors
也是用不同的参数去 new ThreadPoolExecutor
实现的,本文先分析前四种线程创建方式,后在分析 new ThreadPoolExecutor
创建方式
使用 Executors 创建线程池
1.newFixedThreadPool()
由于使用了LinkedBlockingQueue
所以maximumPoolSize
没用,当corePoolSize
满了之后就加入到LinkedBlockingQueue
队列中。 每当某个线程执行完成之后就从LinkedBlockingQueue
队列中取一个。 所以这个是创建固定大小的线程池。
源码分析
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2.newSingleThreadPool()
创建线程数为1的线程池,由于使用了LinkedBlockingQueue
所以maximumPoolSize
没用,corePoolSize
为1表示线程数大小为1,满了就放入队列中,执行完了就从队列取一个。
源码分析
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
3.newCachedThreadPool()
创建可缓冲的线程池。没有大小限制。由于corePoolSize
为0所以任务会放入SynchronousQueue
队列中,SynchronousQueue
只能存放大小为1,所以会立刻新起线程,由于maxumumPoolSize
为Integer.MAX_VALUE
所以可以认为大小为2147483647
。受内存大小限制。
源码分析
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
使用 ThreadPoolExecutor 创建线程池
源码分析 ,ThreadPoolExecutor
的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造函数参数
1、corePoolSize
核心线程数大小,当线程数 < corePoolSize ,会创建线程执行 runnable
2、maximumPoolSize
最大线程数, 当线程数 >= corePoolSize的时候,会把 runnable 放入 workQueue中
3、keepAliveTime
保持存活时间,当线程数大于corePoolSize的空闲线程能保持的最大时间。
4、unit
时间单位
5、workQueue
保存任务的阻塞队列
6、threadFactory
创建线程的工厂
7、handler
拒绝策略
任务执行顺序
1、当线程数小于 corePoolSize
时,创建线程执行任务。
2、当线程数大于等于 corePoolSize
并且 workQueue
没有满时,放入workQueue
中
3、线程数大于等于 corePoolSize
并且当 workQueue
满时,新任务新建线程运行,线程总数要小于 maximumPoolSize
4、当线程总数等于 maximumPoolSize
并且 workQueue
满了的时候执行 handler
的 rejectedExecution
。也就是拒绝策略。
四个拒绝策略
ThreadPoolExecutor默认有四个拒绝策略:
1、ThreadPoolExecutor.AbortPolicy()
直接抛出异常RejectedExecutionException
2、ThreadPoolExecutor.CallerRunsPolicy()
直接调用run方法并且阻塞执行
3、ThreadPoolExecutor.DiscardPolicy()
直接丢弃后来的任务
4、ThreadPoolExecutor.DiscardOldestPolicy()
丢弃在队列中队首的任务
当然可以自己继承RejectedExecutionHandler来写拒绝策略.
TestThreadPoolExecutor 示例
TestThreadPoolExecutor.java
package io.ymq.thread.TestThreadPoolExecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 描述:
*
* @author yanpenglei
* @create 2017-10-12 15:39
**/
public class TestThreadPoolExecutor {
public static void main(String[] args) {
long currentTimeMillis = System.currentTimeMillis();
// 构造一个线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3)
);
for (int i = 1; i <= 10; i++) {
try {
String task = "task=" + i;
System.out.println("创建任务并提交到线程池中:" + task);
threadPool.execute(new ThreadPoolTask(task));
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
try {
//等待所有线程执行完毕当前任务。
threadPool.shutdown();
boolean loop = true;
do {
//等待所有线程执行完毕当前任务结束
loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒
} while (loop);
if (loop != true) {
System.out.println("所有线程执行完毕");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("耗时:" + (System.currentTimeMillis() - currentTimeMillis));
}
}
}
ThreadPoolTask.java
package io.ymq.thread.TestThreadPoolExecutor;
import java.io.Serializable;
/**
* 描述:
*
* @author yanpenglei
* @create 2017-10-12 15:40
**/
public class ThreadPoolTask implements Runnable, Serializable {
private Object attachData;
ThreadPoolTask(Object tasks) {
this.attachData = tasks;
}
public void run() {
try {
System.out.println("开始执行任务:" + attachData + "任务,使用的线程池,线程名称:" + Thread.currentThread().getName());
System.out.println();
} catch (Exception e) {
e.printStackTrace();
}
attachData = null;
}
}
遇到java.util.concurrent.RejectedExecutionException
第一
你的线程池 ThreadPoolExecutor
显示的 shutdown()
之后,再向线程池提交任务的时候。 如果你配置的拒绝策略是 AbortPolicy
的话,这个异常就会抛出来。
第二
当你设置的任务缓存队列过小的时候,或者说, 你的线程池里面所有的线程都在干活(线程数== maxPoolSize
),并且你的任务缓存队列也已经充满了等待的队列, 这个时候,你再向它提交任务,则会抛出这个异常。
响应
可以看到线程 pool-1-thread-1 到5 循环使用
创建任务并提交到线程池中:task=1
开始执行任务:task=1任务,使用的线程池,线程名称:pool-1-thread-1
创建任务并提交到线程池中:task=2
开始执行任务:task=2任务,使用的线程池,线程名称:pool-1-thread-2
创建任务并提交到线程池中:task=3
开始执行任务:task=3任务,使用的线程池,线程名称:pool-1-thread-3
创建任务并提交到线程池中:task=4
开始执行任务:task=4任务,使用的线程池,线程名称:pool-1-thread-4
创建任务并提交到线程池中:task=5
开始执行任务:task=5任务,使用的线程池,线程名称:pool-1-thread-5
创建任务并提交到线程池中:task=6
开始执行任务:task=6任务,使用的线程池,线程名称:pool-1-thread-1
创建任务并提交到线程池中:task=7
开始执行任务:task=7任务,使用的线程池,线程名称:pool-1-thread-2
创建任务并提交到线程池中:task=8
开始执行任务:task=8任务,使用的线程池,线程名称:pool-1-thread-3
创建任务并提交到线程池中:task=9
开始执行任务:task=9任务,使用的线程池,线程名称:pool-1-thread-4
创建任务并提交到线程池中:task=10
开始执行任务:task=10任务,使用的线程池,线程名称:pool-1-thread-5
所有线程执行完毕
耗时:1015
测试代码
github https://github.com/souyunku/ymq-example/tree/master/ymq-thread
Contact
- 作者:鹏磊
- 出处:http://www.ymq.io
- Email:admin@souyunku.com
- 版权归作者所有,转载请注明出处
- Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 使用JFR分析性能问题
- MyISAM 迁移至 InnoDB方案
- 解决Seafile局域网访问失败
- 一键解锁网易云音乐变灰歌曲
- 彻底理解 IO多路复用
- 分享Apache环境禁止目录浏览的方法
- DB2 Linux平台安装 Part 1 Linux环境配置
- DB2 Linux平台安装 Part 2 单机版软件安装
- DB2 Linux平台安装 Part 3 实例的建立与配置
- C++核心准则Con.2:默认情况下,将成员函数定义为const类型
- C++核心准则Con.3:默认情况下,传递参照常量的指针或引用
- C++核心准则Con.4:如果一个对象在构建之后值不会改变,使用const定义它
- C++核心准则Con.5:对于可以在编译时计算的值,使用constexpr进行声明
- DB2 Linux平台安装 Part 4 创建数据库
- VBA编写Ribbon Custom UI编辑器03——认识Ribbon的xml