史上最全ThreadPoolExecutor梳理(上篇)

时间:2022-07-23
本文章向大家介绍史上最全ThreadPoolExecutor梳理(上篇),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

一、开场白

Java是面向对象编程,万事万物皆对象,讲究池化技术,可以避免对象频繁的创建、销毁,浪费性能。线程池作为线程的复用利器,工作中都用过,可以说是非常非常重要。面试时很多面试官也会重点考察这块知识,用归用,但你是否真的了解线程池的内部原理?

  • 核心线程、最大线程、阻塞队列、拒绝策略,这四者是什么关系?
  • 拒绝策略有哪些?如何实现一个自定义的拒绝策略?
  • 如何动态调整线程池中的参数配置?
  • Runnable任务是作为构造器入参来实例化Thread对象的,如果一个Runnable任务执行完,下一个Runnable如何传入Thread对象中?
  • 空闲线程是如何回收的?回收的力度有多大?
  • ThreadPoolExecutor,预留了哪些扩展?如何做性能监控?

二、7个核心参数

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

1、corePoolSize(核心线程数):

  • 当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize。
  • 除了利用提交新任务来创建和启动线程(按需构造),也可以通过 prestartCoreThread() 或 prestartAllCoreThreads() 方法来提前启动线程。

2、maximumPoolSize(最大线程数):

  • 线程池所允许的最大线程个数。当队列满了,且已创建的线程数小于maximumPoolSize,会创建新的线程来执行任务。
  • 另外,对于无界队列,可忽略该参数。

3、keepAliveTime(最大空闲时间):

  • 默认情况下,当线程个数大于corePoolSize时,如果线程的空闲时间超过keepAliveTime则会销毁。
  • allowCoreThreadTimeOut(boolean) 方法可将此超时策略应用于核心线程。
  • 另外,也可以使用setKeepAliveTime()动态地更改参数。

4、unit(存活时间的单位):

  • 时间单位,分为7类,从细到粗顺序:NANOSECONDS(纳秒),MICROSECONDS(微秒),MILLISECONDS(毫秒),SECONDS(秒),MINUTES(分),HOURS(小时),DAYS(天);

5、workQueue(任务队列):

  • 用于保存等待执行任务的阻塞队列,线程会不断从该队列拉取任务执行。
  • 如果运行的线程数少于 corePoolSize,优先创建新的线程,而不进行排队。
  • 如果运行的线程数大于等于 corePoolSize,则 Executor 始终首选将请求加入队列,而不是创建新的线程。
  • 如果无法将任务加入队列,则创建新的线程,除非线程数已经达到 maximumPoolSize,此时,任务将被拒绝。

6、threadFactory(线程工厂):

  • 用于创建新线程。由同一个threadFactory创建的线程,属于同一个ThreadGroup,创建的线程优先级都为Thread.NORM_PRIORITY,以及是非守护进程状态。
  • threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号);

7、handler(拒绝策略):

  • 当线程池和队列都满了,则表明该线程池已达饱和状态。
  • ThreadPoolExecutor.AbortPolicy:拒绝并抛出异常 RejectedExecutionException。(默认策略)
  • ThreadPoolExecutor.CallerRunsPolicy:调用者所在线程来运行该任务,能够减缓新任务的提交速度
  • ThreadPoolExecutor.DiscardPolicy:直接扔掉。
  • ThreadPoolExecutor.DiscardOldestPolicy:如果线程池尚未关闭,将队列的头元素移除,然后提交当前任务
  • 也可以实现 RejectedExecutionHandler接口,自定义拒绝策略。

三、状态、计数字段

// 高3位用来表示线程池的状态,后面的29位则表示线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;  // 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  //二进制表示,29个1

// 线程池当前状态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

ctl是线程池的核心状态控制字段,本身是一个AtomicInteger,用来保证对ctl的操作都是线程安全的。这里利用位运算巧妙地将一个int(一个int 4个字节 即32位)拆成了两部分,高3位用来表示线程的状态,剩下的29位则表示工作线程数。这里就可以得知工作线程的数量上限即CAPACITY,大约有5亿。

这五种状态转换成二进制后如下所示:

  • RUNNING:
    • 能接受新提交的任务
    • 能处理阻塞队列中的任务
  • SHUTDOWN:
    • 不再接受新提交的任务
    • 可以继续处理阻塞队列中已保存的任务。
    • 在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态
    • finalize() 方法,也会调用shutdown()方法进入该状态
  • STOP:
    • 不接受新任务,也不处理队列中的任务
    • 线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态
      • interruptWorkers(),所有线程(Worker)会中断
      • drainQueue(),返回阻塞队列中未执行的任务List
      • 触发 tryTerminate() 方法
  • TIDYING:
    • 如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法,并把状态修改成 TERMINATED
  • TERMINATED:
    • 在terminated() 方法执行完后进入该状态,terminated()方法默认空实现

状态相关的方法:

  • 线程池是否处于运行状态。
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
  • 线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
  • 线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
  • 线程池的统计数据
long getTaskCount() // 已完成和未执行的任务总数;
long getCompletedTaskCount() // 已完成的任务数量,小于等于taskCount;
int getLargestPoolSize() //线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,是否达到过maximumPoolSize;
int getPoolSize() //线程池中的的线程数量
int getActiveCount() // 正在运行任务的线程数量

四、线程池提交任务

  • execute(),提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
  • submit(),提交需要返回值的任务,线程池会返回一个Future类型的对象,通过这个对象可以判断任务是否执行成功。
Future<Object> future = executor.submit(task);

未完待续: 史上最全ThreadPoolExecutor梳理(下篇)