RocketMQ详解(10)——Consumer详解

时间:2022-07-24
本文章向大家介绍RocketMQ详解(10)——Consumer详解,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

RocketMQ详解(10)——消费模式详解

一. 不同类型的消费者

根据使用者对读取操作的控制情况,消费在可以分为两种类型:

  1. DefaultMQPushConsumer:有系统控制读取操作,收到消息后自动调用监听器回调处理。
  2. DefaultMQPullConsumer:读取操作中的大部分功能由使用者自主控制。

二. DefaultMQPushConsumer的使用

  1. 使用DefaultMQPushConsumer主要是设置好各种参数和传入处理消息的回调方法。系统收到消息后会自动调用回调方法来处理消息,自动保存Offset,并且加入新的DefaultMQPushConsumer后会自动做负载均衡。
  2. 示例代码
package william.rmq.consumer.quickstart;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;

/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 11:06
* @Description:RocketMQ消息消费者
*/
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
   @Value("${spring.rocketmq.namesrvAddr}")
   private String namesrvAddr;

   private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");

   @PostConstruct
   public void start() {
       try {
           consumer.setNamesrvAddr(namesrvAddr);

           //从消息队列头部开始消费
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

           //设置广播消费模式
           consumer.setMessageModel(MessageModel.BROADCASTING);

           //订阅主题
           consumer.subscribe("DefaultCluster", "*");

           //注册消息监听器
           consumer.registerMessageListener(this);

           //启动消费端
           consumer.start();

           log.info("Message Consumer Start...");
           System.err.println("Message Consumer Start...");
       } catch (MQClientException e) {
           log.error("Message Consumer Start Error!!",e);
       }

   }

   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       if (CollectionUtils.isEmpty(msgs)){
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
       msgs.stream()
           .forEach(msg -> {
               try {
                   String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                   log.info("Message Consumer: Handle New Message: messageId:{}, topic:{}, tags:{}, keys:{}, messageBody:{}"
                           , msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getKeys(), messageBody);
                   System.err.println("Message Consumer: Handle New Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: " + msg.getTags());
               } catch (Exception e) {
                   log.error("Consume Message Error!!", e);
               }
           });
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }

}
  1. DefaultMQPushConsumer需要设置三个参数:
    1. 这个Consumer所属的ConsumerGroup
    2. NameServer的IP和端口
    3. 订阅的Topic名称

    下面对这几个参数进行详细介绍:

  2. ConsumerGroup用于把多个Consumer组织到一起,提高并发处理能力,ConsumerGroup需要和消息模式MessageModel配合使用。 RocketMQ支持两种消息模式:
    1. MessageModel.CLUSTERING——集群模式:同一个ConsumerGroup里面的每个Consumer只消费所订阅的消息的一部分内容,同一个ConsumerGroup下所有Consumer消费的内容合起来才是所订阅的Topic内容的整体,从而达到负载均衡的目的。
    2. MessageModel.BROADCASTING——广播模式:同一个ConsumerGroup下的每个Consumer都能消费到所订阅Topic的所有消息,也就是一个消息会被多次分发,被多个Consumer消费。
  3. NameServer的ip和端口号,可以填写多个,用分号隔开,达到消除单点故障的目的,如”ip1:port1;ip2:port2”
  4. Topic名称用来标识消息类型,需要提前创建。如果不想消费某个Topic下的所有消息,可以通过指定Tag进行消息过滤,如Consumer.subscribe(“TopicTest”,”tags1 || tag2 || tag3”),表示这个Consumer要消费TopicTest主体下的带有tag1或tag2或tag3的消息(Tag指的是在发送消息时设置的标签)。在设置Tag参数时,用null或者”*”表示要消费这个Topic下的所有消息。

三. DefaultMQPushConsumer的处理流程

本节结合源码分析DefaultMQPushConsumer的处理流程。

DefaultMQPushConsumer主要功能实现在DefaultMQPushConsumerImpl中,消息处理逻辑是在pullMessage()方法的PullCallback回调中。在PullCallback回调中有个switch语句,根据Broker返回的消息类型做响应的处理,具体逻辑看源码:

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                                                                                         subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                                                                       pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                                                                            pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);

                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                                                                   DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }

                    if (pullResult.getNextBeginOffset() < prevRequestOffset
                        || firstMsgOffset < prevRequestOffset) {
                        log.warn(
                            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                            pullResult.getNextBeginOffset(),
                            firstMsgOffset,
                            prevRequestOffset);
                    }

                    break;
                case NO_NEW_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case NO_MATCHED_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case OFFSET_ILLEGAL:
                    log.warn("the pull request offset illegal, {} {}",
                             pullRequest.toString(), pullResult.toString());
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    pullRequest.getProcessQueue().setDropped(true);
                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                                                                        pullRequest.getNextOffset(), false);

                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                log.warn("fix the pull request offset, {}", pullRequest);
                            } catch (Throwable e) {
                                log.error("executeTaskLater Exception", e);
                            }
                        }
                    }, 10000);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("execute the pull request exception", e);
        }

        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
};

PullCallback是一个消息拉取回调,Consumer从Broker拉取消息会,会根据拉取状态回调对应的onSuccess或onException方法。在onSuccess()的处理中,会根据不同的PullStatus进行不同的处理,PullStatus的状态有:

  1. PullStatus.FOUND:成功拉取消息
  2. PullStatus.NO_NEW_MSG:没有新的消息可被拉取
  3. PullStatus.NO_MATCHED_MSG:过滤结果不匹配
  4. PullStatus.OFFSET_ILLEGAL:offset非法

DefaultMQPushConsumer中有很多PullRequest的方法,如executePullRequestImmediately(),之所以在PushConsumer中也使用PullRequest的方式拉取消息,是因为RocketMQ通过长轮询的方式来实现Push和Pull两种模式,长轮询可以即有Pull的优点,又兼具Push的实时性。

Push方式是Broker端接收到消息后,主动把消息推给Consumer端,实时性高。对于一个提供消息队列服务的Server来说,用Push方式会有很多弊端:首先是消息的流量难以控制,当Push的消息过多时会加大Server的工作量,进而影响Server的性能;其次,Client的处理能力各不相同,且Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,在造成消息堆积等各种潜在的问题。

Pull方式是Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量的消息后,妥当处理完毕再继续拉取。Pull方式的问题是循环拉取的时间间隔不好设定,间隔太短就会处于”忙等”的状态,浪费资源;间隔太长又可能导致Server端有消息到来时没有及时被处理。

“长轮询”方式通过Client端和Server端的配合,达到既拥有Pull的优点,又保证实时性的目的。

“长轮询”的核心是,Broker端HOLD住客户端的请求一小段时间,如果在这段时间内有消息到达,就利用现有的链接立刻返回消息给Consumer。”长轮询”的主动权还是掌握在Consumer手上,即使Broker有大量的消息积压,也不会主动推送给Consumer。

长轮询方式的局限性在于,HOLD住Consumer端请求时,需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。

四. DefaultMQPullConsumer的处理流程

  1. 使用DefaultMQPullConsumer和使用DefaultMQPushConsumer一样需要设置各种参数,写处理消息的回调方法。此外,还需要进行额外的处理。下面给出使用的示例代码:
package william.rmq.consumer.pull;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import william.rmq.common.constant.RocketMQConstant;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* @Auther: ZhangShenao
* @Date: 2018/9/15 09:25
* @Description:使用DefaultMQPullConsumer拉取消息
*/
@Service
@Slf4j
public class PullMessageConsumer {
   /**记录每个MessageQueue的消费位点offset,可以持久化到DB或缓存Redis,这里作为演示就保存在程序中*/
   private static final Map<MessageQueue,Long> OFFSET_TABLE = new ConcurrentHashMap<>();

   @Value("${spring.rocketmq.namesrvAddr}")
   private String namesrvAddr;

   /**使用DefaultMQPullConsumer实现拉取消息模式*/
   private DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("DefaultPullConsumer");

   /**每次拉取消息的最大数量*/
   private static final int MAX_PULL_SIZE_EACH_TIME = 32;

   @PostConstruct
   public void start() {
       try {
           //设置namesrv地址
           consumer.setNamesrvAddr(namesrvAddr);

           //启动消费端
           consumer.start();
           System.err.println("Order Message Consumer Start...");

           //从指定的Topic pull消息
           Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(RocketMQConstant.TEST_TOPIC_NAME);

           //遍历MessageQueue,获取Message
           for (MessageQueue messageQueue : messageQueues){
               //获取该MessageQueue的消费位点
               long offset = consumer.fetchConsumeOffset(messageQueue, true);
               System.err.println("Consumer From Queue: " + messageQueue + ",from offset: " + offset);

               while (true){
                   try {
                       //拉取消息
                       PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, getMessageQueueOffset(messageQueue), MAX_PULL_SIZE_EACH_TIME);
                       System.err.println("Pull Message Result: " + pullResult);

                       //记录offset
                       saveMessageQueueOffset(messageQueue,pullResult.getNextBeginOffset());
                       switch (pullResult.getPullStatus()){
                           //拉取到消息
                           case FOUND: break;

                           //没有匹配的消息
                           case NO_MATCHED_MSG: break;

                           //暂时没有新消息
                           case NO_NEW_MSG: continue;

                           //offset非法
                           case OFFSET_ILLEGAL: break;

                           default: break;
                       }
                   }
                   catch (Exception e){
                       log.error("Pull Message Error!!",e);
                   }
               }
           }

           //关闭Consumer
           consumer.shutdown();

       } catch (Exception e) {
           throw new RuntimeException(e);
       }

   }

   private long getMessageQueueOffset(MessageQueue messageQueue){
       Long offset = OFFSET_TABLE.get(messageQueue);
       return (offset != null ? offset : 0);
   }

   private void saveMessageQueueOffset(MessageQueue messageQueue,long offset){
       OFFSET_TABLE.put(messageQueue,offset);
   }
}

分别启动生产端和消费端程序,可看到消费端控制台打印如下:

Order Message Consumer Start...
Consumer From Queue: MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=0],from offset: 0
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=32, minOffset=0, maxOffset=57, msgFoundList=32]
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=25]
Pull Message Result: PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=0]
Pull Message Result: PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=0]
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=58, minOffset=0, maxOffset=58, msgFoundList=1]
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=59, minOffset=0, maxOffset=59, msgFoundList=1]
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=60, minOffset=0, maxOffset=60, msgFoundList=1]
Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=61, minOffset=0, maxOffset=61, msgFoundList=1]
  1. 示例代码的处理逻辑是遍历指定Topic下的所有MessageQueue,然后从中pull消息,并记录消费的offset。主要包括下面三件事:
    1. 获取MessageQueue并遍历 一个Topic包含多个MessageQueue,如果这个Consumer需要获取Topic下的所有消息,就要遍历所有的MessageQueue。如果有特殊情况,也可以选择某些指定的MessageQueue来消费。
    2. 维护Offset 从一个MessageQueue中拉取消息时,要传入Offset参数,随着不断读取消息,offset不断增加,这个时候需要用户把offset存储下来,根据具体情况可以保存在内存中、写到磁盘或数据库等。
    3. 根据不同的拉取状态做不同的处理 拉取消息的请求发出后,会返回FOUND、NO_NEW_MSG、NO_MATCHED_MSG和OFFSET_ILLEGAL四种状态。需要根据每个状态做不同的处理。比较重要的两个状态是FOUND和NO_NEW_MSG,分别表示拉取到新的消息和没有新的消息。

    五. Consumer的启动、关闭流程

    1. Consumer分为Push和Pull两种模式,对于DefaultMQPullConsumer来说,使用者主动权很高,可以根据实际需要启动、暂停和停止消费过程。需要特别注意的是offset的保存,要在程序的异常处理部分考虑把offset写入磁盘的处理,准确记录每个MessageQueue消费的offset,才能保证消费的准确性。
    2. DefaultMQPushConsumer的退出,要显式调用shutdown()方法,以便释放资源、保存offset等。这个调用要加到Consumer所在应用的退出逻辑中。DefaultMQPushConsumer在启动时,会检查各种配置,然后连接NameServer获取Topic信息。如果启动时出现异常,如无法连接NameServer,程序仍然可以正常启动不报错(日志会打印Warn信息)。在单机情况下,可以故意写错NameServer的地址模拟这种异常。
    3. 之所以DefaultMQPushConsumer在无法连接NameServer时仍然能正常启动,是考虑到分布式系统的设计。RocketMQ集群可以有多个NameServer和Broker,某个节点出现异常后整个集群仍然可用。所以DefaultMQPushConsumer在出现连接异常时不是立刻退出,而是不断尝试重连。