【原创】Java并发编程系列35 | ScheduledThreadPoolExecutor定时器

时间:2022-07-24
本文章向大家介绍【原创】Java并发编程系列35 | ScheduledThreadPoolExecutor定时器,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

上一篇讲解了线程池的原理,这篇就在线程池基础上介绍基于线程池实现的定时器ScheduledThreadPoolExecutor:

  1. ScheduledThreadPoolExecutor的用法
  2. ScheduledThreadPoolExecutor源码分析
  3. ScheduledThreadPoolExecutor执行过程分析

1. 介绍

ScheduledThreadPoolExecutor 可以用来在给定延时后执行异步任务或者周期性执行任务,也就是我们说的定时器。ScheduledThreadPoolExecutor基于线程池,通过多线程实现延时和周期执行。

1.1 用法Demo

如下代码使用ScheduledThreadPoolExecutor实现:10ms后打印第一次,之后每隔30ms打印一次。

public class TimerDemo {
    public static void main(String[] args) {
        // 1. 创建线程池定时器
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(3);
        // 2. 提交定时任务:10ms后打印第一次,之后每隔30ms打印一次
        timer.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
            }
        }, 10, 30, TimeUnit.MILLISECONDS);
    }
}

1.2 四种定时器用法

  1. 第一种schedule(Runnable command, long delay, TimeUnit unit);达到给定的延时时间后,执行任务,Runnable不能返回结果;
  2. 第二种schedule(Callablecallable, long delay, TimeUnit unit); 达到给定的延时时间后,执行任务,Callable可以返回结果;
  3. 第三种scheduleAtFixedRate(); 固定周期执行任务,每次执行的开始时间之间的间隔是固定的,最开始就能够确定之后每次执行的时间;
  4. 第四种scheduleWithFixedDelay(); 固定延时周期执行任务,上一次执行结束到下一次执行开始的间隔时间是固定的,由于每次执行任务花费时间不一定相同,所以只有在上次执行结束之后才能确定下次执行开始的时间。
    /**
     * 达到给定的延时时间后,执行任务
     * @param command Runnable接口的任务,ScheduledFuture.get()获取结果为null
     */
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /**
     * 到给定的延时时间后,执行任务。
     * @param callable 实现Callable接口的任务,ScheduledFuture.get()可获取任务结果
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * 固定周期执行任务
     * @param initialDelay 第一次执行的延迟时间
     * @param period 周期
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
            long initialDelay, long period, TimeUnit unit);

    /**
     * 固定延时周期执行任务
     * @param initialDelay 第一次执行的延迟时间
     * @param delay 上一次执行结束到下一次执行开始的间隔时间
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
            long initialDelay, long delay, TimeUnit unit);

2. 类结构

2.1 继承结构

  1. ScheduledThreadPoolExecutor 继承了ThreadPoolExecutor,是一种特殊的线程池,拥有 execute()和 submit()提交异步任务功能。
  2. ScheduledThreadPoolExecutor 类实现了ScheduledExecutorService,该接口定义了延时执行任务和周期执行任务的功能;
  3. ScheduledThreadPoolExecutor 有两个重要的内部类:DelayedWorkQueue和ScheduledFutureTask。DelayedWorkQueue 实现了 BlockingQueue 接口,是一个阻塞队列;ScheduledFutureTask 继承了 FutureTask 类,是一个可以返回异步任务的结果的Runnable。

2.2 构造方法

通过构造方法可以看到,创建ScheduledThreadPoolExecutor其实就是创建一个线程池,corePoolSize可以指定,maximumPoolSize为Integer.MAX_VALUE,任务队列为DelayedWorkQueue。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

2.3 ScheduledFutureTask

ScheduledThreadPoolExecutor 提交的任务时,将任务封装成 ScheduledFutureTask,当执行任务时通过ScheduledFutureTask的run()方法调用任务的run()方法。

ScheduledFutureTask的compareTo()方法用于延迟队列排序,按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。

private class ScheduledFutureTask<V> 
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
    private long time;// 下次执行时间
    private final long period;// 周期
    
    /**
     * 比较方法,到下次执行时间短的任务优先
     */
    public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }
    
    /**
     * ScheduledThreadPoolExecutor提交的任务被封装成ScheduledFutureTask,所以任务执行要通过这个run()方法
     */
    public void run() {
        boolean periodic = isPeriodic();// 是否是周期执行
        // 当前线程池运行状态下如果不可以执行任务,取消该任务
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        // 如果不是周期性任务,直接调用FutureTask中的run方法执行
        else if (!periodic)
            ScheduledFutureTask.super.run();
        // 如果是周期性任务,调用FutureTask中的runAndReset方法执行
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();// 计算下次执行该任务的时间
            reExecutePeriodic(outerTask);// 将新任务再次放入线程池等待被执行任务
        }
    }
}

2.4 DelayedWorkQueue

DelayedWorkQueue 是一个基于小顶堆的数据结构,类似于 DelayQueue 和 PriorityQueue。DelayedWorkQueue 按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。之前已经详细介绍过 DelayQueue 和 PriorityQueue了,这里就不在重复了。

  1. DelayedWorkQueue是存储ScheduledFutureTask阻塞队列。
  2. 插入元素时,会根据延期时间对元素排序,队头的元素是最先到期的;
  3. 取出元素时,只有在队头元素到期时才能够从队列中取元素。如果队头元素还有t时间到期,则将取出元素线程阻塞t时间,t时间到后再次尝试取出队头元素。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {
    // 队列初始容量
    private static final int INITIAL_CAPACITY = 16;
    // 根据初始容量创建RunnableScheduledFuture类型的数组
    private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private final ReentrantLock lock = new ReentrantLock();
    private int size = 0;
    //leader线程
    private Thread leader = null;
    //当较新的任务在队列的头部可用时,或者新线程可能需要成为leader,则通过该条件发出信号
    private final Condition available = lock.newCondition();
    
    /**
     * 根据延期时间对元素排序,队头的元素是最先到期的
     */
    public boolean offer(Runnable x) {}
    /**
     * 只有在队头元素到期时才能够从队列中取元素。
     * 如果队头元素还有t时间到期,则将取出元素线程阻塞t时间,t时间到后再次尝试取出队头元素。
     */
    public RunnableScheduledFuture<?> take() throws InterruptedException {}
    
    // ......
}

3. 执行过程

以上文的小示例为例,通过源码来分析ScheduledThreadPoolExecutor的执行过程。

示例代码:

public class TimerDemo {
    public static void main(String[] args) {
        // 1. 创建线程池定时器
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(3);
        // 2. 提交定时任务:10ms后打印第一次,之后每隔30ms打印一次
        timer.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
            }
        }, 10, 30, TimeUnit.MILLISECONDS);
    }
}

3.1 创建线程池定时器

创建线程池定时器就是创建一个线程池:

/**
 * Executors.newScheduledThreadPool(3);
 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

3.2 添加任务

  1. Runable任务封装成ScheduledFutureTask;
  2. 任务加入延时队列,同时在队列中按照执行的时间顺序排序,最先执行的任务在队头;
  3. 确保线程池中有活动线程,如果没有就启动一个。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  long initialDelay, long period, TimeUnit unit) {
 // 一些检查
 if (command == null || unit == null)
  throw new NullPointerException();
 if (period <= 0)
  throw new IllegalArgumentException();
 
 // 将command封装成ScheduledFutureTask,包括下次执行时间和周期
 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,
   null, triggerTime(initialDelay, unit), unit.toNanos(period));
 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 sft.outerTask = t;// 设置下次执行的任务
 // 将ScheduledFutureTask类型的任务放入线程池延时执行,下文详细介绍
 delayedExecute(t);
 return t;
}

/**
 * 将ScheduledFutureTask类型的任务放入线程池延时执行
 */
private void delayedExecute(RunnableScheduledFuture<?> task) {
 // 线程池关闭了,走拒绝策略
 if (isShutdown())
  reject(task);
 else {
  // 无论如何,将任务加入延时队列,插入队列时会根据下次执行时间排序
  super.getQueue().add(task);
  // 特殊情况,插入队列后线程池关闭了,需要从队列中删除
  if (isShutdown() &&
   !canRunInCurrentRunState(task.isPeriodic()) &&
   remove(task))
   task.cancel(false);
  else
   ensurePrestart();// 确保线程池中有线程执行
 }
}

/**
 * 确保线程池中有线程执行
 * 只有两种情况会启动线程:
 * 1. 当前线程数小于corePoolSize,以corePoolSize为界限启动一个线程
 * 2. 线程池参数corePoolSize=0且此时线程池中没有线程,以maximumPoolSize为界限启动一个线程
 */
void ensurePrestart() {
 int wc = workerCountOf(ctl.get());
 // 线程数小于corePoolSize,以corePoolSize为界限启动一个线程
 if (wc < corePoolSize)
  addWorker(null, true);
 /*
  * 线程数大于等于corePoolSize且线程数==0,其实就是线程池参数corePoolSize=0且此时线程池中没有线程
  * 以maximumPoolSize为界限启动一个线程
  */
 else if (wc == 0)
  addWorker(null, false);
}

3.3 执行任务

  1. 线程池中的活动线程会循环到任务队列中取任务,当队头任务还没到期时,线程阻塞至队头任务到期时间,然后再取任务;
  2. 取出任务后执行,因为任务是ScheduledFutureTask类型(添加任务时封装的),执行ScheduledFutureTask.run();
  3. ScheduledFutureTask.run()执行当前任务,设置下次执行时间并将任务放入线程池;
  4. 线程池中的活动线程会循环到任务队列中取任务,...循环...
/**
 * ScheduledThreadPoolExecutor提交的任务被封装成ScheduledFutureTask,所以任务执行要通过这个run()方法
 */
public void run() {
 boolean periodic = isPeriodic();// 是否是周期执行
 // 当前线程池运行状态下如果不可以执行任务,取消该任务
 if (!canRunInCurrentRunState(periodic))
  cancel(false);
 // 如果不是周期性任务,直接调用FutureTask中的run方法执行
 else if (!periodic)
  ScheduledFutureTask.super.run();
 // 如果是周期性任务,调用FutureTask中的runAndReset方法执行
 else if (ScheduledFutureTask.super.runAndReset()) {
  setNextRunTime();// 计算下次执行该任务的时间
  reExecutePeriodic(outerTask);// 将新任务再次放入线程池等待被执行任务
 }
}

/**
 * 计算下次执行该任务的时间
 */
private void setNextRunTime() {
 long p = period;
 if (p > 0)
  time += p;
 else
  time = triggerTime(-p);
}

/**
 * 将新任务再次放入线程池等待被执行任务
 */
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
 if (canRunInCurrentRunState(true)) {
  super.getQueue().add(task);// 加入延时队列
  // 删除不符合条件任务
  if (!canRunInCurrentRunState(true) && remove(task))
   task.cancel(false);
  else
   ensurePrestart();// 确保线程池中有线程执行
 }
}

3.3 执行过程总结

  1. Runable任务封装成ScheduledFutureTask;
  2. 任务加入延时队列,同时在队列中按照执行的时间顺序排序,最先执行的任务在队头;
  3. 确保线程池中有活动线程,如果没有就启动一个;
  4. 线程池中的活动线程会循环到任务队列中取任务,当队头任务还没到期时,线程阻塞至队头任务到期时间,然后再取任务;
  5. 取出任务后执行,因为任务是ScheduledFutureTask类型(添加任务时封装的),执行ScheduledFutureTask.run();
  6. ScheduledFutureTask.run()执行当前任务,设置下次执行时间并将任务放入线程池;
  7. 线程池中的活动线程会循环到任务队列中取任务,...循环...

4. scheduleAtFixedRate() VS scheduleWithFixedDelay()

  • scheduleAtFixedRate(); 固定周期执行任务,每次执行的开始时间之间的间隔是固定的,最开始就能够确定之后每次执行的时间;
  • scheduleWithFixedDelay(); 固定延时周期执行任务,上一次执行结束到下一次执行开始的间隔时间是固定的,由于每次执行任务花费时间不一定相同,所以只有在上次执行结束之后才能确定下次执行开始的时间。

从源码角度理解scheduleAtFixedRate()和scheduleWithFixedDelay()的不同,由两个细节决定:

细节一:构造ScheduledFutureTask时,scheduleAtFixedRate传入period(>0),而scheduleWithFixedDelay传入-delay(<0)。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));// z这里是period
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));// 这里是-delay
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

细节二:执行完一次后设置下次执行时间时,

  • p>0,scheduleAtFixedRate,下次执行开始时间=上次开始执行时间+周期
  • period<=0,scheduleWithFixedDelay,下次执行开始时间=上次结束执行时间(now)+周期
    /**
     * 计算下次执行该任务的时间
     */
    private void setNextRunTime() {
        long p = period;
        // p>0,scheduleAtFixedRate,下次执行开始时间=上次开始执行时间+周期
        if (p > 0)
            time += p;
        // period<=0,scheduleWithFixedDelay,下次执行开始时间=上次结束执行时间+周期
        else
            time = triggerTime(-p);
    }
    
    long triggerTime(long delay) {
        return now() +
                ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

5. 总结

  1. ScheduledThreadPoolExecutor 可以用来在给定延时后执行异步任务或者周期性执行任务,也就是我们说的定时器。ScheduledThreadPoolExecutor基于线程池,通过多线程实现延时和周期执行。
  2. ScheduledThreadPoolExecutor的四种用法:
  • schedule(Runnable command, long delay, TimeUnit unit);达到给定的延时时间后,执行任务,Runnable不能返回结果;
  • schedule(Callablecallable, long delay, TimeUnit unit); 达到给定的延时时间后,执行任务,Callable可以返回结果;
  • 第三种scheduleAtFixedRate(); 固定周期执行任务,每次执行的开始时间之间的间隔是固定的,最开始就能够确定之后每次执行的时间;
  • 第四种scheduleWithFixedDelay(); 固定延时周期执行任务,上一次执行结束到下一次执行开始的间隔时间是固定的,由于每次执行任务花费时间不一定相同,所以只有在上次执行结束之后才能确定下次执行开始的时间。
执行过程:
  1. Runable任务封装成ScheduledFutureTask;
  2. 任务加入延时队列,同时在队列中按照执行的时间顺序排序,最先执行的任务在队头;
  3. 确保线程池中有活动线程,如果没有就启动一个;
  4. 线程池中的活动线程会循环到任务队列中取任务,当队头任务还没到期时,线程阻塞至队头任务到期时间,然后再取任务;
  5. 取出任务后执行,因为任务是ScheduledFutureTask类型(添加任务时封装的),执行ScheduledFutureTask.run();
  6. ScheduledFutureTask.run()执行当前任务,设置下次执行时间并将任务放入线程池;
  7. 线程池中的活动线程会循环到任务队列中取任务,...循环...