分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)

时间:2022-04-26
本文章向大家介绍分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下),主要内容包括1、概述、2、Consumer、3、PushConsumer 一览、4、PushConsumer 订阅、DefaultMQPushConsumer#registerMessageListener(...)、5、PushConsumer 消息队列分配、MQClientInstance#doRebalance(...)、DefaultMQPushConsumerImpl#doRebalance(...)、RebalanceImpl#doRebalance(...)、5、PushConsumer 消费进度读取、[PullConsumer] RebalancePullImpl#computePullFromWhere(...)、6、PushConsumer 拉取消息、DefaultMQPushConsumerImpl#pullMessage(...)、总结、6、PushConsumer 消费消息、ConsumeRequest、ConsumeMessageConcurrentlyService#processConsumeResult(...)、ConsumeMessageConcurrentlyService#cleanExpireMsg(...)、7、PushConsumer 发回消费失败消息、8、Consumer 消费进度、9、结尾、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。

前传:分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

本文主要基于 RocketMQ 4.0.x 正式版

  • 1、概述
  • 2、Consumer
  • 3、PushConsumer 一览
  • 4、PushConsumer 订阅
  • 5、PushConsumer 消息队列分配
  • 6、PushConsumer 消费进度读取
  • 7、PushConsumer 拉取消息
  • 8、PushConsumer 消费消息
  • 9、PushConsumer 发回消费失败消息
  • 10、Consumer 消费进度
  • 11、结尾

1、概述

本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。

主要解析 Consumer消费 逻辑涉及到的源码。

2、Consumer

MQ 提供了两类消费者:

  • PushConsumer:
    • 在大多数场景下使用。
    • 名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时, Broker挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )
  • PullConsumer

本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费 本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费 本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费

3、PushConsumer 一览

先看一张 PushConsumer 包含的组件以及组件之间的交互图:

  • RebalanceService:均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列。
  • PullMessageService:拉取消息服务,不断不断不断Broker 拉取消息,并提交消费任务到 ConsumeMessageService
  • ConsumeMessageService:消费消息服务,不断不断不断消费消息,并处理消费结果。
  • RemoteBrokerOffsetStoreConsumer 消费进度管理,负责从 Broker 获取消费进度,同步消费进度到 Broker
  • ProcessQueue :消息处理队列。
  • MQClientInstance :封装对 NamesrvBroker 的 API调用,提供给 ProducerConsumer 使用。

4、PushConsumer 订阅

DefaultMQPushConsumerImpl#subscribe(...)

1: public void subscribe(String topic, String subExpression) throws MQClientException {
  2:     try {
  3:         // 创建订阅数据
  4:         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
  5:             topic, subExpression);
  6:         this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
  7:         // 通过心跳同步Consumer信息到Broker
  8:         if (this.mQClientFactory != null) {
  9:             this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
 10:         }
 11:     } catch (Exception e) {
 12:         throw new MQClientException("subscription exception", e);
 13:     }
 14: }
  • 说明 :订阅 Topic
  • 第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(...)。
  • 第 7 至 10 行 :通过心跳同步 Consumer 信息到 Broker

FilterAPI.buildSubscriptionData(...)

1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
  2:     String subString) throws Exception {
  3:     SubscriptionData subscriptionData = new SubscriptionData();
  4:     subscriptionData.setTopic(topic);
  5:     subscriptionData.setSubString(subString);
  6:     // 处理订阅表达式
  7:     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
  8:         subscriptionData.setSubString(SubscriptionData.SUB_ALL);
  9:     } else {
 10:         String[] tags = subString.split("\|\|");
 11:         if (tags.length > 0) {
 12:             for (String tag : tags) {
 13:                 if (tag.length() > 0) {
 14:                     String trimString = tag.trim();
 15:                     if (trimString.length() > 0) {
 16:                         subscriptionData.getTagsSet().add(trimString);
 17:                         subscriptionData.getCodeSet().add(trimString.hashCode());
 18:                     }
 19:                 }
 20:             }
 21:         } else {
 22:             throw new Exception("subString split error");
 23:         }
 24:     }
 25: 
 26:     return subscriptionData;
 27: }
  • 说明 :根据 Topic 和 订阅表达式 创建订阅数据
  • subscriptionData.subVersion = System.currentTimeMillis()。

DefaultMQPushConsumer#registerMessageListener(...)

1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
  2:     this.messageListener = messageListener;
  3:     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
  4: }
  • 说明 :注册消息监听器。

5、PushConsumer 消息队列分配

RebalanceService

1: public class RebalanceService extends ServiceThread {
  2: 
  3:     /**
  4:      * 等待间隔,单位:毫秒
  5:      */
  6:     private static long waitInterval =
  7:         Long.parseLong(System.getProperty(
  8:             "rocketmq.client.rebalance.waitInterval", "20000"));
  9: 
 10:     private final Logger log = ClientLogger.getLog();
 11:     /**
 12:      * MQClient对象
 13:      */
 14:     private final MQClientInstance mqClientFactory;
 15: 
 16:     public RebalanceService(MQClientInstance mqClientFactory) {
 17:         this.mqClientFactory = mqClientFactory;
 18:     }
 19: 
 20:     @Override
 21:     public void run() {
 22:         log.info(this.getServiceName() + " service started");
 23: 
 24:         while (!this.isStopped()) {
 25:             this.waitForRunning(waitInterval);
 26:             this.mqClientFactory.doRebalance();
 27:         }
 28: 
 29:         log.info(this.getServiceName() + " service end");
 30:     }
 31: 
 32:     @Override
 33:     public String getServiceName() {
 34:         return RebalanceService.class.getSimpleName();
 35:     }
 36: }
  • 说明 :均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。
  • 第 26 行 :调用 MQClientInstance#doRebalance(...) 分配消息队列。目前有三种情况情况下触发: 详细解析见:MQClientInstance#doRebalance(...)。
    • 第25行 等待超时,每 20s 调用一次。
    • PushConsumer 启动时,调用 rebalanceService#wakeup(...) 触发。
    • Broker 通知 Consumer 加入 或 移除时, Consumer 响应通知,调用 rebalanceService#wakeup(...) 触发。

MQClientInstance#doRebalance(...)

1: public void doRebalance() {
  2:     for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
  3:         MQConsumerInner impl = entry.getValue();
  4:         if (impl != null) {
  5:             try {
  6:                 impl.doRebalance();
  7:             } catch (Throwable e) {
  8:                 log.error("doRebalance exception", e);
  9:             }
 10:         }
 11:     }
 12: }
  • 说明 :遍历当前 Client 包含的 consumerTable( Consumer集合 ),执行消息队列分配。
  • 疑问:目前代码调试下来, consumerTable 只包含 Consumer 自己。?有大大对这个疑问有解答的,烦请解答下。
  • 第 6 行 :调用 MQConsumerInner#doRebalance(...) 进行队列分配。 DefaultMQPushConsumerImplDefaultMQPullConsumerImpl 分别对该接口方法进行了实现。 DefaultMQPushConsumerImpl#doRebalance(...)详细解析见:DefaultMQPushConsumerImpl#doRebalance(...)。

DefaultMQPushConsumerImpl#doRebalance(...)

1: public void doRebalance() {
  2:     if (!this.pause) {
  3:         this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
  4:     }
  5: }
  • 说明:执行消息队列分配。
  • 第 3 行 :调用 RebalanceImpl#doRebalance(...) 进行队列分配。详细解析见:RebalancePushImpl#doRebalance(...)。

RebalanceImpl#doRebalance(...)

1: /**
  2:  * 执行分配消息队列
  3:  *
  4:  * @param isOrder 是否顺序消息
  5:  */
  6: public void doRebalance(final boolean isOrder) {
  7:     // 分配每个 topic 的消息队列
  8:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  9:     if (subTable != null) {
 10:         for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
 11:             final String topic = entry.getKey();
 12:             try {
 13:                 this.rebalanceByTopic(topic, isOrder);
 14:             } catch (Throwable e) {
 15:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 16:                     log.warn("rebalanceByTopic Exception", e);
 17:                 }
 18:             }
 19:         }
 20:     }
 21:     // 移除未订阅的topic对应的消息队列
 22:     this.truncateMessageQueueNotMyTopic();
 23: }
 24: 
 25: /**
 26:  * 移除未订阅的消息队列
 27:  */
 28: private void truncateMessageQueueNotMyTopic() {
 29:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
 30:     for (MessageQueue mq : this.processQueueTable.keySet()) {
 31:         if (!subTable.containsKey(mq.getTopic())) {
 32: 
 33:             ProcessQueue pq = this.processQueueTable.remove(mq);
 34:             if (pq != null) {
 35:                 pq.setDropped(true);
 36:                 log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
 37:             }
 38:         }
 39:     }
 40: }
  • #doRebalance(...) 说明 :执行分配消息队列。
    • 第 7 至 20 行 :循环订阅主题集合( subscriptionInner ),分配每一个 Topic 的消息队列。
    • 第 22 行 :移除未订阅的 Topic 的消息队列。
  • #truncateMessageQueueNotMyTopic(...) 说明 :移除未订阅的消息队列。当调用 DefaultMQPushConsumer#unsubscribe(topic) 时,只移除订阅主题集合( subscriptionInner ),对应消息队列移除在该方法。

RebalanceImpl#rebalanceByTopic(...)

1: private void rebalanceByTopic(final String topic, final boolean isOrder) {
  2:     switch (messageModel) {
  3:         case BROADCASTING: {
  4:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  5:             if (mqSet != null) {
  6:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
  7:                 if (changed) {
  8:                     this.messageQueueChanged(topic, mqSet, mqSet);
  9:                     log.info("messageQueueChanged {} {} {} {}", //
 10:                         consumerGroup, //
 11:                         topic, //
 12:                         mqSet, //
 13:                         mqSet);
 14:                 }
 15:             } else {
 16:                 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
 17:             }
 18:             break;
 19:         }
 20:         case CLUSTERING: {
 21:             // 获取 topic 对应的 队列 和 consumer信息
 22:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
 23:             List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
 24:             if (null == mqSet) {
 25:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 26:                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
 27:                 }
 28:             }
 29: 
 30:             if (null == cidAll) {
 31:                 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
 32:             }
 33: 
 34:             if (mqSet != null && cidAll != null) {
 35:                 // 排序 消息队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。
 36:                 List<MessageQueue> mqAll = new ArrayList<>();
 37:                 mqAll.addAll(mqSet);
 38: 
 39:                 Collections.sort(mqAll);
 40:                 Collections.sort(cidAll);
 41: 
 42:                 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
 43: 
 44:                 // 根据 队列分配策略 分配消息队列
 45:                 List<MessageQueue> allocateResult;
 46:                 try {
 47:                     allocateResult = strategy.allocate(//
 48:                         this.consumerGroup, //
 49:                         this.mQClientFactory.getClientId(), //
 50:                         mqAll, //
 51:                         cidAll);
 52:                 } catch (Throwable e) {
 53:                     log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
 54:                         e);
 55:                     return;
 56:                 }
 57: 
 58:                 Set<MessageQueue> allocateResultSet = new HashSet<>();
 59:                 if (allocateResult != null) {
 60:                     allocateResultSet.addAll(allocateResult);
 61:                 }
 62: 
 63:                 // 更新消息队列
 64:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
 65:                 if (changed) {
 66:                     log.info(
 67:                         "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
 68:                         strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
 69:                         allocateResultSet.size(), allocateResultSet);
 70:                     this.messageQueueChanged(topic, mqSet, allocateResultSet);
 71:                 }
 72:             }
 73:             break;
 74:         }
 75:         default:
 76:             break;
 77:     }
 78: }
 79: 
 80: /**
 81:  * 当负载均衡时,更新 消息处理队列
 82:  * - 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
 83:  * - 增加 不在processQueueTable && 存在于mqSet 里的消息队列
 84:  *
 85:  * @param topic Topic
 86:  * @param mqSet 负载均衡结果后的消息队列数组
 87:  * @param isOrder 是否顺序
 88:  * @return 是否变更
 89:  */
 90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
 91:     boolean changed = false;
 92: 
 93:     // 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
 94:     Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
 95:     while (it.hasNext()) { // TODO 待读:
 96:         Entry<MessageQueue, ProcessQueue> next = it.next();
 97:         MessageQueue mq = next.getKey();
 98:         ProcessQueue pq = next.getValue();
 99: 
100:         if (mq.getTopic().equals(topic)) {
101:             if (!mqSet.contains(mq)) { // 不包含的队列
102:                 pq.setDropped(true);
103:                 if (this.removeUnnecessaryMessageQueue(mq, pq)) {
104:                     it.remove();
105:                     changed = true;
106:                     log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
107:                 }
108:             } else if (pq.isPullExpired()) { // 队列拉取超时,进行清理
109:                 switch (this.consumeType()) {
110:                     case CONSUME_ACTIVELY:
111:                         break;
112:                     case CONSUME_PASSIVELY:
113:                         pq.setDropped(true);
114:                         if (this.removeUnnecessaryMessageQueue(mq, pq)) {
115:                             it.remove();
116:                             changed = true;
117:                             log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
118:                                 consumerGroup, mq);
119:                         }
120:                         break;
121:                     default:
122:                         break;
123:                 }
124:             }
125:         }
126:     }
127: 
128:     // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。
129:     List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息请求数组
130:     for (MessageQueue mq : mqSet) {
131:         if (!this.processQueueTable.containsKey(mq)) {
132:             if (isOrder && !this.lock(mq)) {
133:                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
134:                 continue;
135:             }
136: 
137:             this.removeDirtyOffset(mq);
138:             ProcessQueue pq = new ProcessQueue();
139:             long nextOffset = this.computePullFromWhere(mq);
140:             if (nextOffset >= 0) {
141:                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
142:                 if (pre != null) {
143:                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
144:                 } else {
145:                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
146:                     PullRequest pullRequest = new PullRequest();
147:                     pullRequest.setConsumerGroup(consumerGroup);
148:                     pullRequest.setNextOffset(nextOffset);
149:                     pullRequest.setMessageQueue(mq);
150:                     pullRequest.setProcessQueue(pq);
151:                     pullRequestList.add(pullRequest);
152:                     changed = true;
153:                 }
154:             } else {
155:                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
156:             }
157:         }
158:     }
159: 
160:     // 发起消息拉取请求
161:     this.dispatchPullRequest(pullRequestList);
162: 
163:     return changed;
164: }
  • #rebalanceByTopic(...) 说明 :分配 Topic 的消息队列。
    • 第 21 至 40 行 :获取 Topic 对应的消息队列和消费者们,并对其进行排序。因为各 Consumer 是在本地分配消息队列,排序后才能保证各 Consumer 顺序一致。
    • 第 42 至 61 行 :根据 队列分配策略( AllocateMessageQueueStrategy ) 分配消息队列。详细解析见:AllocateMessageQueueStrategy。
    • 第 63 至 72 行 :更新 Topic 对应的消息队列。
    • 第 3 至 19 行 :广播模式( BROADCASTING ) 下,分配 Topic 对应的所有消息队列。
    • 第 20 至 74 行 :集群模式( CLUSTERING ) 下,分配 Topic 对应的部分消息队列。
  • #updateProcessQueueTableInRebalance(...) 说明 :当分配队列时,更新 Topic 对应的消息队列,并返回是否有变更。
    • 第 132 至 135 行 : 顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
    • 第 137 行 :移除消息队列的消费进度。
    • 第 139 行 :获取队列消费进度。详细解析见:RebalancePushImpl#computePullFromWhere(...)。
    • 第 140 至 156 行 :添加新消费处理队列,添加消费拉取消息请求
    • 第 103 行 :移除不需要的消息队列。详细解析见:RebalancePushImpl#removeUnnecessaryMessageQueue(...)。
    • 第 108 至 120 行 :队列拉取超时,即 当前时间-最后一次拉取消息时间>120s ( 120s 可配置),判定发生 BUG,过久未进行消息拉取,移除消息队列。移除后,下面#新增队列逻辑#可以重新加入新的该消息队列。
    • 第 93 至 126 行 :移除不存在于分配的消息队列( mqSet ) 的 消息处理队列( processQueueTable )。
    • 第 128 至 158 行 :增加 分配的消息队列( mqSet ) 新增的消息队列。
    • 第 161 行 :发起新增的消息队列消息拉取请求。详细解析见:RebalancePushImpl#dispatchPullRequest(...)。

RebalanceImpl#removeUnnecessaryMessageQueue(...)

RebalancePushImpl#removeUnnecessaryMessageQueue(...)

1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
  2:     // 同步队列的消费进度,并移除之。
  3:     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
  4:     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
  5:     // TODO 顺序消费
  6:     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
  7:         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
  8:         try {
  9:             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
 10:                 try {
 11:                     return this.unlockDelay(mq, pq);
 12:                 } finally {
 13:                     pq.getLockConsume().unlock();
 14:                 }
 15:             } else {
 16:                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
 17:                     mq, //
 18:                     pq.getTryUnlockTimes());
 19: 
 20:                 pq.incTryUnlockTimes();
 21:             }
 22:         } catch (Exception e) {
 23:             log.error("removeUnnecessaryMessageQueue Exception", e);
 24:         }
 25: 
 26:         return false;
 27:     }
 28:     return true;
 29: }
  • 说明 :移除不需要的消息队列相关的信息,并返回是否移除成功。
  • 第 2 至 4 行 :同步队列的消费进度,并移除之。
  • 第 5 至 27 行 : 顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。

[PullConsumer] RebalancePullImpl#removeUnnecessaryMessageQueue(...)

1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
  2:     this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
  3:     this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
  4:     return true;
  5: }
  • 说明 :移除不需要的消息队列相关的信息,并返回移除成功。RebalancePushImpl#removeUnnecessaryMessageQueue(...)基本一致。

RebalancePushImpl#dispatchPullRequest(...)

1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
  2:     for (PullRequest pullRequest : pullRequestList) {
  3:         this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
  4:         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
  5:     }
  6: }
  • 说明 :发起消息拉取请求。该调用是 PushConsumer不断不断不断拉取消息的起点

DefaultMQPushConsumerImpl#executePullRequestImmediately(...)

1: public void executePullRequestImmediately(final PullRequest pullRequest) {
  2:     this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
  3: }
  • 说明 :提交拉取请求。提交后, PullMessageService 异步执行非阻塞。详细解析见:PullMessageService。

AllocateMessageQueueStrategy

AllocateMessageQueueAveragely

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :平均分配队列策略。
  • 第 7 至 25 行 :参数校验。
  • 第 26 至 36 行 :平均分配消息队列。
    • [0,mod)mqAll.size()/cidAll.size()+1。前面 modConsumer 平分余数,多获得 1 个消息队列。
    • [mod,cidAll.size())mqAll.size()/cidAll.size()
    • 第 27 行 : index :当前 Consumer 在消费集群里是第几个。这里就是为什么需要对传入的 cidAll 参数必须进行排序的原因。如果不排序, Consumer 在本地计算出来的 index 无法一致,影响计算结果。
    • 第 28 行 : mod :余数,即多少消息队列无法平均分配。
    • 第 29 至 31 行 : averageSize :代码可以简化成 (mod>0&&index<mod?mqAll.size()/cidAll.size()+1:mqAll.size()/cidAll.size())
    • 第 32 行 : startIndexConsumer 分配消息队列开始位置。
    • 第 33 行 : range :分配队列数量。之所以要 Math#min(...) 的原因:当 mqAll.size()<=cidAll.size() 时,最后几个 Consumer 分配不到消息队列。
    • 第 34 至 36 行 :生成分配消息队列结果。
  • 举个例子:

固定消息队列长度为4

Consumer * 2 可以整除

Consumer * 3 不可整除

Consumer * 5 无法都分配

消息队列[0]

Consumer[0]

Consumer[0]

Consumer[0]

消息队列[1]

Consumer[0]

Consumer[0]

Consumer[1]

消息队列[2]

Consumer[1]

Consumer[1]

Consumer[2]

消息队列[3]

Consumer[1]

Consumer[2]

Consumer[3]

AllocateMessageQueueByMachineRoom

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :平均分配可消费的 Broker 对应的消息队列。
  • 第 7 至 15 行 :参数校验。
  • 第 16 至 23 行 :计算可消费的 Broker 对应的消息队列。
  • 第 25 至 34 行 :平均分配消息队列。该平均分配方式和 AllocateMessageQueueAveragely 略有不同,其是将多余的结尾部分分配给前 remConsumer
  • 疑问:使用该分配策略时, ConsumerBroker 分配需要怎么配置。?等研究主从相关源码时,仔细考虑下。

AllocateMessageQueueAveragelyByCircle

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :环状分配消息队列。

AllocateMessageQueueByConfig

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :分配配置的消息队列。
  • 疑问 :该分配策略的使用场景。

5、PushConsumer 消费进度读取

RebalancePushImpl#computePullFromWhere(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :计算消息队列开始消费位置。
  • PushConsumer 读取消费进度有三种选项:
    • CONSUME_FROM_LAST_OFFSET :第 6 至 29 行 :一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费
    • CONSUME_FROM_FIRST_OFFSET :第 30 至 40 行 :一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费
    • CONSUME_FROM_TIMESTAMP :第 41 至 65 行 :一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费

[PullConsumer] RebalancePullImpl#computePullFromWhere(...)

暂时跳过。?

6、PushConsumer 拉取消息

PullMessageService

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :拉取消息服务,不断不断不断从 Broker 拉取消息,并提交消费任务到 ConsumeMessageService
  • #executePullRequestLater(...) :第 26 至 40 行 : 提交延迟拉取消息请求。
  • #executePullRequestImmediately(...) :第 42 至 53 行 :提交立即拉取消息请求。
  • #executeTaskLater(...) :第 55 至 63 行 :提交延迟任务
  • #pullMessage(...) :第 69 至 82 行 :执行拉取消息逻辑。详细解析见:DefaultMQPushConsumerImpl#pullMessage(...)。
  • #run(...) :第 84 至 101 行 :循环拉取消息请求队列( pullRequestQueue ),进行消息拉取。

DefaultMQPushConsumerImpl#pullMessage(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • #pullMessage(...) 说明 :拉取消息。
    • 执行消息拉取异步请求。详细解析见:PullAPIWrapper#pullKernelImpl(...)。
    • 当发起请求产生异常时,提交延迟拉取消息请求。对应 Broker 处理拉取消息逻辑见:PullMessageProcessor#processRequest(...)。
    • 第 3 至 6 行 :消息处理队列已经终止,不进行消息拉取。
    • 第 9 行 :设置消息处理队列最后拉取消息时间。
    • 第 11 至 18 行 : Consumer 未处于运行中状态,不进行消息拉取,提交延迟拉取消息请求。
    • 第 20 至 25 行 : Consumer 处于暂停中,不进行消息拉取,提交延迟拉取消息请求。
    • 第 27 至 37 行 :消息处理队列持有消息超过最大允许值(默认:1000条),不进行消息拉取,提交延迟拉取消息请求。
    • 第 39 至 49 行 : Consumer并发消费 并且 消息队列持有消息跨度过大(消息跨度 = 持有消息最后一条和第一条的消息位置差,默认:2000),不进行消息拉取,提交延迟拉取消息请求。
    • 第 50 至 70 行 : 顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
    • 第 72 至 78 行 : Topic 对应的订阅信息不存在,不进行消息拉取,提交延迟拉取消息请求。
    • 第 222 至 224 行 :判断请求是否使用 Consumer 本地的订阅信息( SubscriptionData ),而不使用 Broker 里的订阅信息。详细解析见:PullMessageProcessor#processRequest(...) 第 64 至 110 行代码。
    • 第 226 行 :是否开启过滤类过滤模式。详细解析见:《RocketMQ 源码分析 —— Filtersrv》。
    • 第 229 至 235 行 :计算拉取消息请求系统标识。详细解析见:PullMessageRequestHeader.sysFlag。
    • 第 237 至 255 行 :
  • PullCallback :拉取消息回调:
  • 第 86 行 :处理拉取结果。详细逻辑见:PullAPIWrapper#processPullResult(...)。
  • 第 89 至 192 行 :处理拉取状态结果: * 第 90 至 139 行 :拉取到消息( FOUND ) : * 第 91 至 93 行 :设置下次拉取消息队列位置。 * 第 95 至 97 行 :统计。 * 第 101 至 102 行 :拉取到消息的消息列表为空,提交立即拉取消息请求。为什么会存在拉取到消息,但是消息结果未空呢?原因见:PullAPIWrapper#processPullResult(...)。 * 第 106 至 108 行 :统计。 * 第 111 行 :提交拉取到的消息到消息处理队列。详细解析见:ProcessQueue#putMessage(...)。 * 第 113 至 118 行 :提交消费请求到 ConsumeMessageService。详细解析见:ConsumeMessageConcurrentlyService。 * 第 120 至 126 行 :根据拉取频率( pullInterval ),提交立即或者延迟拉取消息请求。默认拉取频率为 0ms ,提交立即拉取消息请求。 * 第 129 至 137 行 :下次拉取消息队列位置小于上次拉取消息队列位置 或者 第一条消息的消息队列位置小于上次拉取消息队列位置,则判定为BUG,输出警告日志。
    • 第 140 至 149 行 :没有新消息( NO_NEW_MSG ) : * 第 142 行 : 设置下次拉取消息队列位置。 * 第 145 行 :更正消费进度。详细解析见: #correctTagsOffset(...)。 * 第 148 行 :提交立即拉取消息请求。
    • 第 150 至 159 行 :有新消息但是不匹配( NO_MATCHED_MSG )。逻辑同 NO_NEW_MSG
    • 第 160 至 189 行 :拉取请求的消息队列位置不合法 ( OFFSET_ILLEGAL)。 * 第 164 行 :设置下次拉取消息队列位置。 * 第 167 行 :设置消息处理队列为 dropped。 * 第 169 至 188 行 :提交延迟任务,进行队列移除。 * 第 175 至 178 行 :更新消费进度,同步消费进度到 Broker。 * 第 181 行 :移除消费处理队列。 * 疑问:为什么不立即移除???
  • 第 196 至 204 行 :发生异常,提交延迟拉取消息请求。
  • #correctTagsOffset(...) :更正消费进度。
    • 第 258 至 261 行 : 当消费处理队列持有消息数量为 0 时,更新消费进度为拉取请求的拉取消息队列位置。

PullAPIWrapper#pullKernelImpl(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :拉取消息核心方法。该方法参数较多,可以看下代码注释上每个参数的说明?。
  • 第 34 至 43 行 :获取 Broker 信息( Broker 地址、是否为从节点)。
    • #recalculatePullFromWhichNode(...)
    • #MQClientInstance#findBrokerAddressInSubscribe(...)
  • 第 45 至 78 行 :请求拉取消息
  • 第 81 行 :当 Broker 信息不存在,则抛出异常。

PullAPIWrapper#recalculatePullFromWhichNode(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :计算消息队列拉取消息对应的 Broker 编号。

MQClientInstance#findBrokerAddressInSubscribe(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :获取 Broker 信息( Broker 地址、是否为从节点)。

PullAPIWrapper#processPullResult(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :处理拉取结果。
    • 更新消息队列拉取消息 Broker 编号的映射。
    • 解析消息,并根据订阅信息消息 tagCode匹配合适消息。
  • 第 16 行 :更新消息队列拉取消息 Broker 编号的映射。下次拉取消息时,如果未设置默认拉取的 Broker 编号,会使用更新后的 Broker 编号。
  • 第 18 至 55 行 :解析消息,并根据订阅信息消息 tagCode 匹配合适消息。
    • 第 20 至 22 行 :解析消息。详细解析见:《RocketMQ 源码分析 —— Message基础》 。
    • 第 24 至 35 行 :根据订阅信息 tagCode 匹配消息。
    • 第 37 至 43 行 : Hook
    • 第 45 至 51 行 :设置消息队列当前最小/最大位置到消息拓展字段。
    • 第 54 行 :设置消息队列。
  • 第 58 行 :清空消息二进制数组。

ProcessQueue#putMessage(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

总结

如果用最简单粗暴的方式描述 PullConsumer 拉取消息的过程,那就是如下的代码:

while (true) {
    if (不满足拉取消息) {
        Thread.sleep(间隔);
        continue;
    }
    主动拉取消息();
}

6、PushConsumer 消费消息

ConsumeMessageConcurrentlyService 提交消费请求

ConsumeMessageConcurrentlyService#submitConsumeRequest(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :提交立即消费请求。
  • 第 16 至 22 行 :提交消息小于等于批量消费数,直接提交消费请求。
  • 第 23 至 47 行 :当提交消息大于批量消费数,进行分拆成多个请求。
    • 第 25 至 33 行 :计算当前拆分请求包含的消息。
    • 第 35 至 38 行 :提交拆分消费请求。
    • 第 39 至 44 行 :提交请求被拒绝,则将当前拆分消息 + 剩余消息提交延迟消费请求,结束拆分循环。

ConsumeMessageConcurrentlyService#submitConsumeRequestLater

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :提交延迟消费请求。
  • 第 34 行 :直接调用 ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);。如果消息数超过批量消费上限,会不会是BUG

ConsumeRequest

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :消费请求。提交请求执行消费。
  • 第 24 至 28 行 :废弃处理队列不进行消费。
  • 第 34 至 44 行 :Hook。
  • 第 51 行 :当消息为重试消息,设置 Topic为原始 Topic。例如:原始 TopicTopicTest,重试时 Topic%RETRY%please_rename_unique_group_name_4,经过该方法, Topic 设置回 TopicTest
  • 第 53 至 58 行 :设置开始消费时间。
  • 第 61 行 :进行消费
  • 第 71 至 85 行 :解析消费返回结果类型
  • 第 87 至 90 行 : Hook
  • 第 92 至 99 行 :消费结果状态未空时,则设置消费结果状态为稍后消费。
  • 第 101 至 106 行 : Hook
  • 第 108 至 110 行 :统计。
  • 第 112 至 117 行 :处理消费结果。如果消费处理队列被移除,恰好消息被消费,则可能导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。详细解析见:ConsumeMessageConcurrentlyService#processConsumeResult(...)。

ConsumeMessageConcurrentlyService#processConsumeResult(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :处理消费结果。
  • 第 8 至 10 行 :消费请求消息未空时,直接返回。
  • 第 12 至 32 行 :计算 ackIndex 值。 consumeRequest.msgs[0-ackIndex]为消费成功,需要进行 ack 确认。
    • 第 14 至 23 行 : CONSUME_SUCCESSackIndex=context.getAckIndex()
    • 第 24 至 29 行 : RECONSUME_LATERackIndex=-1
  • 第34 至 63 行 :处理消费失败的消息。
    • 第 43 至 52 行 :发回消费失败的消息到 Broker。详细解析见:DefaultMQPushConsumerImpl#sendMessageBack(...)。
    • 第 54 至 59 行 :发回 Broker 失败的消息,直接提交延迟重新消费。
    • 如果发回 Broker 成功,结果因为例如网络异常,导致 Consumer以为发回失败,判定消费发回失败,会导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。
    • 第 36 至 41 行 : BROADCASTING :广播模式,无论是否消费失败,不发回消息到 Broker,只打印日志。
    • 第 42 至 60 行 : CLUSTERING :集群模式,消费失败的消息发回到 Broker
  • 第 65 至 69 行 :移除【消费成功】【消费失败但发回 Broker成功】的消息,并更新最新消费进度。
    • 为什么会有【消费失败但发回 Broker成功】的消息?见第 56 行
    • ProcessQueue#removeMessage(...)

ProcessQueue#removeMessage(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

ConsumeMessageConcurrentlyService#cleanExpireMsg(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :定时清理过期消息,默认周期:15min。

ProcessQueue#cleanExpiredMsg(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :移除过期消息。
  • 第 2 至 5 行 :顺序消费时,直接返回。
  • 第 7 至 9 行 :循环移除消息。默认最大循环次数:16次。
  • 第 10 至 25 行 :获取第一条消息。判断是否超时,若不超时,则结束循环。
  • 第 29 行 :发回超时消息到 Broker
  • 第 32 至 48 行 :判断此时消息是否依然是第一条,若是,则进行移除。

7、PushConsumer 发回消费失败消息

DefaultMQPushConsumerImpl#sendMessageBack(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :发回消息。
  • 第 4 至 8 行 : Consumer 发回消息。详细解析见:MQClientAPIImpl#consumerSendMessageBack(...)。
  • 第 10 至 25 行 :发生异常时, Consumer 内置默认 Producer 发送消息。
    • ?疑问:什么样的情况下会发生异常呢?

MQClientAPIImpl#consumerSendMessageBack(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

8、Consumer 消费进度

OffsetStore

  • RemoteBrokerOffsetStoreConsumer 集群模式 下,使用远程 Broker 消费进度。
  • LocalFileOffsetStoreConsumer 广播模式下,使用本地 文件 消费进度。

OffsetStore#load(...)

LocalFileOffsetStore#load(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :从本地文件加载消费进度到内存。
OffsetSerializeWrapper

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :本地 Offset 存储序列化。
Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json
{
    "offsetTable":{{
            "brokerName":"broker-a",
            "queueId":3,
            "topic":"TopicTest"
        }:1470,{
            "brokerName":"broker-a",
            "queueId":2,
            "topic":"TopicTest"
        }:1471,{
            "brokerName":"broker-a",
            "queueId":1,
            "topic":"TopicTest"
        }:1470,{
            "brokerName":"broker-a",
            "queueId":0,
            "topic":"TopicTest"
        }:1470
    }
}

RemoteBrokerOffsetStore#load(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :不进行加载,实际读取消费进度时,从 Broker 获取。

OffsetStore#readOffset(...)

读取消费进度类型:

  • READ_FROM_MEMORY :从内存读取。
  • READ_FROM_STORE :从存储( Broker文件 )读取。
  • MEMORY_FIRST_THEN_STORE :优先从内存读取,读取不到,从存储读取。

LocalFileOffsetStore#readOffset(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 第 16 行 :从 文件 读取消费进度。

RemoteBrokerOffsetStore#readOffset(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 第 16 行 :从 Broker 读取消费进度。

OffsetStore#updateOffset(...)

该方法 RemoteBrokerOffsetStoreLocalFileOffsetStore 实现相同。

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

OffsetStore#persistAll(...)

LocalFileOffsetStore#persistAll(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :持久化消费进度。将消费进度写入文件

RemoteBrokerOffsetStore#persistAll(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :持久化指定消息队列数组的消费进度到 Broker,并移除非指定消息队列。

MQClientInstance#persistAllConsumerOffset(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :定时进行持久化,默认周期:5000ms。
  • 重要说明 :
    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。

9、结尾

?可能是本系列最长的一篇文章,如有表达错误和不清晰,请多多见谅。 感谢对本系列的阅读、收藏、点赞、分享,特别是翻到结尾。?真的有丢丢长。

本文转载自「芋道源码」 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/