ScheduledThreadPoolExecutor源码学习

时间:2022-07-23
本文章向大家介绍ScheduledThreadPoolExecutor源码学习,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

ScheduledThreadPoolExecutor看样子也是ThreadPoolExecutor的一种,因为ThreadPoolExecutor也没有什么问题那么为什么还要有ScheduledThreadPoolExecutor?

从字面上理解的话ScheduledThreadPoolExecutor意思就是计划执行的线程执行器。那么我可以猜测一下ScheduledThreadPoolExecutor的实现机制,使用sleep吗?手动狗头哈哈,怀着迷惑的心情,我们一探究竟。

从图中可以看到ScheduledThreadPoolExecutor是继承的ThreadPoolExecutor的,而我们知道ThreadPoolExecutor是依靠的是worker线程。如果是计划线程,那么肯定是让工作线程按照规律去执行。既然工作线程在父类中,那么子类要修改父类的工作线程那么无疑就是重写了?带着疑问咋来看吧。

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

以上的4个计划线程池和父类ThreadPoolExecutor并没有什么差别。唯一的区别是使用的是延迟队列来装载任务的。而我们之前学习延迟队列的时候我们就知道延迟队列是通过实现Delayed接口生成时间戳,然后延迟队列获取消息的时候总是获取的到期时间的消息。

而这里使用的延迟队列也不是我们之前学习的DelayQueue,而是自定义的延迟队列。而其中的内部类也正是延迟队列的定义。

  public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
     //转变为RunnableScheduledFuture
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
             //扩容
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
          //将任务设置到队列开头
                    setIndex(e, 0);
                } else {
           // 将任务设置到第i位
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

在添加任务到延迟队列的时候,调用的offer方法。

 private void grow() {
            int oldCapacity = queue.length;
      //这里扩容的时候使用的是原来的容量+原来的1/2,扩容一半
            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
            if (newCapacity < 0) // overflow
                newCapacity = Integer.MAX_VALUE;
      //将队列拷贝过来,因为之前加锁了,所以这块是线程安全的
            queue = Arrays.copyOf(queue, newCapacity);
        }

在添加到延迟队列的时候。调用的shifUp方法,其中k代表队列的最后下标。key代表任务.

 private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
        e为旧任务
                RunnableScheduledFuture<?> e = queue[parent];
        采用时间比较
                if (key.compareTo(e) >= 0)
            自己执行的时间是比较靠后的
                    break;
        说明自己执行的时间是靠前的
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
      队列k的位置放上新任务key
            queue[k] = key;
            setIndex(key, k);
        }

        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        当前任务的执行时间与初步计算冲突任务执行时间的比
                long diff = time - x.time;
        如果当前任务执行的时间靠前返回-1
                if (diff < 0)
                    return -1;
                else if (diff > 0)
            当前执行任务的时间靠后
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

通过上述代码的分析,我们知道延迟队列中的第一个任务就是即将要执行的。那么如果时间没到执行的时间又是如果处理的?我们还是来看看源码吧.

 public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
      //自旋
                for (;;) {
        //获取延迟队列第一个元素
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                //如果时间到了执行的时间,就调用finishpoll
                            return finishPoll(first);
              //否则重新来吧
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

可以看到这块的代码也挺暴力的,一个自旋而且还不是执行时间的话。就这么等着它到期了之后再执行。

通过上述分析,我们知道ScheduledFutureTask类是任务的容器。其中实现了Runnable接口,然后提供了时间戳作为延迟的基础。而DelayedWorkQueue实现了BlockingQueue的基本接口。并将ScheduledFutureTask作为容器模板存储添加的任务。按照到期时间来组织要执行的任务。

通过分析,我们发现这里的延迟队列的默认容量是16,扩容之后就是24。依次类推。

private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
      
  而DeyLayQueue的默认容量是16,扩容的时候
      private void grow(int minCapacity) {
        int oldCapacity = queue.length;
        // 容量小于64的时候就进行2倍数扩容,小于64就50%扩容
        int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                         (oldCapacity + 2) :
                                         (oldCapacity >> 1));
        // overflow-conscious code
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        queue = Arrays.copyOf(queue, newCapacity);
    }

如果按照我们这种分析的话,那么计划任务线程池接下来基本和ThreadPoolExecutor没有什么差别了。唯一的差别就是任务队列了。通过源码我们发现有很多方法也是直接用用的父类。

在submit方法里


  public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }
    public <T> Future<T> submit(Runnable task, T result) {
        return schedule(Executors.callable(task, result), 0, NANOSECONDS);
    }
    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, NANOSECONDS);
    }

这里好奇的是delay都是0,解释一下就是我们在延迟任务也就是ScheduledFutureTask已经包含了执行时间戳等信息。也就是这里的task.

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }
  
  //执行延迟队列
      private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
  
  任务添加到了任务队列,就开动机器了。
  void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
        //启动工作线程了
            addWorker(null, true);
        else if (wc == 0)
         //启动工作线程
            addWorker(null, false);
    }

通过以上分析,我们知道计划线程池执行器的底层是依靠延迟队列实现的。而延迟队列是通过继承接口delayed接口。除此之外延迟队列也采用自旋的方式来不断尝试到期的任务来执行。任务的执行也是在添加到任务队列之后启动worker线程进行处理的。