JDK1.8源码分析:Executors线程池创建工厂的实现原理

时间:2019-02-16
本文章向大家介绍JDK1.8源码分析:Executors线程池创建工厂的实现原理,主要包括JDK1.8源码分析:Executors线程池创建工厂的实现原理使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

概述

  • 在Executor线程执行器框架中,提供了Executors这个工具类来创建指定的Executor实现类。在Executors工具类中提供了ThreadPoolExecutor创建时所需的默认参数,通过方法名称来表明指定的实现,从而简化了ThreadPoolExecutor线程池的创建。

固定线程池,无界队列

  • 线程池的核心线程数和最大线程数固定且相同,任务等待队列无界:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    // 指定线程创建工厂,如自定义线程名称,方便问题排查
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
    
  • 线程池只有一个线程,任务等待队列无界:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    // 指定线程创建工厂,如自定义线程名称,方便问题排查
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
    

无界线程池

  • 线程池无界,corePoolSize为0,maxiumPoolSize为Integer.MAX_VALUE,keepAliveTime为60秒,等待队列使用同步队列SynchronousQueue,该队列内部不存放数据,每次追加一个元素,如果存在线程等待获取,则交给该等待线程,否则对于非阻塞版本则直接返回false,对于阻塞版本则阻塞等待,在ThreadPoolExecutor中使用的是offer非阻塞版本,这里使用该队列主要作用是实现线程池线程的复用,即空闲线程即为等待线程,实现Cached的语义。

  • 这个版本通常用于任务实时性要求高,CPU资源充足的场景。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
    

    所以这个实现的特点为:结合ThreadPoolExecutor的execute实现原理,具体可以先看我的另外一篇文章:JDK1.8源码分析:Executor和ThreadPoolExecutor线程池的设计和源码实现

    。对于每提交一个新任务:

    1. 由于corePoolSize为0,故先放入等待队列,由于使用的是同步队列SynchronousQueue且使用offer非阻塞追加到队列,故如果此时线程池存在空闲线程,则交给其中一个空闲线程执行,否则追加队列失败;
    2. 追加队列失败,则如果当前线程池数量没有超过maxiumPoolSize,则创建一个新工作线程来执行这个任务。
    3. 由于keepAliveTime为60秒,corePoolSize为0,所以如果某个工作线程超过60秒没有处理任务,则被销毁回收。

周期性任务执行线程池

  • 周期性任务执行线程池主要通过ScheduledThreadPoolExecutor来实现,构造函数如下,通常可以指定corePoolSize,而maximumPoolSize通常为Integer.MAX_VALUE。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }
    
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
    
  • 其中使用的队列为DelayedWorkQueue,为无界队列。该队列内部使用了一个初始容量为16的数组,之后每次拓容为原来的1.5倍,最大容量为Integer.MAX_VALUE。

Executors提供的周期性线程池创建方法

单线程版本
  • 即corePoolSize为1,新提交的任务放到队列中,直到队列满了(由于是无界队列,通常不会满),则创建新线程。

    // 周期性任务执行线程池
    // 线程数为1的调度线程池,如果该线程挂了,则会创建一个新的线程继续执行接下来的任务
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
    
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }
    
  • 注意周期性任务的每次执行可以在同一个线程,也可以在不同的线程,但是周期性频率是保持有序的。如果某次执行任务抛了异常,则之后该任务不会再继续执行,所以一般需要在任务中进行异常捕获,否则会悄悄停止执行。如果是工作线程自身问题线程挂掉,则会新建一个工作线程继续执行任务。关于周期性任务执行如果出现异常。

指定线程版本
  • 指定corePoolSize的数量,通常用于一个周期性线程池需要执行多个周期性任务的场景。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }