Java8使用CompletableFuture的部分方法

时间:2022-07-23
本文章向大家介绍Java8使用CompletableFuture的部分方法,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

背景

CompletableFuture的使用是为了异步编程,异步编程可以解决同步编程的性能瓶颈问题。也就是将同步操作变为了并行操作。 当我们有一大批数据需要处理的时候我们可以将这些数据分而治之,使用CompletableFuture通过线程池的多个线程进行异步执行。

  1. 异步执行的意思就是下一个人不不用等上一个任务执行完成,也就是重新起一个线程池
  2. 这里的线程池指的就是当上一个人任务没有执行完,需要新起一个的线程就在这个线程池里创建或者直接获取已有的线程。

CompletableFuture介绍

  1. 在1.8之前我们使用多线程操作的方法是通过CallAble来实现call方法,然后通过future获得异步的结果,其中要么是使用get()方法进行阻塞,我么轮训IsDone()查看是否为true这两种方法都会导致主线程的阻塞。于是在1.8的引入了CompletableFuture,他是针对future做了改进。通过example来看一下具体的使用

CompletableFuture的主要方法

1. CompletableFuture 提供了四个静态方法来创建一个异步操作。

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • 其中没有指定线程池的是直接使用默认线程吃forkJoinPool(分而治之的代表)其中核心线程数为CPU的核心线程数。
  • runAsync 和 supplyAsync的区别在于前者没有返回值后者有返回值。 正是因为他入参的类型不一样,没有反回值的是Runable对象,我们都是知道Runable是实现的线程确实是没有返回值的。
/**
 * @author yuanxindong
 * @date 2020/7/30 9:07 下午
 */
public class CompletableFutureDemo {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    //
    System.out.println(runAsyncDemo().get());
    System.out.println(supplyAsyncDemo().get());
  }

  static CompletableFuture<Void> runAsyncDemo() {
    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(CompletableFutureDemo::getMessage);
    return voidCompletableFuture;
  }

  static CompletableFuture<String> supplyAsyncDemo() {
    CompletableFuture<String> stringCompletableFuture1 =
        CompletableFuture.supplyAsync(CompletableFutureDemo::getMessage);
    return stringCompletableFuture1;
  }

 static  String getMessage() {
    return "return message";
  }
}

执行结果:

null
return message

Process finished with exit code 0

2. 计算结果完成时回掉方法

当我们异步调用完成调用后,计算结果完成或者异常的时候我们应该如何接受结果呢? 下面有四种接受的方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

不难看出形参名称为action的为正常回调的接受,形参为fn的为异常回调的接受。

whenComplete 和 whenCompleteAsync 的区别:

whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。

whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

package 并发编程;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
 * @author yuanxindong
 * @date 2020/7/30 10:22 下午
 */
public class WhenCompleteDemo {
  public static void main(String[] args) throws InterruptedException {
    // 启动异步编程的一个线程任务
    CompletableFuture<Void> future =
        CompletableFuture.runAsync(
            () -> {
              try {
                TimeUnit.SECONDS.sleep(1);
              } catch (InterruptedException e) {
              }

              if (new Random().nextInt() % 2 >= 0) {
                int i = 12 / 0;
              }
              System.out.println("run end ...");
            });

    // 接受正常任务结果
    future.whenComplete(
        new BiConsumer<Void, Throwable>() {
          @Override
          public void accept(Void t, Throwable action) {
            System.out.println("执行完成!");
          }
        });

    // 接受异常任务结果
    future.exceptionally(
        new Function<Throwable, Void>() {
          @Override
          public Void apply(Throwable t) {
            System.out.println("执行失败!" + t.getMessage());
            return null;
          }
        });

    TimeUnit.SECONDS.sleep(2);
  }

  private static void print(String message) {
    System.out.println(message);
  }
}

执行结果:

执行失败!java.lang.ArithmeticException: / by zero
执行完成!

Process finished with exit code 0

我们可以想一下应用场景,在我们下了订单之后我们给用户返回抽奖的场景。

3、 thenApply 方法

下一个任务依赖上一个任务的结果

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

Function<? super T,? extends U>

  • T:上一个任务返回结果的类型
  • U:当前任务的返回值类型
private static void thenApply() throws Exception {
    CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
        @Override
        public Long get() {
            long result = new Random().nextInt(100);
            System.out.println("result1="+result);
            return result;
        }
    }).thenApply(new Function<Long, Long>() {
        @Override
        public Long apply(Long t) {
            long result = t*5;
            System.out.println("result2="+result);
            return result;
        }
    });
    
    long result = future.get();
    System.out.println(result);
}

执行结果

result1=46
result2=230
230

应用场景当我们进行调用第三方的接口的的时候,然后需要拿到结果进行处理。

4、 handle 方法

handle 是执行任务完成时对结果的处理。 handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法(当异常被捕获后仍然可以执行)。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
public static void handle() throws Exception{
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {

        @Override
        public Integer get() {
            int i= 10/0;
            return new Random().nextInt(10);
        }
    }).handle(new BiFunction<Integer, Throwable, Integer>() {
        @Override
        public Integer apply(Integer param, Throwable throwable) {
            int result = -1;
            if(throwable==null){
                result = param * 2;
            }else{
                System.out.println(throwable.getMessage());
            }
            return result;
        }
     });
    System.out.println(future.get());
}

从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。

总结

  1. CompletableFuture的由来
  2. 还有CompleteTableTuture的异步提交,接受任务结果,接受任务结果且处理,任务完成执行等。
  3. 部分方法