关于聚合和多线程的处理套路
概述
无差别地请求多个外部接口并聚合所有请求结果,应该有属于它自己的套路,应该将所有多线程的操作屏蔽之,我们只关心参数和结果。因此,应该抛弃Callable/FutureTask/Future等这些手工模式,这些代码应该交给框架来实现。
手工模式
何为手工模式,我们以Callable为例设计请求外部的接口,可能像下面这样子,参数是NumberParam,两个外部接口分别是IntToStringCallable和DoubleToStringCallable,
class IntToStringCallable implements Callable<String> {
private final NumberParam param;
IntToStringCallable(NumberParam numberParam) {
this.param = numberParam;
}
@Override
public String call() {
return Integer.toHexString(param.getAge());
}
}
class DoubleToStringCallable implements Callable<String> {
private final NumberParam param;
DoubleToStringCallable(NumberParam numberParam) {
this.param = numberParam;
}
@Override
public String call() {
return Double.toHexString(param.getMoney());
}
}
如果采用FutureTask的方式多线程执行这两个接口,可能是这样子的,
FutureTask<String> r1 = new FutureTask<>(new IntToStringCallable(numberParam));
new Thread(r1).start();
FutureTask<String> r2 = new FutureTask<>(new DoubleToStringCallable(numberParam));
new Thread(r2).start();
try {
List<String> ret = new ArrayList<>();
ret.add(r1.get());
ret.add(r2.get());
log.info("ret=" + ret);
} catch (Exception ignore) {
}
需要首先构造FutureTask,然后使用Thread比较原始的api去执行,当然还可以再简化一下,比如使用Future方式,
ExecutorService threadPool = Executors.newFixedThreadPool(2);
Future<String> r1 = threadPool.submit(new IntToStringCallable(numberParam));
Future<String> r2 = threadPool.submit(new DoubleToStringCallable(numberParam));
try {
List<String> ret = new ArrayList<>();
ret.add(r1.get());
ret.add(r2.get());
log.info("ret=" + ret);
} catch (Exception ignore) {
}
我相信这是一种普遍常见的做法了。这里没有必要继续评论这些做法的问题了。
Java 8之后
Java 8之后有了更加方便的异步编程方式了,不用再辛苦地去写Callable的,一句话就可以表达Callable+FutureTask/...,
CompletableFuture<String> pf = CompletableFuture.supplyAsync(() -> new IntToStringCallable(numberParam).call());
改造之前的做法结果可能就是这个样子了,
CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> new IntToStringCallable(numberParam).call());
CompletableFuture<String> r2 = CompletableFuture.supplyAsync(() -> new DoubleToStringCallable(numberParam).call());
try {
List<String> ret = new ArrayList<>();
ret.add(r1.get());
ret.add(r2.get());
log.info("ret=" + ret);
} catch (Exception ignore) {
}
其实可以看出来,这个时候我们不一定需要一个Callable了,提供异步的能力是supplyAsync来完成的,我们只需要正常的入参出参的普通方法就可以了。
Java 8之后再之后
Java 8之后的异步编程方式确实简单了很多,但是在我们的业务代码中还是出现了和异步编程相关的无关业务逻辑的事情,可否继续简化呢。本案的设计灵感来自同样Java 8的优秀设计——ParallelStream,举个简单的例子,
Arrays.asList("a", "b", "c").parallelStream().map(String::toUpperCase).collect(Collectors.toList());
异步及多线程是ParallelStream来完成的,用户只需要完成String::toUpperCase部分。
本案的设计主要有三个interface来实现,分别是,
public interface MyProvider<T,V> {
T provide(V v);
}
public interface MyCollector<T> {
void collectList(T t);
List<T> retList();
}
public interface MyStream<T,V> {
List<T> toList(List<MyProvider<T,V>> providers, V v);
}
其实MyProvider表达是请求外部接口,MyStream表示一种类似ParallelStream的思想,一种内化异步多线程的操作模式,MyCollector属于内部设计api可以不暴露给用户; 一个改写上面的例子的例子,
@Test
public void testStream() {
MyProvider<String,NumberParam> p1 = new IntToStringProvider();
MyProvider<String,NumberParam> p2 = new DoubleToStringProvider();
List<MyProvider<String, NumberParam>> providers = Arrays.asList(p1, p2);
MyStream<String, NumberParam> myStream = new CollectStringStream();
List<String> strings = myStream.toList(providers, numberParam);
log.info("ret=" + strings);
}
在这个方法内一点异步编程的内容都没有的,用户只需要编程自己关心的逻辑即可,当然是要按照Provider的思路去写,这或许有一点心智负担。 这个CollectStringStream帮我们完成来一些脏活累活,
public List<String> toList(List<MyProvider<String, NumberParam>> myProviders, NumberParam param) {
MyCollector<String> myCollector = new NoMeaningCollector();
List<CompletableFuture<Void>> pfs = new ArrayList<>(myProviders.size());
for (MyProvider<String, NumberParam> provider : myProviders) {
CompletableFuture<Void> pf = CompletableFuture.runAsync(() -> myCollector.collectList(provider.provide(param)), executor);
pfs.add(pf);
}
try {
CompletableFuture.allOf(pfs.toArray(new CompletableFuture[0])).get(3, TimeUnit.SECONDS);
} catch (Exception e) {
if (e instanceof TimeoutException) {
pfs.forEach(p -> {
if (!p.isDone()){
p.cancel(true);
}});
}
}
return myCollector.retList();
}
这样看起这个设计又不美了,但是如果有更多的外部接口需要调用,CollectStringStream就显得很有价值了,新加入再多的请求外部接口要改动的代码很少很少,所以这种思想我觉得是值得推广的。
总结
照例附上参考代码,不过值得思考的是我们如何像优秀的代码学习并运用到自己的项目中。 参考代码,java-toy
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 使用 JSP+Servlet 模仿京东页面实现购物车功能
- Python 绘制科赫雪花的简单实现
- IP 数据报格式详解
- 地址解析协议 ARP 详解
- IP 地址分为哪几类?
- MySQL选错索引导致的线上慢查询事故
- 聊聊dubbo-go的kubernetesRegistry
- 《Java 开发手册》解读:三目运算符为何会导致 NPE?
- 11.实现AI对战AI的五子棋程序
- 8.wxPython设置图像遮罩(mask)的方法
- Logstash:Data转换,分析,提取,丰富及核心操作
- 7.wxPython制作一个桌面精灵
- html前端之css绘制形状
- Python 对列表中的字符串首字母大写处理
- 对加密-加签的完整流程