[记录点滴]Redis实现简单消息队列
[记录点滴]Redis实现简单消息队列
0x00 摘要
本文提出了一种用Redis实现简单消息队列的方案,适合在资源不足的条件下临时使用。
0x01 缘由
一个兄弟创业,资源严重不足,但是还希望搭建一个消息队列,于是就咨询我。我恰好有些相关经验,就和他分享。他的需求如下:
- 主要目的是为了解耦,消息先存入队列,然后再从队列转存到数据库。
- 对消息可靠性要求不高,使用场景是消息容忍丢失,或者说对性能的渴求大于可靠性。
- 不考虑分组消费,重复消费和广播。
- 不考虑消息序列顺序。
- 系统现在已经有Redis做缓存。
- 人力和财力资源不足以再使用专业的Queue。
在这种情况下,我建议他在Redis上构建消息队列,暂时渡过难关。
0x02 背景概念
2.1 Redis是否适合做消息队列
首先说结论:Redis肯定是不适合做消息队列的,因为这个本身就不是Redis设计的初衷。
但是如果确实资源受限,为了降低系统的维护成本和实现复杂度,还是可以考虑使用Redis的。
2.1.1 Redis的问题
因为Redis就不是为消息队列设计的,所以它没有考虑一些消息队列的基本问题:
- 队列丢东西怎么办?
- 如果队列暂时无法被插入新数据,有没有办法把新数据暂时存储在临时存储上等队列恢复时候再重新插入?
- 消费者读取数据时候是否需要一个“commit”的语义?是否需要确认已经读取处理完毕?
- 队列长度是否有限制?如果达到最大长度怎么办?......
如果按照是否容忍错误来区分,可以分为两种队列,但是这两种都不适合Redis。
2.1.2 不容忍错误
这种队列的要求是:不允许丢失消息,要保证一致性。比如下单操作。
在这个需求下,用Redis是不实际的,因为你需要考虑如何在Redis基础上做一次性和异步幂等,保证exactly once。那样就应该使用常见的MQ,比如RabbitMQ, Kafka....
2.1.3 容忍错误
比如日志收集。这种允许一定程度的数据丢失,这种其实也不适合Redis,而且现成的方案有很多,比如fluentd,logstash……
2.2 Redis做消息队列的方案
一般来说有四种方式
- 基于List的 LPUSH+BRPOP 的实现
- 基于PUB/SUB,订阅/发布模式
- 基于Sorted-Set的实现
- 基于Stream类型的实现Redis在本实例中的应用
或者可以考虑基于Redis作者写的disque来做开发?
2.4 本文采取的方案
本文采用Redis的List作为队列可以用来在不同程序之间交换消息。生成者使用LPUSH或者RPUSH将一个消息放入队列。消费者使用RPOP或者LPOP命令取出队列中等待时间最长的消息。List支持多个生产者和消费者并发进出消息,每个消费者拿到都是不同的列表元素。
但是这样有两个问题:
- 队列为空时候,LPOP或者RPOP会一直轮训,这样极大消耗资源。
- 如果客户端在消费一个消息时候崩溃,则未处理完的消息也就因此丢失。
因此需要
- 使用RPOPLPUSH命令(或者它的阻塞版本BRPOPLPUSH)。
- 或者引入阻塞读blpop和brpop(b代表blocking),阻塞读 在队列没有数据的时候进入休眠状态。
2.4.1 RPOPLPUSH
RPOPLPUSH好处在于:它不仅返回一个消息,同时还将这个消息添加到另一个备份列表当中。如果一切正常的话,当一个客户端完成某个消息的处理之后,可以用LREM命令将这个消息从备份列表删除。
最后,还可以添加一个客户端专门用于监视备份表,它自动地将超过一定处理时限的消息重新放入队列中去(负责处理该消息的客户端可能已经崩溃),这样就不会丢失任何消息了。
需要注意的问题 :RPOPLPUSH重新入队,即把备份列表右侧元素(表尾)重新入队,可能会出现消息被重复消费的情况。因此消费操作要实现幂等性,即保证重复消费结果一致.
2.4.2 BLPOP和BRPOP
好处在于 :阻塞读在队列没有数据的时候进入休眠状态,一旦数据到来则立刻醒过来,消息延迟几乎为零。
需要注意的问题 :空闲连接的问题。如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常,还有重试。
0x03 生产者LUA
他的数据是由LUA产生的,由Openresty运行。
具体代码摘要如下:
local REDIS = require "redis_iresty"
local REDIS_STORE = REDIS:new(CONF)
REDIS_STORE:lpush(LOG_LIST, log)
0x04 消费者 JAVA
因为生产者是 LPUSH,所以消费者使用 RPOPLPUSH。
4.1 数据变量
因为RPOPLPUSH不仅返回一个消息,同时还将这个消息添加到另一个备份列表当中,所以mKey是消息列表,mKeyRollback是备份列表。从Redis读出消息后临时存储在mActionList。
protected List<String> mActionList = new CopyOnWriteArrayList<String>();
@Value("${key.list}")
private String mKey;
@Value("${key.rollback.list}")
private String mKeyRollback;
4.2 消费函数
consume是消费函数。当出现异常时候,会从备份列表中把消息再写回到消息队列。
public boolean consume() {
rollbackLastLaunch(); //上次同步失败的,这次先弄回去
while(true) {
try {
if (schedulejob) {
timerecord = System.currentTimeMillis();
schedulejob = false;
}
// 从消息队列取出消息,同时Redis操作会自动把取出的消息放入备份队列。
String action = mRedisStore.listRightPopAndLeftPush(mKey, mKeyRollback, mWaitTimeLimit, TimeUnit.SECONDS);
if(action != null) {
mActionList.add(action);
}
currentTimeStamp = System.currentTimeMillis();
if (mActionList.size() >= mBatchSize ||
(currentTimeStamp - timerecord >= mTimeElapsedLimit && mActionList.size() > 0)) {
schedulejob = true;
boolean res = sync2MySql();
if (res == true) {
clearRollback(); //清除备份列表
} else {
rollbackLastLaunch(); //rollback();
}
mActionList.clear();
}
} catch (Exception e) {
//发生了网络异常,需要把processing中的id再放回到waiting queue中
//如果redis, mysql异常,都会在这里被catch
rollbackLastLaunch();
} finally {
}
}
}
}
具体Redis操作是StringRedisTemplate.opsForList().rightPopAndLeftPush
函数。
public String listRightPopAndLeftPush(String sourceKey, String destinationKey, long timeout, TimeUnit unit) {
getTemplate().setDefaultSerializer(new StringRedisSerializer());
return getTemplate().opsForList().rightPopAndLeftPush(sourceKey, destinationKey, timeout, unit);
}
使用这个是因为它支持配置超时时间。
V rightPopAndLeftPush(K var1, K var2, long var3, TimeUnit var5);
4.3 删除备份消息
clearRollback函数是当消息被成功处理之后,从备份队列中删除备份消息。
protected void clearRollback() {
Long count = mRedisStore.getListSize(mKeyRollback);
while(count > 0 ) {
mRedisStore.listRightPop(mKeyRollback);
count--;
}
}
4.4 处理异常
当出现问题时候,会调用rollbackLastLaunch函数,从备份列表中把消息再写回到消息队列。
因为我们需要在一个Redis操作中执行lpop和rpush两个操作,必须把这两个操作构建成一个原子序列,所以这里涉及到了Lua脚本的使用。通过内嵌对 Lua 环境的支持, Redis 解决了长久以来不能高效地处理 CAS (check-and-set)命令的缺点, 并且可以通过组合使用多个命令, 轻松实现以前很难实现或者不能高效实现的模式。
void rollbackLastLaunch() {
try {
Long count = mRedisStore.getListSize(mKeyRollback);
Long dbsize = 0l;
while(count > 0 ) {
List<String> keys = new ArrayList<String>();
keys.add(mKeyRollback);
keys.add(mKey);
DefaultRedisScript<Long> script = new DefaultRedisScript<Long>();
script.setScriptText("local action = redis.call('lpop', KEYS[1]); local result = redis.call('rpush', KEYS[2], action); return result;");
script.setResultType(Long.class);
dbsize += mRedisStore.executeScript(script, keys, null);
count--;
}
} catch (Exception e) {
}
}
0x05 参考
Redis 阻塞、安全队列 BLPOP / BRPOP / LPUSH
- 晚上好啊!这是今天人工智能精选要闻
- 构建Flex应用的10大误区
- Flex的起步推动新语言学习
- 简单科普云计算相关内容
- Silverlight初级教程-开发工具
- WCF的Binding模型之五:绑定元素(Binding Element)
- 2018年物联网发展趋势
- Silverlight初级教程-概述
- WCF的Binding模型之四:信道工厂(Channel Factory)
- 如何做一个好的前端重构工程师
- 云计算是否真的对企业来说无所不能?
- 科协带你开个2017年科技世界的总结会!
- 基于StockRanker算法的机器学习量化策略
- Enterprise Library深入解析与灵活应用(6):自己动手创建迷你版AOP框架
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 精解四大集合框架:Map核心知识总结
- 厉害了,Matplotlib还能这样画散点图!
- 20种小技巧,玩转Google Colab
- 不用try catch,如何机智的捕获错误
- StyleGAN2玩出新高度!从华盛顿到特朗普,无缝切换生成历届美国总统
- 深入理解MySQL中事务隔离级别的实现原理
- 浅析常见的算法范式
- 检查 JavaScript 变量是否为数字的几种方式
- 玩转Google Colab!附20种小技巧
- 猿实战19——商品发布之商详数据准备
- 【大家的项目】code-minimap
- 尤大 几天前发在 GitHub 上的 vue-lit 是啥?
- 用回溯算法求解数独问题
- bug 回忆录(四)
- 新手入门系列之-React / Vue 应用持续集成Docker 化