RocketMQ源码 — 十、 RocketMQ顺序消息
RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别
发送顺序消息
SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);
send方法带有参数MessageQueueSelector,MessageQueueSelector是让用户自己决定消息发送到哪一个队列,如果是局部消息的话,用来决定消息与队列的对应关系。
顺序消息消费
consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(false); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeOrderlyStatus.SUCCESS; } });
从使用上可以推断顺序消息需要从发送到消费整个过程中保证有序,所以顺序消息具体表现为
- 发送消息是顺序的
- broker存储消息是顺序的
- consumer消费是顺序的
下面分别看看rmq怎么实现顺序消息
发送顺序消息
因为broker存储消息有序的前提是producer发送消息是有序的,所以这两个结合在一起说。
消息发布是有序的含义:producer发送消息应该是依次发送的,所以要求发送消息的时候保证:
- 消息不能异步发送,同步发送的时候才能保证broker收到是有序的。
- 每次发送选择的是同一个MessageQueue
同步发送
从刚开始的例子中发送消息的时候,会调用下面的方法
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout); }
CommunicationMode.SYNC表明了producer发送消息的时候是同步发送的。同步发送表示,producer发送消息之后不会立即返回,会等待broker的response。
broker收到producer的请求之后虽然是启动线程处理的,但是在线程中将消息写入commitLog中以后会发送response给producer,producer在收到broker的response并且是处理成功之后才算是消息发送成功。
同一个MessageQueue
为了保证broker收到消息也是顺序的,所以producer只能向其中一个队列发送消息。因为只有是同一个队列才能保证消息是发往同一个broker,只有同一个broker处理发来的消息才能保证顺序。所以发送顺序消息的时候需要用户指定MessageQueue,在send调用过程中会调用下面的方法,下面的方法中回调了用户指定的select queue的方法
private SendResult sendSelectImpl( Message msg, MessageQueueSelector selector, Object arg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; try { // 调用用户指定的select方法来选出一个queue,如果是全局顺序,用户必须保证自己选出的queue是同一个 mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e); } if (mq != null) { return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout); } else { throw new MQClientException("select message queue return null.", null); } } throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); }
这样每次发送的消息都是同一个MessageQueue,也就是都会发送到同一个broker,这个发送消息的过程都保证了顺序,也就保证了broker存储在CommitLog中的消息也是顺序的。
顺序消费
保证了broker中物理存储的消息是顺序的,只要保证消息消费是顺序的就能保证整个过程是顺序消息了。
还是开始的例子中,顺序消费和普通消费的listener是不一样的,顺序消费需要实现的是下面这个接口
org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
在consumer启动的时候会根据listener的类型判断应该使用哪一个service来消费
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { // 顺序消息 this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { // 非顺序消息 this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); }
consumer拉取消息是按照offset拉取的,所以consumer能保证拉取到consumer的消息是连续有序的,但是consumer拉取到消息后又启动了线程去处理消息,所以就不能保证先拉取到的消息先处理
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage // org.apache.rocketmq.client.consumer.PullCallback#onSuccess // 将拉取到的消息放入ProcessQueue boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); // 这里的consumeMessageService就是ConsumeMessageOrderlyService DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispathToConsume); // org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { // 拉取到的消息构造ConsumeRequest,然后放入线程池等待执行 ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } }
上面将请求放入线程池了,所以线程执行的顺序又不确定了,那么consumer消费就变成无序的了吗?
答案显然是不会变成无序的,因为上面有一行关键的代码
processQueue.putMessage(pullResult.getMsgFoundList());
先说一下ProcessQueue这个关键的数据结构。一个MessageQueue对应一个ProcessQueue,是一个有序队列,该队列记录一个queueId下所有从brokerpull回来的消息,如果消费成功了就会从队列中删除。ProcessQueue有序的原因是维护了一个TreeMap
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
msgTreeMap:里面维护了从broker pull回来的所有消息,TreeMap是有序的,key是Long类型的,没有指定comparator,默认是将key强转为Comparable,然后进行比较,因为key是当前消息的offset,而Long实现了Comparable接口,所以msgTreeMap里面的消息是按照offset排序的。
所以是ProcessQueue保证了拉取回来的消息是有序的,继续上面说到的启动线程执行ConsumeRequest.run方法来消费消息
Override public void run() { // 保证一个队列只有一个线程访问,因为顺序消息只有一个队列,也就保证了只有一个线程消费 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); for (boolean continueConsume = true; continueConsume; ) { // 从ProcessQueue中获取消息进行消费,获取出来的消息也是有序的 final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); // 省略中间代码 }
从ProcessQueue中获取出来的消息是有序的,consumer保证了消费的有序性。
总结
rmq从发送消息、broker存储消息到consumer消费消息整个过程中,环环相扣,最终保证消息是有序的。
转载于:https://www.cnblogs.com/sunshine-2015/p/9074956.html
原文地址:https://www.cnblogs.com/it-deepinmind/p/12994211.html
- 字符串的排列
- 斐波那契额数列及青蛙跳台阶问题
- 在Mac OS X上配置Apache2
- 合并两个排序的链表
- 还有5天,你的比特币最重要的孩子UB-UBTC 可能就永远不属于你了
- Spark SQL 用户自定义函数UDF、用户自定义聚合函数UDAF 教程(Java踩坑教学版)
- Webpack多入口文件、热更新等体验
- 从hello world 解析程序运行机制
- 万达大量员工“被”辞职?曲德君回应:万达网科没有倒
- iOS Programming – 触摸事件处理(2)
- 洋葱海外仓融资2亿元 官网启用msyc.cc域名
- Webpack单元测试,e2e测试
- [看图说话] 基于Spark UI性能优化与调试——初级篇
- Unix下c程序内存泄露检测工具
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- PowerBI 超级粘性用户计算 - 原理与实现
- Chrome 技术篇-常用web调试手法:清除缓存并硬性重新加载
- 数据库之索引模块
- Python 爬虫篇-爬取web页面所有可用的链接实战演示,展示网页里所有可跳转的链接地址
- Python爬虫,微信公众号话题标签内容采集打印PDF输出
- Windows 技术篇-设置dns提升网速,刷新dns缓存
- 搭建高可用的Replication集群归档大量的冷数据
- Python 技术篇-文件操控:文件的移动和复制
- Python发邮件脚本,Python调用163邮箱SMTP服务实现邮件群发
- 为PXC集群引入Mycat并构建完整的高可用集群架构
- Python3 字典
- 安装Percona Server数据库(in CentOS 8)
- Python 基础篇-正斜杠("/")和反斜杠("")的用法
- 在CentOS8下搭建PXC集群
- Python 基础篇-相对路径、绝对路径的写法