ThreadPoolExecutor Explained

Date: 2022-07-24


I. Overview

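ThreadPoolExecutor is the JDK's core thread-pool implementation. It defines the basic framework of a thread pool and lets clients build custom pools by subclassing it. Most of the ready-made pools returned by the Executors factory methods (newFixedThreadPool(), newCachedThreadPool(), newSingleThreadExecutor()) are built on ThreadPoolExecutor, and ScheduledThreadPoolExecutor extends it, so it is worth analyzing ThreadPoolExecutor in detail.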

II. Core Parameters

ThreadPoolExecutor has the following main parameters:

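  1. corePoolSize: the number of core threads. By default a newly created pool contains no threads (unless prestartCoreThread() or prestartAllCoreThreads() is called). When a new task is submitted and the pool has fewer than corePoolSize threads, a new thread is created to run it, even if other threads are idle.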
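  2. workQueue: the task queue. Once the pool already has corePoolSize threads, newly submitted tasks are stored in workQueue until a thread becomes free to run them. workQueue is a BlockingQueue<Runnable>; you can use the bounded ArrayBlockingQueue, LinkedBlockingQueue (unbounded unless a capacity is given), or another blocking queue. We won't go into the details here.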
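  3. maximumPoolSize: the maximum number of threads. When workQueue is full (which only happens with a bounded queue) and another task is submitted, the pool creates a new thread for it, as long as the total number of threads does not exceed maximumPoolSize. In other words, maximumPoolSize is the upper limit on the thread count; with an unbounded queue it has no effect, because the queue never fills up.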
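  4. RejectedExecutionHandler: the rejection policy. When the queue is full and the pool already has maximumPoolSize threads, or when the pool has been shut down, a newly submitted task is not executed by the pool; it is handed to the RejectedExecutionHandler instead. The JDK provides four policies out of the box:
    1. AbortPolicy: throws a RejectedExecutionException (this is the default).
    2. CallerRunsPolicy: runs the task in the thread that called execute(); if the pool has been shut down, the task is discarded.
    3. DiscardPolicy: silently discards the task.
    4. DiscardOldestPolicy: discards the oldest task in the queue (the one at the head) and retries submitting the current task.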
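  5. keepAliveTime: the idle-thread keep-alive time, i.e. the maximum time a thread may stay idle before it is terminated (its unit is given by the separate TimeUnit parameter). It applies to threads beyond corePoolSize; core threads only time out if allowCoreThreadTimeOut(true) is set.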
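To see the parameters side by side, here is a minimal sketch (assuming Java 8 or later) that passes every one of them to the ThreadPoolExecutor constructor and then saturates the pool so that CallerRunsPolicy kicks in. The class name PoolParamsDemo and the latch-based blocking tasks are illustrative choices, not part of the original article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch: every core parameter set explicitly, plus CallerRunsPolicy.
class PoolParamsDemo {

    /** Returns the name of the thread that ran the task the pool could not accept. */
    static String runRejectedTaskOn() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                                          // corePoolSize
                1,                                          // maximumPoolSize
                30, TimeUnit.SECONDS,                       // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(1),        // workQueue: bounded, capacity 1
                new ThreadPoolExecutor.CallerRunsPolicy()); // RejectedExecutionHandler

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                            // keep the only worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();

        pool.execute(blocker);   // task 1: starts the single core thread
        pool.execute(blocker);   // task 2: waits in the queue, which is now full
        // task 3: queue full and maximumPoolSize reached, so CallerRunsPolicy runs it here
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));

        release.countDown();     // let tasks 1 and 2 finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:        " + Thread.currentThread().getName());
        System.out.println("task 3 ran on: " + runRejectedTaskOn());
    }
}
```

Because the single worker is blocked and the one-slot queue is full, the third execute() call runs the task on the calling thread itself, so both printed names are the same. Passing new ThreadPoolExecutor.AbortPolicy() instead would make that call throw RejectedExecutionException.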

III. Execution Logic

The parameters above already describe, to a large extent, how the pool handles a submitted task. To summarize:

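  1. If fewer than corePoolSize threads are running, create a new thread to run the task; otherwise go to step 2.
  2. If the pool is running and the task queue is not full, add the task to the queue; otherwise go to step 3.
  3. If the number of threads is below maximumPoolSize, create a new thread to run the task; otherwise apply the rejection policy.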
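The three steps can be observed directly with a deliberately tiny pool. This is a sketch under assumed settings (corePoolSize = 1, maximumPoolSize = 2, a queue of capacity 1, the default AbortPolicy); the tasks block on a latch so that no thread frees up while tasks are being submitted. ExecutionOrderDemo is a hypothetical name.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutionOrderDemo {

    /** Submits four blocking tasks; returns {poolSize, queuedTasks, rejectedTasks}. */
    static int[] submitFourTasks() {
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        int rejected = 0;
        for (int i = 1; i <= 4; i++) {
            try {
                // task 1: step 1 (core thread), task 2: step 2 (queued),
                // task 3: step 3 (extra thread), task 4: rejected
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++;                  // AbortPolicy throws for the fourth task
            }
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size(), rejected};

        release.countDown();                 // let the blocked tasks finish
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(submitFourTasks())); // prints [2, 1, 1]
    }
}
```

Task 3 starts running before task 2 even though it was submitted later: the extra thread is created with task 3 as its first task, while task 2 waits in the queue until some thread is free.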

IV. Source Code Analysis

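ThreadPoolExecutor has an inner class Worker that wraps a Runnable. Its main job is to manage interrupts for the thread that runs tasks and to do some per-worker bookkeeping. Worker also extends AQS (AbstractQueuedSynchronizer) and uses it as a simple lock that is held while each task runs, so that interrupts meant to wake up an idle worker do not interrupt a task in progress. You can think of Worker as a Runnable that is interruptible and lockable. Its fields and constructor are: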

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

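    // Thread this worker runs in (null if the ThreadFactory failed)
    final Thread thread;

    // Initial task to run; may be null
    Runnable firstTask;

    // Number of tasks this worker has completed
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // run() (which calls runWorker(this)) and the AQS lock methods are omitted here
}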
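Worker's lock methods are not shown above. The sketch below is a hypothetical standalone class, WorkerStyleLock, that uses AQS in the same spirit: a non-reentrant mutex where only the state transition from 0 to 1 acquires the lock, and an initial state of -1 keeps it unacquirable until the first unlock(). This mirrors what setState(-1) in the constructor and w.unlock() at the start of runWorker() do, but it is an illustration, not the JDK class.

```java
import java.util.Arrays;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class modeled on how Worker uses AQS; not the JDK's Worker itself.
class WorkerStyleLock extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1L;

    WorkerStyleLock() {
        setState(-1);   // like Worker: cannot be acquired until unlock() is called once
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only 0 -> 1 succeeds, so both -1 (not started) and 1 (held) block acquisition
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    boolean isLocked() { return isHeldExclusively(); }

    /** Returns {tryLock before first unlock, tryLock after unlock, second tryLock by owner}. */
    static boolean[] demo() {
        WorkerStyleLock lock = new WorkerStyleLock();
        boolean beforeStart = lock.tryLock();  // false: state is still -1
        lock.unlock();                         // what runWorker() does first
        boolean first = lock.tryLock();        // true: state goes 0 -> 1
        boolean reentry = lock.tryLock();      // false: not reentrant, even for the owner
        lock.unlock();
        return new boolean[] {beforeStart, first, reentry};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [false, true, false]
    }
}
```

The JDK source comments explain that Worker deliberately uses a non-reentrant lock instead of ReentrantLock, so that a running task cannot reacquire its own worker's lock when it calls pool-control methods such as setCorePoolSize().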

The core method for submitting a task to the pool is execute(). Let's analyze it in detail:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
    
   
    int c = ctl.get();
    //Extract the worker count from ctl with a bit mask; if it is below corePoolSize, call addWorker() to start a new thread for the task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

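I have kept the original English comments from the source; they show clearly that execute() follows the same three-step logic described above.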

First, the current number of threads is read from the atomic variable ctl:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

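ctl is a control variable inside ThreadPoolExecutor used for both counting and state control. It packs two pieces of information into a single atomic integer:

  1. the current number of workers (live threads);
  2. the current run state of the pool, such as RUNNING or SHUTDOWN.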

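Because a single int (32 bits) carries both values, the JDK splits it: the low 29 bits hold the worker count and the high 3 bits hold the run state (3 bits are enough to encode the five states RUNNING, SHUTDOWN, STOP, TIDYING and TERMINATED). A ThreadPoolExecutor can therefore have at most (2^29)-1 threads, about 536 million, which is far more than enough in practice.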
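The bit layout can be checked with a standalone re-creation of the helpers. The constant and method names below follow the ThreadPoolExecutor source, but CtlBits itself is only an illustrative class (the real members are private and not part of the public API).

```java
// Illustrative re-creation of ThreadPoolExecutor's ctl packing helpers.
class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // 2^29 - 1 = 536870911

    // Run states live in the high 3 bits; RUNNING is negative, so it compares lowest.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // pack both into one int
    static boolean isRunning(int c)  { return c < SHUTDOWN; }  // only RUNNING is negative

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c)); // 3 state bits followed by the count
        System.out.println(workerCountOf(c));          // 5
        System.out.println(runStateOf(c) == RUNNING);  // true
        System.out.println(CAPACITY);                  // 536870911
    }
}
```

Because RUNNING is the only negative state, "is the pool running?" reduces to a single comparison, and the states keep their lifecycle order when compared as plain ints.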

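Back in execute(): the worker count is extracted from ctl with a bit mask, and if it is below corePoolSize, addWorker() is called to create a new thread for the task. addWorker() looks like this: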

private boolean addWorker(Runnable firstTask, boolean core) {
    //Spin: check the pool state, then atomically increment the worker count
    retry:
    for (;;) {
        int c = ctl.get();
        
        //Get the pool's run state
        int rs = runStateOf(c);

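        //Return false if the pool is SHUTDOWN or later, except when it is SHUTDOWN,
        //firstTask is null and the queue is not empty (a worker is still needed to drain it)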
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            //Check the thread limit: corePoolSize if core is true, otherwise maximumPoolSize.
            //Return false if the limit has been reached
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
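            //Atomically increment the worker count (CAS); on success leave both loops
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //CAS failed: re-read ctl
            c = ctl.get();
            //If the run state changed, restart the outer loop; otherwise retry the CAS
            if (runStateOf(c) != rs)
                continue retry;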
        }
    }

    //The worker count in ctl was incremented; now create and start the Worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //Create a new Worker with the task to run
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            
            mainLock.lock();
            try {
                //Recheck the pool state while holding the lock
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //If the Worker was added, start its thread, which then calls runWorker()
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

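addWorker() first runs a spin loop: it checks the pool state and the thread limit (corePoolSize or maximumPoolSize, depending on core) and tries to increment the worker count with a CAS. If either check fails, it returns false; if only the CAS fails, it re-reads ctl and retries. Once the count has been incremented, it creates a new Worker, adds it to the workers set while holding mainLock (after rechecking the pool state), and then starts the Worker's thread, which begins running the task immediately. If the thread cannot be started, addWorkerFailed() rolls the changes back.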
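The spin-and-CAS pattern at the heart of addWorker() can be isolated in a few lines. The sketch below assumes a plain AtomicInteger counter and a hypothetical tryIncrement() helper that plays the role of the inner loop; unlike the real method, it ignores the run state and only enforces a limit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCasCounter {

    /** Hypothetical helper: spin-CAS increment that refuses to go past limit. */
    static boolean tryIncrement(AtomicInteger count, int limit) {
        for (;;) {
            int c = count.get();
            if (c >= limit) {
                return false;                  // like "wc >= maximumPoolSize": give up
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;                   // like compareAndIncrementWorkerCount(c)
            }
            // CAS failed: another thread changed the value first; re-read and retry
        }
    }

    /** Starts `threads` threads that each try one increment; returns {successes, finalCount}. */
    static int[] race(int threads, int limit) {
        AtomicInteger count = new AtomicInteger();
        AtomicInteger successes = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] racers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            racers[i] = new Thread(() -> {
                try {
                    start.await();             // release all threads at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (tryIncrement(count, limit)) {
                    successes.incrementAndGet();
                }
            });
            racers[i].start();
        }
        start.countDown();
        for (Thread t : racers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] {successes.get(), count.get()};
    }

    public static void main(String[] args) {
        int[] r = race(50, 10);
        System.out.println("successes=" + r[0] + ", count=" + r[1]); // successes=10, count=10
    }
}
```

However many threads race, exactly `limit` of them succeed: each successful compareAndSet() is based on the value that thread just read, and a thread that loses the race simply re-reads and either retries or gives up once the limit is reached. No lock is needed for the counter itself.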

A Worker runs its tasks in runWorker():

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //Release the "lock" set up in the constructor (state -1) so interrupts are allowed
    w.unlock(); 
    boolean completedAbruptly = true;
    
    try {
        while (task != null || (task = getTask()) != null) {
            //Lock the Worker before running the task
            w.lock();
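            //If the pool is stopping (STOP or later), make sure the thread is interrupted;
            //otherwise clear any interrupt left over from before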
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //Run the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //Increment this Worker's completed-task counter
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

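runWorker() is a loop: it takes the first task (or the next one from getTask()), locks the Worker, checks whether the pool is stopping so it can set or clear the thread's interrupt status, runs the task between beforeExecute() and afterExecute(), and increments completedTasks. When getTask() returns null, the loop ends and processWorkerExit() cleans up the worker.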

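Note that a Worker's firstTask may be null. In that case the Worker has no task of its own and calls getTask() to pull tasks from the task queue. In other words, after a Worker finishes the task it was created with, it goes on to run tasks from the queue.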
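runWorker() calls beforeExecute() and afterExecute() around every task; both are empty protected hooks intended for subclasses. As a sketch of the subclassing approach mentioned in the overview, the hypothetical CountingPool below counts tasks in those hooks (the pool size of 2 and the throwing task are arbitrary choices for the demo).

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass using the hooks that runWorker() calls around task.run().
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();      // runs in the worker thread, just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();     // runs even if task.run() threw
        if (t != null) {
            failed.incrementAndGet();   // t is the exception thrown by the task
        }
    }

    /** Runs three tasks via execute(), one of which throws; returns {started, finished, failed}. */
    static int[] demo() {
        CountingPool pool = new CountingPool(2);
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.execute(() -> { });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.started.get(), pool.finished.get(), pool.failed.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [3, 3, 1]
    }
}
```

afterExecute() receives the exception from the failing task, but runWorker() still rethrows it, so that worker thread dies (the default uncaught-exception handler prints the stack trace) and processWorkerExit() may replace it. Tasks passed to submit() behave differently: the exception is captured in the returned Future, and t is null.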

This completes the first branch of execute(), the case where fewer than corePoolSize threads are running. The remaining branches are not complicated either:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
    
   
    int c = ctl.get();
    //Extract the worker count from ctl with a bit mask; if it is below corePoolSize, call addWorker() to start a new thread for the task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
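    //At least corePoolSize threads are running: try to put the task into workQueue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //If the pool was shut down in the meantime, take the task back out and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //If no worker is alive (e.g. corePoolSize == 0), start one with no first task
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    
    //If the pool is not running or workQueue is full, try to create a new thread, bounded by maximumPoolSize
    else if (!addWorker(command, false))
        //The thread count has reached maximumPoolSize (or the pool is shut down): apply the rejection policy
        reject(command);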
}
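The recheck branch `workerCountOf(recheck) == 0` matters most when corePoolSize is 0: step 1 is skipped, the task goes straight into the queue, and without the recheck no thread would exist to run it. A minimal sketch with assumed settings (corePoolSize = 0, maximumPoolSize = 1, an unbounded LinkedBlockingQueue); ZeroCoreDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ZeroCoreDemo {

    /** Returns true if a task submitted to a pool with corePoolSize = 0 actually runs. */
    static boolean taskRuns() {
        // corePoolSize = 0, so execute() skips step 1 and queues the task;
        // the recheck then sees workerCount == 0 and calls addWorker(null, false).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch done = new CountDownLatch(1);
        pool.execute(done::countDown);

        boolean ran;
        try {
            ran = done.await(5, TimeUnit.SECONDS);  // wait for the queued task to run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            ran = false;
        }
        pool.shutdown();
        // exactly one worker (created with a null firstTask) should have existed
        return ran && pool.getLargestPoolSize() == 1;
    }

    public static void main(String[] args) {
        System.out.println("queued task ran: " + taskRuns());
    }
}
```

With an unbounded queue the offer() in step 2 never fails, so step 3 is never reached: a pool configured like this runs tasks on at most one thread, and raising maximumPoolSize alone would not change that.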