高并发场景下请求合并的实践(转)
转自https://www.cnblogs.com/flylinran/p/10177304.html
前言
项目中一般会请求第三方的接口,也会对外提供接口,可能是RPC,也可能是HTTP等方式。在对外提供接口时,有必要提供相应的批量接口,好的批量实现能够提升性能。
高并发场景中,调用批量接口相比调用非批量接口有更大的性能优势。但有时候,请求更多的是单个接口,不能够直接调用批量接口,如果这个接口是高频接口,对其做请求合并就很有必要了。比如电影网站的获取电影详情接口,APP的一次请求是单个接口调用,用户量少的时候请求也不多,完全没问题;但同一时刻往往有大量用户访问电影详情,是个高并发的高频接口,如果都是单次查询,后台就不一定能hold住了。为了优化这个接口,后台可以将相同的请求进行合并,然后调用批量的查询接口。如下图所示
无合并请求
合并请求前,我们一般是调用服务层的单次创建方法。看起来都比较简单,且易于理解。
以创建设备接口为例。
@Reference(check = false)
private DeviceService deviceService;
/**
* 注册设备
*
* @param productKey 产品key
* @param deviceName 设备名
* @return 设备ID
*/
public R<Long> registDevice(String productKey, String deviceName) {
log.debug("开始注册: {}, {}", productKey, deviceName);
DeviceRequestDto deviceCreateQuery = new DeviceRequestDto()
.setProductKey(productKey)
.setName(deviceName);
Long deviceId = deviceService.createDevice(deviceCreateQuery);
return deviceId != null
? R.ok(deviceId)
: R.error(DEVICE_CREATE_ERROR);
}
请求合并
请求合并的好处前面有提到,那不能每次写接口就做请求合并吧?我们要明白,技术无好坏,要在特定的业务场景下衡量利弊,采用与否需要深思熟虑。合并请求会令代码变得复杂,也会增加一定的接口延迟,其中还可能存在各种未知的风险。
合并请求是针对高并发场景的一种手段,我们实现请求合并之前,要结合业务场景思考一番,是否值得承受的合并带来的访问延迟?用户体验是否会打折扣?自身的技术是否足够hold住请求合并带来的未知风险?
思路:收到前端的请求时,先存起来,隔段时间批量请求第三方服务批量接口,然后分别通知存起来的请求,并且响应前端。
代码实现
还是针对上述设备注册接口,我们对其进行改造,来实现一个简单的请求合并。
1. 批量接口
首先,我们需要有能够批量调用的接口。在对外提供接口时,也非常有必要提供相应的批量接口,且内部实现应该是优化过的。
此处我们在服务层模拟了一个批量创建设备的接口, 如下:
- 方法签名
/**
* 批量创建设备接口
*
* @param deviceRequestDtoList 入参信息
* @return 创建结果
*/
R<List<DeviceCreateResp>> batchCreateDevice(List<DeviceCreateQuery> deviceList);
- 入参
@Data
public class DeviceCreateQuery implements Serializable {
/**
* 产品标识
*/
private String productKey;
/**
* 设备名称
*/
private String name;
/**
* 请求源,一次批量请求保证唯一
*/
private String requestSource;
}
- 返回值
@Data
public class DeviceCreateResp implements Serializable {
/**
* 设备ID
*/
private Long deviceId;
/**
* 请求源,一次批量请求保证唯一
*/
private String requestSource;
}
2. 合并单个请求
- 积攒请求的阻塞队列
private LinkedBlockingDeque<DeviceCreateRequest> deviceCreateQueue = new LinkedBlockingDeque<>();
- 积攒请求的自定义结构
@Data
static class DeviceCreateRequest {
/** 产品key */
private String productKey;
/** 设备名 */
private String deviceName;
/** 请求源,需保证唯一 */
private String requestSource;
/** CompletableFuture接口 */
private CompletableFuture<Long> completedFuture;
}
- 积攒请求
public R<Long> registDevice(String productKey, String deviceName) {
log.debug("开始注册: {}, {}", productKey, deviceName);
// 缓存请求 ====== start
CompletableFuture<Long> completedFuture = new CompletableFuture<>();
DeviceCreateRequest deviceCreateRequest = new DeviceCreateRequest();
deviceCreateRequest.setProductKey(productKey);
deviceCreateRequest.setDeviceName(deviceName);
deviceCreateRequest.setRequestSource(UUID.randomUUID().toString());
deviceCreateRequest.setCompletedFuture(completedFuture);
deviceCreateQueue.add(deviceCreateRequest);
// 缓存请求 ====== end
Long deviceId = null;
try {
deviceId = completedFuture.get();
} catch (Exception e) {
log.error("设备注册失败", e);
}
return deviceId != null
? R.ok(deviceId)
: R.error(DEVICE_CREATE_ERROR);
}
3. 发送批量请求
此处使用了spring,在init方法中利用定时任务线程池批量分发请求。同时使用了newScheduledThreadPool
,其中线程池大小和定时间隔时长需要根据业务量做权衡
/** 积攒请求的阻塞队列 */
private LinkedBlockingDeque<DeviceCreateRequest> deviceCreateQueue = new LinkedBlockingDeque<>();
/** 线程池数量 */
@Value("${iot.register.merge.device.request.num:100}")
private int createDeviceMergeNum;
/** 定时间隔时长 */
@Value("${iot.register.merge.device.request.period:30}")
private long createDeviceMergePeriod;
@Reference(check = false)
private DeviceService deviceService;
@PostConstruct
public void init() {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(createDeviceMergeNum);
scheduledExecutorService.scheduleAtFixedRate(() -> {
// 把出queue的请求存储一次
List<DeviceCreateRequest> questBak = new ArrayList<>();
// 批量创建设备的入参
List<DeviceCreateQuery> deviceCreateQueryList = new ArrayList<>();
int size = deviceCreateQueue.size();
for (int i = 0; i < size; i++) {
DeviceCreateRequest deviceCreateRequest = deviceCreateQueue.poll();
if (Objects.nonNull(deviceCreateRequest)) {
questBak.add(deviceCreateRequest);
deviceCreateQueryList.add(buildDeviceCreateQuery(deviceCreateRequest));
}
}
if (!deviceCreateQueryList.isEmpty()) {
try {
List<DeviceCreateResp> response = deviceService.batchCreateDevice(deviceCreateQueryList);
Map<String, Long> collect = response.stream()
.collect(Collectors.toMap(
DeviceCreateResp::getRequestSource, DeviceCreateResp::getDeviceId
));
// 通知请求的线程
for (DeviceCreateRequest deviceCreateRequest : questBak) {
deviceCreateRequest.getCompletedFuture().complete(collect.get(deviceCreateRequest.getRequestSource()));
}
} catch (Throwable throwable) {
log.error("批量注册设备异常", throwable);
// 通知请求的线程-异常
questBak.forEach(deviceCreateRequest -> deviceCreateRequest.getCompletedFuture().obtrudeException(throwable));
}
}
}, 0, createDeviceMergePeriod, TimeUnit.MILLISECONDS);
}
总结
请求合并是解决高并发场景下某些问题的一种思路,本文只做了一个简单的实现,算是对这块知识的一次实践吧。用到了BlockingDeque
、CompletableFuture
接口,涉及Java多线程相关的知识,实现方式比较野蛮。业界有很多优秀的开源框架做请求合并,比如hystrix
,需要花时间好好学习哈哈。
原文地址:https://www.cnblogs.com/ffaiss/p/11009922.html
- Java 持久化操作之 --XML
- 算法模板——splay区间反转 1
- 3223: Tyvj 1729 文艺平衡树
- 1212: [HNOI2004]L语言
- POJ 2942Knights of the Round Table(tarjan求点双+二分图染色)
- 算法模板——平衡树Treap
- Java并发编程
- 算法模板——线段树2(区间加+区间乘+区间求和)
- 1798: [Ahoi2009]Seq 维护序列seq
- 【LeetCode 389】 关关的刷题日记30 Find the Difference
- 1708: [Usaco2007 Oct]Money奶牛的硬币
- 1856: [Scoi2010]字符串
- 【LeetCode 409】 关关的刷题日记31Longest Palindrome
- Git的奇技淫巧?
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- laravel csrf排除路由,禁止,关闭指定路由的例子
- thinkphp框架类库扩展操作示例
- Laravel ORM 数据model操作教程
- Laravel框架基础语法与知识点整理【模板变量、输出、include引入子视图等】
- laravel 解决Eloquent ORM的save方法无法插入数据的问题
- laravel框架中控制器的创建和使用方法分析
- php 使用expat方式解析xml文件操作示例
- laravel利用中间件做防非法登录和权限控制示例
- laravel框架中表单请求类型和CSRF防护实例分析
- Yii框架getter与setter方法功能与用法分析
- laravel框架中视图的基本使用方法分析
- laravel5 Eloquent 实现事务方式
- Laravel 微信小程序后端搭建步骤详解
- Laravel使用swoole实现websocket主动消息推送的方法介绍
- Laravel框架Eloquent ORM删除数据操作示例