异步神器CompletableFuture
介绍
上个礼拜我们线上有个接口比较慢,这个接口在刚开始响应时间是正常的。但随着数据量的增多,响应时间变慢了。
这个接口里面顺序调用了2个服务,且2个服务之间没有数据依赖。我就用CompletableFuture把调用2个服务的过程异步化了一下,响应时间也基本上缩短为原来的一半,问题解决。
正好上次分享了函数式接口和Stream的使用,这次就分享一下CompletableFuture,里面也用到了大量的函数式接口
想方便的异步执行任务,就必须放到单独的线程中。继承Thread类,实现Runnable都不能拿到任务的执行结果,这时就不得不提创建线程的另一种方式了,实现Callable接口。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callable接口一般配合ExecutorService来使用
// ExecutorService.java
<T> Future<T> submit(Callable<T> task);
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result = executor.submit(() -> {
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
});
// 4950
System.out.println(result.get());
我们从Future中获取结果
public interface Future<V> {
// 取消任务的执行
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否已经取消
boolean isCancelled();
// 任务是否已经完成
boolean isDone();
// 获取任务执行结果,会阻塞线程
V get() throws InterruptedException, ExecutionException;
// 超时获取任务执行结果,会阻塞线程
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
对于简单的场景使用Future并没有什么不方便。但是一些复杂的场景就很麻烦, 如2个异步任务,其中一个有结果就直接返回。Future用起来就不方便,因为想获取结果时,要么执行future.get()方法,但是这样会阻塞线程,变成同步操作,要么轮询isDone()方法,但是比较耗费CPU资源。
Netty和Google guava为了解决这个问题,在Future的基础上引入了观察者模式(即在Future上addListener),当计算结果完成时通知监听者。
Java8新增的CompletableFuture则借鉴了Netty等对Future的改造,简化了异步编程的复杂性,并且提供了函数式编程的能力
创建CompletableFuture对象
方法名 |
描述 |
---|---|
completedFuture(U value) |
返回一个已经计算好的CompletableFuture |
runAsync(Runnable runnable) |
使用ForkJoinPool.commonPool()作为线程池执行任务,没有返回值 |
runAsync(Runnable runnable, Executor executor) |
使用指定的线程池执行任务,没有返回值 |
supplyAsync(Supplier<U> supplier) |
使用ForkJoinPool.commonPool()作为线程池执行任务,有返回值 |
supplyAsync(Supplier<U> supplier, Executor executor) |
使用指定的线程池执行任务,有返回值 |
@FunctionalInterface
public interface Supplier<T> {
T get();
}
Supplier在《用好强大的Stream》中已经介绍过了,是一个能获取返回值的函数式接口
CompletableFuture<Integer> intFuture = CompletableFuture.completedFuture(100);
// 100
System.out.println(intFuture.get());
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> System.out.println("hello"));
// null
System.out.println(voidFuture.get());
CompletableFuture<String> stringFuture = CompletableFuture.supplyAsync(() -> "hello");
// hello
System.out.println(stringFuture.get());
计算结果完成时
方法名 |
---|
whenComplete(BiConsumer<? super T,? super Throwable> action) |
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) |
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) |
因为入参是BiConsumer<? super T,? super Throwable>函数式接口,所以可以处理正常和异常的计算结果
whenComplete和whenCompleteAsync的区别如下
- whenComplete:执行完当前任务的线程继续执行whenComplete的任务
- whenCompleteAsync:把whenCompleteAsync这个任务提交给线程池来执行
CompletableFuture的所有方法的定义和whenComplete都很类似
- 方法不以Async结尾意味着使用相同的线程执行
- 方法以Async结尾意味着将任务提交到线程池来执行
- 方法以Async结尾时可以用ForkJoinPool.commonPool()作为线程池,也可以使用自己的线程池
后续介绍的所有方法都只写一种case
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return "hello";
}).whenComplete((v, e) -> {
// hello
System.out.println(v);
});
// hello
System.out.println(future.get());
转换,消费,执行
方法名 |
描述 |
---|---|
thenApply |
获取上一个任务的返回,并返回当前任务的值 |
thenAccept |
获取上一个任务的返回,单纯消费,没有返回值 |
thenRun |
上一个任务执行完成后,开始执行thenRun中的任务 |
CompletableFuture.supplyAsync(() -> {
return "hello ";
}).thenAccept(str -> {
// hello world
System.out.println(str + "world");
}).thenRun(() -> {
// task finish
System.out.println("task finish");
});
组合(两个任务都完成)
方法名 |
描述 |
---|---|
thenCombine |
组合两个future,获取两个future的返回结果,并返回当前任务的返回值 |
thenAcceptBoth |
组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值 |
runAfterBoth |
组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务 |
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return "欢迎关注 ";
}).thenApply(t -> {
return t + "微信公众号 ";
}).thenCombine(CompletableFuture.completedFuture("Java识堂"), (t, u) -> {
return t + u;
}).whenComplete((t, e) -> {
// 欢迎关注 微信公众号 Java识堂
System.out.println(t);
});
组合(只需要一个任务完成)
方法名 |
描述 |
---|---|
applyToEither |
两个任务有一个执行完成,获取它的返回值,处理任务并返回当前任务的返回值 |
acceptEither |
两个任务有一个执行完成,获取它的返回值,处理任务,没有返回值 |
runAfterEither |
两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值 |
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "欢迎关注微信公众号";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "Java识堂";
});
CompletableFuture future = future1.applyToEither(future2, str -> str);
// 欢迎关注微信公众号 Java识堂 随机输出
System.out.println(future.get());
sleepRandom()为我写的一个随机暂停的函数
多任务组合
方法名 |
描述 |
---|---|
allOf |
当所有的CompletableFuture完成后执行计算 |
anyOf |
任意一个CompletableFuture完成后执行计算 |
allOf的使用
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "欢迎关注";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "微信公众号";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "Java识堂";
});
// 欢迎关注 微信公众号 Java识堂
CompletableFuture.allOf(future1, future2, future3)
.thenApply(v ->
Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" ")))
.thenAccept(System.out::print);
anyOf的使用
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "欢迎关注";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "微信公众号";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "Java识堂";
});
CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1, future2, future3);
// 欢迎关注 微信公众号 Java识堂 随机输出
System.out.println(resultFuture.get());
异常处理
方法名 |
描述 |
---|---|
exceptionally |
捕获异常,进行处理 |
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100 / 0;
}).thenApply(num -> {
return num + 10;
}).exceptionally(throwable -> {
return 0;
});
// 0
System.out.println(future.get());
当然有一些接口能捕获异常
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
String str = null;
return str.length();
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("正常结果为" + v);
} else {
// 发生异常了java.util.concurrent.CompletionException: java.lang.NullPointerException
System.out.println("发生异常了" + e.toString());
}
});
- 高通HAL层之Sensor HAL
- Android Data Binding(数据绑定)用户指南
- Android 禁止Viewpager左右滑动功能
- 高通Audio中ASOC的machine驱动
- 《Redis设计与实现》读书笔记(三十五) ——Redis 二进制位数组及SWAR汉明重量算法
- Android TabWidget底部显示
- 《Redis设计与实现》读书笔记(三十六) ——Redis 慢查询日志实现
- 概率学中的随机变量与分布
- 神马?SQL竟然可以解脑筋急转弯的题目?
- android中一些特殊字符(如:←↑→↓等箭头符号)的Unicode码值
- 基于SpringBoot的任务管理平台v1.0正式发布
- 大数据系统的Lambda架构
- AKKA中的事件流
- Java初涉感悟
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 8848钛金手机之nacos的注册发现
- 让你git 时不再输入账号和密码
- JS 实现点击按钮复制一段文字
- Python操作Excel合并单元格
- CRM第一天:客户关系管理系统的环境搭建和注册
- lambda 表达式导致 Arthas 无法 redefine 的问题
- 战士上战场,还不会部署kubernetes集群?
- 使用docker python 的最新版本以及pip 安装模块
- Java面试题总结之数据库与SQL语句
- Python脚本命令行执行隐藏密码等敏感信息
- 使用SRS Docker搭建自己的直播平台
- @陈同学的专属Python教程之快速使用
- MYSQL错误码2059解决办法
- @陈同学的专属Python教程之函数
- 微信小程序的Web API接口设计及常见接口实现