ThreadPoolExecutor源码学习

时间:2022-07-23
本文章向大家介绍ThreadPoolExecutor源码学习,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

我们之前温习了Thread类,明白了Runable接口才是多线程的任务核心。那么ThreadPoolExecutor就是用维护多线程的。作为工具类,ThreadPoolExecutor应该提供了很多操作线程的方法,按理说也是逐个去调用目标线程的方法。那么我们就详细了解一下ThreadPoolExecutor的实现过程吧。我们发现ThreadPoolExecutor类继承了AbstractExecutorService。而AbstractExecutorService实现了ExecutorService,ExecutorService继承了Executor,Executor主要提供execute方法。应该和真正的线程start方法挂钩。在AbstractExecutorService方法中实现了ExecutorService的接口。ExecutorService主要包含了线程的提交和线程的中断等方法。

作为线程管理的工具,那么ThreadPoolExecutor就是专门维护线程运行的,那么线程的容器也必然在这个类中,也就是我们新建的线程会提交到这个类中,然后通过这个类会将提交的Runnable任务按照他的安排进行执行。我们看到在ThreadPoolExecutor中有一个 private final HashSet<Worker> workers = new HashSet<Worker>();其中的Worker就是用来存储Runnable任务的工作任务。

Worker(Runnable firstTask) {
// inhibit interrupts until runWorker
  setState(-1); 
  this.firstTask = firstTask;
  //创建线程,并把任务提交
  this.thread = getThreadFactory().newThread(this);
}

按照上边的逻辑,那么worker就是我们提交的任务的代理,也就是调用了start方法。然后调用runwroker的方法。而start方法是在addWorker中调用的

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
      线程状态
            int rs = runStateOf(c);
            检测
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            自旋
            for (;;) {
         任务数目
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
          对工作线程数添加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
    添加一个新的工作任务线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
        加锁,所有创建线程的时候都加锁,防止创建多个相同的线程
                mainLock.lock();
                try {
                     
                    int rs = runStateOf(ctl.get());
                    加锁之后二次检测
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
              加入到工作线程队列,然后通过对workers数组的操作就可以操作该线程。相当于记录一下线程信息
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

调用真正的线程

inal void runWorker(Worker w) {
获取当前线程
        Thread wt = Thread.currentThread();
    当前任务
        Runnable task = w.firstTask;
    移除
        w.firstTask = null;
    解除锁
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
    这个while会一直执行,把队列中的任务都执行了。由于runWorker是另外一个线程。所以这个线程会抢着处理任务队列。指导任务处理完毕
            while (task != null || (task = getTask()) != null) {
                w.lock();
        
                if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())
                    wt.interrupt();
                try {
        执行任务之前
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
          执行任务,正真的run方法内容。
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
          执行任务之后
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        后置处理
            processWorkerExit(w, completedAbruptly);
        }
    }
  
  获取任务队列
   private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            工作线程数量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
      当线程和合理范围内就获取任务
      取出一个任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    
    
  后置处理
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        加锁,防止多线程操作创建多个核心线程
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        将当前线程停止
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
      创建新的工作线程
            addWorker(null, false);
        }
    }
  
    

经过上述源码分析,我们大概明白了ThreadPoolExecutor类的工作机理。尤其是工作线程worker使用代理的方式。以及在while方法在工作线程中的使用使得线程能够处理足够的任务。在线程处理的任务结束,没有任务可以处理也就是任务队列空的时候就线程自然销毁,如果核心线程不需要销毁就创建新线程。但是当没有任务的时候。核心线程其实是自旋的,所以感觉还是有消耗的。因此对于没有必要使用多线程的地方还是不要使用多线程了。当然通过分析我们发现ThreadPoolExecutor并没有在初始化的时候就创建核心线程,而是在逐步添加任务的时候创建的。根据我的分析和理解,我可能会尽可能少的设置核心线程。或者建议开启allowCoreThreadTimeOut,减少不必要的系统消耗。