【Java多线程-3】Future与FutureTask

时间:2022-07-25
本文章向大家介绍【Java多线程-3】Future与FutureTask,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Future与FutureTask

前文中我们讲述了创建线程的2种方式:直接继承Thread和实现Runnable接口,但这两种方式在执行完任务之后都无法获取执行结果。 自从Java 5开始,JDK提供了Callable和Future,解决了上述问题,通过它们可以在任务执行完毕之后得到任务执行结果。

1 Future

1.1 Future简介

Future类位于java.util.concurrent包下,它是一个接口:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在Future接口中声明了5个方法:

  • cancel:取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务:
    1. 如果设置true,则表示可以取消正在执行过程中的任务。
    2. 如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false;
    3. 如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;
    4. 如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
  • isCancelled:方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  • isDone:判断任务是否已经完成,已完成则返回true;
  • get():获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  • get(long timeout, TimeUnit unit):用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

1.2 Future使用示例

设想,有这样一个场景,同时启动3个线程分别执行一个任务,线程1耗时8s,线程2耗时7s,线程3耗时6s。用Future去接收线程执行结果,并手动维护一个List放置所有Future,代码如下:

import java.util.ArrayList;
import java.util.List;
import java.util.Date;
import java.util.concurrent.*;

/**
 * @author guozhengMu
 * @version 1.0
 * @date 2019/11/7 20:54
 * @description
 * @modify
 */
public class FutureTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        class Task implements Callable<String> {
            private int time;
            public Task(int time) {
                this.time = time;
            }

            @Override
            public String call() throws Exception {
                String name = Thread.currentThread().getName();
                System.out.println(name + "启动:" + new Date());
                TimeUnit.SECONDS.sleep(time);
                return name;
            }
        }

        List<Future<String>> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Future<String> future = executor.submit(new Task(8 - i));
            results.add(future);
        }

        for (int i = 0; i < 3; i++) {
            System.out.println(results.get(i).get() + "完成:" + new Date());
        }
        System.out.println("全部线程执行完毕");
        executor.shutdownNow();
    }
}

输出结果:

pool-1-thread-1启动:Fri Nov 08 13:42:08 CST 2019
pool-1-thread-3启动:Fri Nov 08 13:42:08 CST 2019
pool-1-thread-2启动:Fri Nov 08 13:42:08 CST 2019
pool-1-thread-1完成:Fri Nov 08 13:42:16 CST 2019
pool-1-thread-2完成:Fri Nov 08 13:42:16 CST 2019
pool-1-thread-3完成:Fri Nov 08 13:42:16 CST 2019
全部线程执行完毕

可以看到,get方法具有阻塞性,线程1 的结果未返回前,其他已经完成的线程任务结果也无法获取。下面对上面代码进行改进:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Date;
import java.util.concurrent.*;

/**
 * @author guozhengMu
 * @version 1.0
 * @date 2019/11/7 20:54
 * @description
 * @modify
 */
public class FutureTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executor = Executors.newFixedThreadPool(3);
        class Task implements Callable<String> {
            private int time;

            public Task(int time) {
                this.time = time;
            }

            @Override
            public String call() throws Exception {
                String name = Thread.currentThread().getName();
                System.out.println(name + "启动:" + new Date());
                TimeUnit.SECONDS.sleep(time);
                return name;
            }
        }

        List<Future<String>> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Future<String> future = executor.submit(new Task(8 - i));
            results.add(future);
        }

        boolean flag = true;
        while (flag) {
            for (Iterator<Future<String>> iter = results.iterator(); iter.hasNext(); ) {
                Future<String> future = iter.next();
                if (future.isDone()) {
                    System.out.println(future.get() + "完成:" + new Date());
                    iter.remove();
                }
            }
            if (results.size() == 0) {
                flag = false;
            }
        }
        System.out.println("全部线程执行完毕");
        executor.shutdownNow();
    }
}

输出结果:

pool-1-thread-2启动:Fri Nov 08 14:12:43 CST 2019
pool-1-thread-1启动:Fri Nov 08 14:12:43 CST 2019
pool-1-thread-3启动:Fri Nov 08 14:12:43 CST 2019
pool-1-thread-3完成:Fri Nov 08 14:12:49 CST 2019
pool-1-thread-2完成:Fri Nov 08 14:12:50 CST 2019
pool-1-thread-1完成:Fri Nov 08 14:12:51 CST 2019
全部线程执行完毕

可以看到,一旦某个线程任务执行结束,其结果能被立即获取到,但代价是程序在不停地循环查询线程任务 isDone 的结果,对cpu消耗比较大。因此,使用Future解决多任务结果,并不是最优的效果。 FutureTask正是为此而存在

2 FutureTask

2.1 FutureTask简介

FutureTask类实现了RunnableFuture接口:

public class FutureTask<V> implements RunnableFuture<V>

RunnableFuture接口又继承了Runable和Future

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

可见,FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。FutureTask类图如下:

下面我们再来看看 FutureTask 工具类。前面我们提到的 Future 是一个接口,而 FutureTask 是一个工具类,这个工具类有两个构造函数:

FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);

2.2 FutureTask使用示例

使用Callable+FutureTask获取执行结果:

import java.util.concurrent.*;

/**
 * @author guozhengMu
 * @version 1.0
 * @date 2019/11/8 14:17
 * @description
 * @modify
 */
public class FutureTaskTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyTask myTask = new MyTask("hello,", "world-");
        //将任务放进FutureTask里
        FutureTask<Object> futureTask = new FutureTask<>(myTask);
        //采用thread来开启多线程
        Thread thread = new Thread(futureTask);
        thread.start();

        try {
            System.out.println(futureTask.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class MyTask implements Callable<Object> {
    private String param1;
    private String param2;

    //构造函数,用来向task中传递任务的参数
    public MyTask(String param1, String param2) {
        this.param1 = param1;
        this.param2 = param2;
    }

    //任务执行的动作
    @Override
    public String call() {
        for (int i = 0; i < 5; i++) {
            System.out.println(param1 + param2 + i);
        }
        return "运行完成!";
    }
}

输出结果:

hello,world-0
hello,world-1
hello,world-2
hello,world-3
hello,world-4
运行完成!

也可以使用线程池:

import java.util.concurrent.*;

/**
 * @author guozhengMu
 * @version 1.0
 * @date 2019/11/8 14:17
 * @description
 * @modify
 */
public class FutureTaskTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 创建 FutureTask
        FutureTask<Integer> futureTask = new FutureTask<>(() -> 1 + 2);
        // 创建线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        // 提交 FutureTask
        executor.submit(futureTask);
        // 获取计算结果
        Integer result = futureTask.get();
        System.out.println(result);
        executor.shutdown();
    }
}

接下来,我们使用FutureTask来实现Future多线程获取任务结果的场景:

import java.util.Date;
import java.util.concurrent.*;

/**
 * @author guozhengMu
 * @version 1.0
 * @date 2019/11/8 14:17
 * @description
 * @modify
 */
public class FutureTaskTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 3; i++) {
            Callable<String> callable = new Task(8 - i);
            MyFutureTask task = new MyFutureTask(callable);
            executor.submit(task);
        }
        executor.shutdown();
    }
}

class MyFutureTask extends FutureTask<String> {
    public MyFutureTask(Callable<String> callable) {
        super(callable);
    }

    @Override
    protected void done() {
        try {
            System.out.println(get() + "完成:" + new Date());
        } catch (InterruptedException | ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

class Task implements Callable<String> {
    private int time;

    public Task(int time) {
        this.time = time;
    }

    @Override
    public String call() throws InterruptedException {
        String name = Thread.currentThread().getName();
        System.out.println(name + "启动:" + new Date());
        TimeUnit.SECONDS.sleep(time);
        return name;
    }
}

输出结果:

pool-1-thread-1启动:Fri Nov 08 17:35:26 CST 2019
pool-1-thread-3启动:Fri Nov 08 17:35:26 CST 2019
pool-1-thread-2启动:Fri Nov 08 17:35:26 CST 2019
pool-1-thread-3完成:Fri Nov 08 17:35:32 CST 2019
pool-1-thread-2完成:Fri Nov 08 17:35:33 CST 2019
pool-1-thread-1完成:Fri Nov 08 17:35:34 CST 2019