【原创】Java并发编程系列36 | FutureTask

时间:2022-07-25
本文章向大家介绍【原创】Java并发编程系列36 | FutureTask,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

线程池源码中出现了很多Callable、Future、FutureTask等以前没介绍过的接口,尤其是线程池提交任务时总是把任务封装成FutureTask,今天就来为大家解惑:

  1. Runnable、Callable、Future、FutureTask
  2. FutureTask类结构
  3. FutureTask状态
  4. 执行任务 run()方法
  5. 获取任务返回值 get()方法
  6. 取消任务 cancel()方法

1. Runnable、Callable、Future、FutureTask

1.1 Runnable

Runnable接口只有一个run方法,而run方法的返回值是void,所以线程执行完之后没有返回值。

public interface Runnable {
    public abstract void run();
}

1.2 Callable

在很多场景下,我们通过线程来异步执行任务之后,希望获取到任务的执行结果。比如RPC框架中,需要异步获取任务返回值。这种情况下,Runnable无法获取返回值就无法满足需求了,因此Callable就出现了。

Callable也是一个接口,也只有一个call()方法,不同的是Callable的call()方法有是有返回值的,返回值的类型是一个泛型,泛型由创建Callable对象时指定。

public interface Callable<V> {
 V call() throws Exception;
}

1.3 Future

要想获得Callable的返回值就需要用到Future接口。Futrue可以监视和控制Callable任务的执行情况,如对执行结果进行取消、查询是否完成、获取结果等。

如:当一个任务通过线程池的submit()方法提交到线程池后,线程池会返回一个Future类型的对象,我们可以通过Future对象来获取任务在线程池中的状态。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}
  • cancel方法:用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。 mayInterruptIfRunning参数用来表示是否需要中断线程,如果传true,表示需要中断线程,那么就会将任务的状态设置为INTERRUPTING;如果为false,那么就会将任务的状态设置为CANCELLED(关于任务的状态INTERRUPTING和CANCELLED后面会说明)
  • isCancelled方法:表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true
  • isDone方法:表示任务是否已经完成,若任务完成,则返回true
  • get()方法:用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回。
  • get(long timeout, TimeUnit unit)方法:获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

举例:Future获取Callable任务的返回值

public class FutureExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        Future<String> future = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "结果";
            }
        });
        System.out.println("Callable返回值=" + future.get());
    }
}

输出结果:

Callable返回值=结果

1.4 FutureTask

FutureTask是Runnable和Future的实现类,既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

当线程池调用submit()方法来向线程池中提交任务时,无论提交的是Runnable类型的任务,还是提交的是Callable类型的任务,最终都是将任务封装成一个FutureTask对象,我们可以通过这个FutureTask对象来获取任务在线程池中的状态。

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 调用newTaskFor()将Callable任务封装成一个FutureTask
        RunnableFuture<T> ftask = newTaskFor(task);
        // 执行任务
        execute(ftask);
        return ftask;
    }

    // newTaskFor
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        // 直接new一个FutureTask对象
        return new FutureTask<T>(callable);
    }

2. FutureTask类结构

public class FutureTask<V> implements RunnableFuture<V> {
    /** state变量用来保存任务的状态 */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    /** 提交的任务,Runnable类型的任务会通过Executors.callable()来转变为Callable */
    private Callable<V> callable;
    /** 用来保存Callable的call()方法的返回值 */
    private Object outcome;
    /** 执行Callable任务的线程 **/
    private volatile Thread runner;
    /**
     * 任务未完成时,调用get方法获取结果的线程会阻塞等待
     * waiters用于保存这些线程
     */
    private volatile WaitNode waiters;
    
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
}

3. FutureTask状态

FutureTask任务的状态如下:

    // 任务的初始状态,当新建一个FutureTask任务时,state值默认为NEW
    private static final int NEW          = 0;
    // 任务处于完成中,也就是正在执行还未设置返回值
    private static final int COMPLETING   = 1;
    // 任务正常被执行完成,并将任务的返回值赋值给outcome属性之后
    private static final int NORMAL       = 2;
    // 任务出了异常,并将异常对象赋值给outcome属性之后
    private static final int EXCEPTIONAL  = 3;
    // 调用cancle(false),任务被取消了
    private static final int CANCELLED    = 4;
    // 调用cancle(true),任务中断,但是在线程中断之前
    private static final int INTERRUPTING = 5;
    //  调用cancle(true),任务中断,但是在线程中断之后
    private static final int INTERRUPTED  = 6;

状态变化如下图:

4. 执行任务run()

  1. 执行future.callable.call(),执行任务;
  2. 执行成功,设置结果outcome;
  3. 逐个唤醒waiters中的线程去获取执行结果。
public void run() {
 /*
  * 1. 不是NEW状态,不能执行
  * 2. 设置runner失败,不能执行
  */
 if (state != NEW ||
  !UNSAFE.compareAndSwapObject(this, runnerOffset,
          null, Thread.currentThread()))
  return;
 try {
  Callable<V> c = callable;
  if (c != null && state == NEW) {
   V result;
   boolean ran;
   try {
    result = c.call();// 真正执行任务
    ran = true;// 执行成功,设置执行成功标志
   } catch (Throwable ex) {
    result = null;
    ran = false;// 有异常,执行失败
    setException(ex);// 设置异常
   }
   // 如果执行成功,则设置返回结果
   if (ran)
    set(result);
  }
 } finally {
  runner = null;// 无论是否执行成功,把runner设置为null
  int s = state;
  // 处理中断
  if (s >= INTERRUPTING)
   handlePossibleCancellationInterrupt(s);
 }
}

/**
 * 设置执行结果
 */
protected void set(V v) {
 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 执行完成,设置COMPLETING状态
  outcome = v;// 设置执行结果
  UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置完结果,设置NORMAL状态
  finishCompletion();// 逐个唤醒waiters中的线程去获取执行结果
 }
}

5. 获取任务返回值get()方法

  1. 任务状态为NORMAL,直接返回执行结果;
  2. 任务状态为COMPLETING,线程yield()让出CPU,因为COMPLETING到NORMAL只需要很短的时间,get线程让出CPU的短暂时间,任务状态就是从COMPLETING变成了NORMAL;
  3. 任务状态为NEW,将get线程阻塞,如果设置了超时,阻塞至超时时间;如果没有设置超时,会一直阻塞直到任务完成后唤醒。
public V get() throws InterruptedException, ExecutionException {
 int s = state;
 // 如果状态处于NEW或者COMPLETING状态,表示任务还没有执行完成,awaitDone()等待
 if (s <= COMPLETING)
  s = awaitDone(false, 0L);// 下文详解
 // 返回结果,下文详解
 return report(s);
}

/**
 * 返回执行结果
 */
private V report(int s) throws ExecutionException {
 Object x = outcome;
 // 任务正常结束时,返回outcome
 if (s == NORMAL)
  return (V)x;
 // 任务被取消了,抛出CancellationException
 if (s >= CANCELLED)
  throw new CancellationException();
 // 这里只能第EXCEPTIONAL状态,表示在执行过程中出现了异常,抛出ExecutionException。
 throw new ExecutionException((Throwable)x);
}

/**
 * 处于NEW或者COMPLETING状态时,get线程等待
 */
private int awaitDone(boolean timed, long nanos)
  throws InterruptedException {
 // ......
 for (;;) {
  // ......
  // 任务处于COMPLETING中,就让当前线程先暂时放弃CPU的执行权
  else if (s == COMPLETING) // cannot time out yet
   Thread.yield();
  // ......         
  // 如果设置了超时,阻塞至超时时间
  else if (timed) {
   nanos = deadline - System.nanoTime();
   if (nanos <= 0L) {
    removeWaiter(q);
    return state;
   }
   // 等待一段时间
   LockSupport.parkNanos(this, nanos);
  }
  else
   // 如果没有设置超时,会一直阻塞,直到被中断或者被唤醒
   LockSupport.park(this);
 }
}

6. 取消任务 cancel()

将任务状态设置成INTERRUPTING/INTERRUPTED/CANCELLED状态就表示取消了线程,因为在这些状态下任务的run方法是不能执行的。

public boolean cancel(boolean mayInterruptIfRunning) {
 /*
  * 以下情况不能取消任务:
  * 1. 当前任务不是NEW状态,已经被执行了,不能取消
  * 2. 当前任务还没有执行,state == NEW,但是CAS设置状态失败,不能取消
  */
 if (!(state == NEW &&
    UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
     mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
  return false;
 try {    // in case call to interrupt throws exception
  // 中断
  if (mayInterruptIfRunning) {
   try {
    Thread t = runner;
    if (t != null)
     t.interrupt();// 中断线程
   } finally { // final state
    // 中断之后,设置INTERRUPTED状态
    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
   }
  }
 } finally {
  finishCompletion();// 唤醒waiters中的线程去获取执行结果
 }
 return true;
}