并行执行任务
需求
在app列表首页,展示多个item,并有分页;而每个item里后台都会调用一个http请求,判断当前item的状态
分析
为了更好的用体验,无疑需要使用多线程并行处理http请求,而且还需要拿到每个线程的执行结果. 上面的分析,有两个问题需要解决:
1. 如何创建线程池
2. 如何拿到所有线程的执行结果
对于第一个问题,还是很好解决的,使用并发包( java.util.concurrent
)下面的ThreadPoolExecutor类创建线程池,阿里巴巴Java开发手册上推荐使用该类创建线程池:
,根据统计,该首页的qps最大为3以及服务器的配置后,线程池创建如下:
protected static class ThreadFactory {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(ThreadNumEnum.CORE_POOL_SIZE.getNum(), ThreadNumEnum.MAX_IMUM_POOL_SIZE.getNum(), TimeOutEnum.FiveSecond.getSeconds(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(),new ThreadPoolExecutor.DiscardPolicy());
private ThreadFactory() {
}
public static ExecutorService getThreadPool() {
return executor;
}
}
如何能拿到线程的执行结果呢,传统的Thread无法拿到执行结果,由于run方法无返回值,通过ThreadPoolExecutor类图发现:
继承了AbstractExecutorService、ExecutorService,对ExecutorService中的invokeAll方法产生极大的兴趣,仔细阅读注释,其实这个方法用来并行执行任务:
/**
* Executes the given tasks, returning a list of Futures holding
* their status and results
* when all complete or the timeout expires, whichever happens first.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Upon return, tasks that have not completed are cancelled.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param <T> the type of the values returned from the tasks
* @return a list of Futures representing the tasks, in the same
* sequential order as produced by the iterator for the
* given task list. If the operation did not time out,
* each task will have completed. If it did time out, some
* of these tasks will not have completed.
* @throws InterruptedException if interrupted while waiting, in
* which case unfinished tasks are cancelled
* @throws NullPointerException if tasks, any of its elements, or
* unit are {@code null}
* @throws RejectedExecutionException if any task cannot be scheduled
* for execution
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
到这里,第二个问题的解决思路已经有了.
编码实现
invokeAll方法的入参分别为任务列表list、超时时间、时间单位,所以首先我们需要创建任务列表:
List<BasicUserFilter>list=newArrayList<>();
超时时间为每个FutureTask执行超时时间,这里设置成3s,这里的3s超时时间是针对的所有tasks,而不是单个task的超时时间,如果超时,会取消没有执行完的所有任务,并抛出超时异常,源码如下:
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
BasicUserFilter需要实现Callable接口,因为在方法call里能拿到线程的执行结果, 下面就是并行执行任务了:
ExecutorService executor = ThreadFactory.getThreadPool();
List<XXX> userFilterDtoList = new ArrayList<>();
try {
List<Future<XXX>> futureList = executor.invokeAll(list, TimeOutEnum.FourSecond.getSeconds(), TimeUnit.MILLISECONDS);
futureList.stream().forEach(p -> {
try {
Future<XXX> filterDtoFuture = p;
//拿到线程执行结果
userFilterDtoList.add(filterDtoFuture.get());
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
});
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
关于多线程笔者还有很多需要去学习,上面是一个工作笔记,关于invokeAll的执行流程、神奇的Future模式,感兴趣的可以阅读源码就能找到答案.
- 基于改进人工蜂群算法的K均值聚类算法(附MATLAB版源代码)
- RabbitMQ入门-Routing直连模式
- WCF技术剖析之三十二:一步步创建一个完整的分布式事务应用
- .NET的资源并不限于.resx文件,你可以采用任意存储形式[上篇]
- RabbitMQ入门-消息订阅模式
- 年终盘点:2018最值得学习的几种热门编程语言
- 如何编写没有Try/Catch的程序
- RabbitMQ入门-消息派发那些事儿
- RabbitMQ入门-高效的Work模式
- 谈谈分布式事务之四: 两种事务处理协议OleTx与WS-AT
- RabbitMQ入门-从HelloWorld开始
- RabbitMQ入门-从HelloWorld开始
- RabbitMQ入门-初识RabbitMQ
- 谈谈分布式事务之三: System.Transactions事务详解[下篇]
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Android标题栏上添加多个Menu按钮的实例
- Android编程解析XML文件的方法详解【基于XmlPullParser】
- 教你3分钟了解Android 简易时间轴的实现方法
- Android网络技术HttpURLConnection详解
- Ubuntu16.04下CUDA8.0和CUDA9.0共存
- Ubuntu 20.04 开启隐藏录音降噪功能(推荐)
- 解密 Linux 版本信息的方法
- ubuntu20.04连接wifi的方法(2种)
- 服务器Centos部署MySql并连接Navicat过程详解
- Android横竖屏幕切换生命周期详解
- LINUX中如何查看某个端口是否被占用的方法
- Android实现RecyclerView添加分割线的简便方法
- Android定时器Timer的停止和重启实现代码
- Ubuntu20.04开启root账户的方法步骤
- Android 将view 转换为Bitmap出现空指针问题解决办法