RocketMQ详解(7)——顺序消费
时间:2022-07-24
本文章向大家介绍RocketMQ详解(7)——顺序消费,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
RocketMQ详解(7)——顺序消费
一. 顺序消费原理
- 消息的有序性是指消息的消费顺序能够严格保存与消息的发送顺序一致。例如,一个订单产生了3条消息,分别是订单创建、订单付款和订单完成。在消息消费时,同一条订单要严格按照这个顺序进行消费,否则业务会发生混乱。同时,不同订单之间的消息又是可以并发消费的,比如可以先执行第三个订单的付款,再执行第二个订单的创建。
- RocketMQ采用了局部顺序一致性的机制,实现了单个队列中的消息严格有序。也就是说,如果想要保证顺序消费,必须将一组消息发送到同一个队列中,然后再由消费者进行注意消费。
- RocketMQ推荐的顺序消费解决方案是:安装业务划分不同的队列,然后将需要顺序消费的消息发往同一队列中即可,不同业务之间的消息仍采用并发消费。这种方式在满足顺序消费的同时提高了消息的处理速度,在一定程度上避免了消息堆积问题。
二. 生产端实现
- 生产端要实现顺序消费,需要借助于MessageQueueSelector接口。
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
- MessageQueueSelector接口定义了一个select方法,具体参数含义为:
- mqs:该Topic下所有可选的MessageQueue
- msg:待发送的消息
- arg:发送消息时传递的参数
可以实现MessageQueueSelector接口,在select方法中自定义选择哪个MessageQueue。然后调用
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, selector, arg);
}
- 示例代码 本例模拟订单消息的发送。共有3个订单,每个订单都包含下单、支付、结算、完成四个流程,对应4条消息。同一个订单的消息要求严格按照顺序消费,不同订单的消息可以并发执行。 首先实现MessageQueueSelector接口,定制MessageQueue选择策略:
package william.rmq.producer.order;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/11 17:36
* @Description: 自定义MessageQueueSelector,根据发送消息时传递的参数,选择指定的MessageQueue
*/
public class OrderMessageQueueSelector implements MessageQueueSelector{
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//选择以参数arg为索引的MessageQueue
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}
在select方法中,根据传入的arg参数决定目标MessageQueue的索引。
下面实现发送消息逻辑:
package william.rmq.producer.order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import william.rmq.common.constant.RocketMQConstant;
import william.rmq.producer.common.CommonSendCallback;
import javax.annotation.PostConstruct;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/11 17:32
* @Description:顺序消息生产端
*/
@Slf4j
@Service
public class OrderMessageProducer {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private static final DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
private static final String[] ORDER_MESSAGES = {"下单","结算","支付","完成"};
@PostConstruct
public void sendMessage() {
try {
//设置namesrv
producer.setNamesrvAddr(namesrvAddr);
//启动Producer
producer.start();
System.err.println("Order Message Producer Start...");
//创建3组消息,每组消息发往同一个Queue,保证消息的局部有序性
String tags = "Tags";
OrderMessageQueueSelector orderMessageQueueSelector = new OrderMessageQueueSelector();
//注:要实现顺序消费,必须同步发送消息
for (int i = 0;i < 3;i++){
String orderId = "" + (i + 1);
for (int j = 0,size = ORDER_MESSAGES.length;j < size;j++){
String message = "Order-" + orderId + "-" + ORDER_MESSAGES[j];
String keys = message;
byte[] messageBody = message.getBytes(RemotingHelper.DEFAULT_CHARSET);
Message mqMsg = new Message(RocketMQConstant.TEST_TOPIC_NAME, tags, keys, messageBody);
producer.send(mqMsg, orderMessageQueueSelector,i);
}
}
} catch (Exception e) {
log.error("Message Producer: Send Message Error ", e);
}
}
}
- 使用DefaultMQProducer的send()方法,指定MessageQueueSelector和参数,Broker将会将逻辑上需要保证顺序性的消息发往同一队列。 注:想要实现顺序消费,发送方式必须为同步发送,异步发送无法保证消息的发送顺序!
三. 消费端实现
- 消费端想要实现顺序消费,只要设置监听器实现MessageListenerOrderly接口即可。
- 示例代码 首先自定义MessageListenerOrderly接口实现类,实现顺序消费:
package william.rmq.consumer.order;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/11 17:53
* @Description:顺序消息监听器
*/
public class OrderMessageListener implements MessageListenerOrderly{
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
if (CollectionUtils.isEmpty(msgs)){
return ConsumeOrderlyStatus.SUCCESS;
}
//设置自动提交
context.setAutoCommit(true);
msgs.stream()
.forEach(msg -> {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("Handle Order Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: "
+ msg.getTags() + ",keys: " + msg.getKeys() + ",messageBody: " + messageBody);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return ConsumeOrderlyStatus.SUCCESS;
}
}
- 下面就是消费逻辑:
package william.rmq.consumer.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import william.rmq.common.constant.RocketMQConstant;
import javax.annotation.PostConstruct;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/11 17:53
* @Description:顺序消息的消费者
*/
@Service
public class OrderMessageConsumer {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
@PostConstruct
public void start() {
try {
//设置namesrv地址
consumer.setNamesrvAddr(namesrvAddr);
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//订阅主题
consumer.subscribe(RocketMQConstant.TEST_TOPIC_NAME, "*");
//注册消息监听器,这里因为要实现顺序消费,所以必须注册MessageListenerOrderly
consumer.registerMessageListener(new OrderMessageListener());
//启动消费端
consumer.start();
System.err.println("Order Message Consumer Start...");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
- 先启动消费端工程,在启动生产端工程,可以看到消费端的控制台输出如下:
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8440000,topic: DefaultCluster,tags: Tags,keys: Order-1-下单,messageBody: Order-1-下单
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8510001,topic: DefaultCluster,tags: Tags,keys: Order-1-结算,messageBody: Order-1-结算
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8540002,topic: DefaultCluster,tags: Tags,keys: Order-1-支付,messageBody: Order-1-支付
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8580003,topic: DefaultCluster,tags: Tags,keys: Order-1-完成,messageBody: Order-1-完成
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C85A0004,topic: DefaultCluster,tags: Tags,keys: Order-2-下单,messageBody: Order-2-下单
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C85F0005,topic: DefaultCluster,tags: Tags,keys: Order-2-结算,messageBody: Order-2-结算
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8620006,topic: DefaultCluster,tags: Tags,keys: Order-2-支付,messageBody: Order-2-支付
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8680007,topic: DefaultCluster,tags: Tags,keys: Order-2-完成,messageBody: Order-2-完成
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C86E0008,topic: DefaultCluster,tags: Tags,keys: Order-3-下单,messageBody: Order-3-下单
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C8720009,topic: DefaultCluster,tags: Tags,keys: Order-3-结算,messageBody: Order-3-结算
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C881000A,topic: DefaultCluster,tags: Tags,keys: Order-3-支付,messageBody: Order-3-支付
Handle Order Message: messageId: 0A0E089B386F18B4AAC23B99C883000B,topic: DefaultCluster,tags: Tags,keys: Order-3-完成,messageBody: Order-3-完成
- 可以看到,在消费端,消息完全按照发送的顺序进行了消费,保证了消息的顺序性。
- 在多Consumer的情况下,不同Queue上的消息可以并发消费,同一个Queue上的消息仍然可以保证顺序消费。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 快速学习-Sentinel 熔断降级
- C#中关于SqlDataAdapter的Update(dataTable)方法
- Jmeter保存下载的文件
- SNAP Java API处理Sentinel-1数据
- springboot开发spark-submit的java代码
- Kustomize ConfigMapGenerate自动生成ConfigMap中的坑
- Godot游戏开发实践之二:AI之寻路新方式
- Vue 侦听器 watch 扩展之立即触发回调、深度监听和注销
- WPF开发之以管理员身份运行
- 快速学习-Sentinel: 分布式系统的流量防卫兵
- Godot游戏开发实践之一:使用High Level Multiplayer API制作多人游戏(上)
- Godot游戏开发实践之一:使用High Level Multiplayer API制作多人游戏(下)
- ansible生产环境使用场景(二)
- Go 语言实现 RPC 调用
- django FileResponse 解决中文命名文件下载后乱码问题