【原创】Java并发编程系列36 | FutureTask
线程池源码中出现了很多Callable、Future、FutureTask等以前没介绍过的接口,尤其是线程池提交任务时总是把任务封装成FutureTask,今天就来为大家解惑:
- Runnable、Callable、Future、FutureTask
- FutureTask类结构
- FutureTask状态
- 执行任务 run()方法
- 获取任务返回值 get()方法
- 取消任务 cancel()方法
1. Runnable、Callable、Future、FutureTask
1.1 Runnable
Runnable接口只有一个run方法,而run方法的返回值是void,所以线程执行完之后没有返回值。
public interface Runnable {
public abstract void run();
}
1.2 Callable
在很多场景下,我们通过线程来异步执行任务之后,希望获取到任务的执行结果。比如RPC框架中,需要异步获取任务返回值。这种情况下,Runnable无法获取返回值就无法满足需求了,因此Callable就出现了。
Callable也是一个接口,也只有一个call()方法,不同的是Callable的call()方法有是有返回值的,返回值的类型是一个泛型,泛型由创建Callable对象时指定。
public interface Callable<V> {
V call() throws Exception;
}
1.3 Future
要想获得Callable的返回值就需要用到Future接口。Futrue可以监视和控制Callable任务的执行情况,如对执行结果进行取消、查询是否完成、获取结果等。
如:当一个任务通过线程池的submit()方法提交到线程池后,线程池会返回一个Future类型的对象,我们可以通过Future对象来获取任务在线程池中的状态。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- cancel方法:用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。 mayInterruptIfRunning参数用来表示是否需要中断线程,如果传true,表示需要中断线程,那么就会将任务的状态设置为INTERRUPTING;如果为false,那么就会将任务的状态设置为CANCELLED(关于任务的状态INTERRUPTING和CANCELLED后面会说明)
- isCancelled方法:表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true
- isDone方法:表示任务是否已经完成,若任务完成,则返回true
- get()方法:用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回。
- get(long timeout, TimeUnit unit)方法:获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
举例:Future获取Callable任务的返回值
public class FutureExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newCachedThreadPool();
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "结果";
}
});
System.out.println("Callable返回值=" + future.get());
}
}
输出结果:
Callable返回值=结果
1.4 FutureTask
FutureTask是Runnable和Future的实现类,既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
当线程池调用submit()方法来向线程池中提交任务时,无论提交的是Runnable类型的任务,还是提交的是Callable类型的任务,最终都是将任务封装成一个FutureTask对象,我们可以通过这个FutureTask对象来获取任务在线程池中的状态。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 调用newTaskFor()将Callable任务封装成一个FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 执行任务
execute(ftask);
return ftask;
}
// newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// 直接new一个FutureTask对象
return new FutureTask<T>(callable);
}
2. FutureTask类结构
public class FutureTask<V> implements RunnableFuture<V> {
/** state变量用来保存任务的状态 */
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 提交的任务,Runnable类型的任务会通过Executors.callable()来转变为Callable */
private Callable<V> callable;
/** 用来保存Callable的call()方法的返回值 */
private Object outcome;
/** 执行Callable任务的线程 **/
private volatile Thread runner;
/**
* 任务未完成时,调用get方法获取结果的线程会阻塞等待
* waiters用于保存这些线程
*/
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
}
3. FutureTask状态
FutureTask任务的状态如下:
// 任务的初始状态,当新建一个FutureTask任务时,state值默认为NEW
private static final int NEW = 0;
// 任务处于完成中,也就是正在执行还未设置返回值
private static final int COMPLETING = 1;
// 任务正常被执行完成,并将任务的返回值赋值给outcome属性之后
private static final int NORMAL = 2;
// 任务出了异常,并将异常对象赋值给outcome属性之后
private static final int EXCEPTIONAL = 3;
// 调用cancle(false),任务被取消了
private static final int CANCELLED = 4;
// 调用cancle(true),任务中断,但是在线程中断之前
private static final int INTERRUPTING = 5;
// 调用cancle(true),任务中断,但是在线程中断之后
private static final int INTERRUPTED = 6;
状态变化如下图:
4. 执行任务run()
- 执行future.callable.call(),执行任务;
- 执行成功,设置结果outcome;
- 逐个唤醒waiters中的线程去获取执行结果。
public void run() {
/*
* 1. 不是NEW状态,不能执行
* 2. 设置runner失败,不能执行
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();// 真正执行任务
ran = true;// 执行成功,设置执行成功标志
} catch (Throwable ex) {
result = null;
ran = false;// 有异常,执行失败
setException(ex);// 设置异常
}
// 如果执行成功,则设置返回结果
if (ran)
set(result);
}
} finally {
runner = null;// 无论是否执行成功,把runner设置为null
int s = state;
// 处理中断
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
/**
* 设置执行结果
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 执行完成,设置COMPLETING状态
outcome = v;// 设置执行结果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置完结果,设置NORMAL状态
finishCompletion();// 逐个唤醒waiters中的线程去获取执行结果
}
}
5. 获取任务返回值get()方法
- 任务状态为NORMAL,直接返回执行结果;
- 任务状态为COMPLETING,线程yield()让出CPU,因为COMPLETING到NORMAL只需要很短的时间,get线程让出CPU的短暂时间,任务状态就是从COMPLETING变成了NORMAL;
- 任务状态为NEW,将get线程阻塞,如果设置了超时,阻塞至超时时间;如果没有设置超时,会一直阻塞直到任务完成后唤醒。
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果状态处于NEW或者COMPLETING状态,表示任务还没有执行完成,awaitDone()等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);// 下文详解
// 返回结果,下文详解
return report(s);
}
/**
* 返回执行结果
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
// 任务正常结束时,返回outcome
if (s == NORMAL)
return (V)x;
// 任务被取消了,抛出CancellationException
if (s >= CANCELLED)
throw new CancellationException();
// 这里只能第EXCEPTIONAL状态,表示在执行过程中出现了异常,抛出ExecutionException。
throw new ExecutionException((Throwable)x);
}
/**
* 处于NEW或者COMPLETING状态时,get线程等待
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// ......
for (;;) {
// ......
// 任务处于COMPLETING中,就让当前线程先暂时放弃CPU的执行权
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// ......
// 如果设置了超时,阻塞至超时时间
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 等待一段时间
LockSupport.parkNanos(this, nanos);
}
else
// 如果没有设置超时,会一直阻塞,直到被中断或者被唤醒
LockSupport.park(this);
}
}
6. 取消任务 cancel()
将任务状态设置成INTERRUPTING/INTERRUPTED/CANCELLED状态就表示取消了线程,因为在这些状态下任务的run方法是不能执行的。
public boolean cancel(boolean mayInterruptIfRunning) {
/*
* 以下情况不能取消任务:
* 1. 当前任务不是NEW状态,已经被执行了,不能取消
* 2. 当前任务还没有执行,state == NEW,但是CAS设置状态失败,不能取消
*/
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 中断
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();// 中断线程
} finally { // final state
// 中断之后,设置INTERRUPTED状态
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();// 唤醒waiters中的线程去获取执行结果
}
return true;
}
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- Mac系统R语言升级后无法加载包报错 package or namespace load failed in dyn.load
- 如何从xml文件创建R语言数据框dataframe
- Matlab马尔可夫链蒙特卡罗法(MCMC)估计随机波动率(SV) 模型
- 如何从xml文件创建R语言数据框dataframe
- R语言POT超阈值模型和极值理论EVT分析
- R语言使用灰色关联分析(Grey Relation Analysis,GRA)中国经济社会发展指标
- R语言中的模拟过程和离散化:泊松过程和维纳过程
- R语言Lee-Carter模型对年死亡率建模预测预期寿命
- R语言有极值(EVT)依赖结构的马尔可夫链(MC)对洪水极值分析
- RxSwift 封装 CoreBluetooth(一) 配置
- Golang 操作Excel文件
- 腾讯云TKE-搭建prometheus监控(一)
- Android开发中ProgressDialog简单用法示例
- Android实现拍照及图片裁剪(6.0以上权限处理及7.0以上文件管理)
- Android仿微信调用第三方地图应用导航(高德、百度、腾讯)