RocketMQ Explained (4): Getting-Started Program

Date: 2022-07-24

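This section demonstrates a getting-started program that integrates RocketMQ with Spring Boot. It consists of two projects: a message producer and a message consumer.

Development environment: JDK 1.8, IntelliJ IDEA, Maven 3.5.3, Spring Boot 2.0.3, RocketMQ 4.3.0

First, add the RocketMQ client dependency: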

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.3.RELEASE</version>
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <rocketmq.version>4.3.0</rocketmq.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <!-- RocketMQ dependency -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>${rocketmq.version}</version>
    </dependency>

</dependencies>

Configure the address of the RocketMQ nameserver:

spring:
  rocketmq:
    namesrvAddr: localhost:9876

I. Message Producer

  1. Startup class
package william.rmq.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author ZhangShenao
 * @Date: 2018/9/7 11:44
 * @Description: RocketMQ producer startup class
 */
@SpringBootApplication
public class RocketMQProducerApplication {
   public static void main(String[] args) {
       // Pass args through so command-line properties reach Spring Boot
       SpringApplication.run(RocketMQProducerApplication.class, args);
   }
}
  2. Message producer
package william.rmq.producer.quickstart;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;

/**
 * @author ZhangShenao
 * @Date: 2018/9/7 10:58
 * @Description: RocketMQ message producer
 */
@Service
@Slf4j
public class MessageProducer {
   @Value("${spring.rocketmq.namesrvAddr}")
   private String namesrvAddr;

   private static final DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");

   @PostConstruct
   public void start(){
       try {
           producer.setNamesrvAddr(namesrvAddr);
           producer.start();
           log.info("Message Producer Start...");
           System.err.println("Message Producer Start...");
       }catch (Exception e){
           log.error("Message Producer Start Error!!",e);
       }
   }

   public void sendMessage(String data, String topic, String tags, String keys) {
       try {
           byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET);

           Message mqMsg = new Message(topic, tags, keys, messageBody);

           producer.send(mqMsg, new SendCallback() {
               @Override
               public void onSuccess(SendResult sendResult) {
                   log.info("Message Producer: Send Message Success {}", sendResult);
                   System.err.println("Message Producer: Send Message Success {}" + sendResult);
               }

               @Override
               public void onException(Throwable throwable) {
                   log.error("Message Producer: Send Message Error ", throwable);
                   System.err.println("Message Producer: Send Message Error " + throwable);
               }
           });
       } catch (Exception e) {
           log.error("Message Producer: Send Message Error ", e);
       }

   }
}
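
First, a DefaultMQProducer instance is created with a producerGroup name. Every Producer in RocketMQ must belong to a producerGroup. Next, the address of the nameserver is set, and DefaultMQProducer's start() method is called to start the producer.

Once the Producer has started, a RocketMQ message can be created. A message needs the topic it is sent to, its tags, its keys, and its body. The topic is a logical concept: it identifies a subject that can be published to and subscribed to, and it contains one or more Queues that actually store the messages. The tags label a message and can be used for message filtering. The keys can be used to tell apart different messages under the same topic.

With the message created, calling DefaultMQProducer's send() method sends it to the Broker. The overload used here sends asynchronously and takes a SendCallback, which handles the success and failure cases of the send.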
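
The relationship between a topic, its Queues, and tag filtering can be pictured with a toy in-memory model. The sketch below is not RocketMQ code: ToyTopic, Msg, send() and bodiesWithTag() are hypothetical names, and the round-robin queue choice is an assumption made for illustration. A real producer selects the queue with its own strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Toy in-memory model of a topic; NOT RocketMQ code. All names are hypothetical. */
class ToyTopic {
    /** A message reduced to the attributes discussed above (the topic is the ToyTopic itself). */
    static final class Msg {
        final String tags;
        final String keys;
        final String body;

        Msg(String tags, String keys, String body) {
            this.tags = tags;
            this.keys = keys;
            this.body = body;
        }
    }

    private final List<List<Msg>> queues = new ArrayList<>();
    private int next = 0; // index of the queue that receives the next message

    ToyTopic(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
    }

    /** Stores the message in one queue, chosen round-robin, and returns that queue's index. */
    int send(Msg msg) {
        int queueId = next;
        queues.get(queueId).add(msg);
        next = (next + 1) % queues.size();
        return queueId;
    }

    /** Returns the bodies of all stored messages that carry the given tag, queue by queue. */
    List<String> bodiesWithTag(String tag) {
        return queues.stream()
                .flatMap(List::stream)
                .filter(m -> m.tags.equals(tag))
                .map(m -> m.body)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(4);
        for (int i = 1; i <= 5; i++) {
            String tag = (i % 2 == 0) ? "TagB" : "TagA";
            int queueId = topic.send(new Msg(tag, "Keys-" + i, "Message-" + i));
            System.out.println("Message-" + i + " -> queue " + queueId);
        }
        System.out.println("TagA bodies: " + topic.bodiesWithTag("TagA"));
    }
}
```

Because the messages are spread over several queues, reading them back queue by queue does not reproduce the order in which they were sent. In this model, ordering holds only within a single queue.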

II. Message Consumer

  1. Startup class
package william.rmq.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author ZhangShenao
 * @Date: 2018/9/7 11:47
 * @Description: RocketMQ consumer startup class
 */
@SpringBootApplication
public class RocketMQConsumerApplication {
   public static void main(String[] args) {
       // Pass args through so command-line properties reach Spring Boot
       SpringApplication.run(RocketMQConsumerApplication.class, args);
   }
}
  2. Message consumer
package william.rmq.consumer.quickstart;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author ZhangShenao
 * @Date: 2018/9/7 11:06
 * @Description: RocketMQ message consumer
 */
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
   @Value("${spring.rocketmq.namesrvAddr}")
   private String namesrvAddr;

   private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");

   @PostConstruct
   public void start() {
       try {
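           // Set the nameserver address
           consumer.setNamesrvAddr(namesrvAddr);

           // Start consuming from the head of the message queue
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

           // Clustering consumption mode
           consumer.setMessageModel(MessageModel.CLUSTERING);

           // Subscribe to the topic, accepting every tag
           consumer.subscribe("DefaultCluster", "*");

           // Register this object as the message listener
           consumer.registerMessageListener(this);

           // Start the consumer
           consumer.start();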

           log.info("Message Consumer Start...");
           System.err.println("Message Consumer Start...");
       } catch (MQClientException e) {
           log.error("Message Consumer Start Error!!",e);
       }

   }

   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       if (CollectionUtils.isEmpty(msgs)){
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
       msgs.stream()
           .forEach(msg -> {
               try {
                   String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                   log.info("Message Consumer: Handle New Message: messageId:{}, topic:{}, tags:{}, keys:{}, messageBody:{}"
                           , msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getKeys(), messageBody);
                   System.err.println("Message Consumer: Handle New Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: " + msg.getTags());
               } catch (Exception e) {
                   log.error("Consume Message Error!!", e);
               }
           });
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }

}
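
The consumer above selects MessageModel.CLUSTERING, in which the consumers of one group share the work instead of each receiving every message. One way to picture this is to divide a topic's queues into near-equal shares, one share per consumer. The sketch below is a simplified model of such an even split, not RocketMQ's actual allocation code. ToyQueueAllocator and allocate() are hypothetical names, and the real client's allocation strategy may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of clustering-mode load balancing; NOT RocketMQ's actual allocation code. */
class ToyQueueAllocator {
    /**
     * Splits queue ids 0..queueCount-1 into contiguous, near-equal shares and
     * returns the share of the consumer at consumerIndex (0-based).
     */
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        int base = queueCount / consumerCount;   // queues that every consumer gets
        int extra = queueCount % consumerCount;  // the first 'extra' consumers get one more
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        List<Integer> share = new ArrayList<>();
        for (int q = start; q < start + size; q++) {
            share.add(q);
        }
        return share;
    }

    public static void main(String[] args) {
        // Four queues shared by three consumers of one group
        for (int c = 0; c < 3; c++) {
            System.out.println("consumer " + c + " -> queues " + allocate(4, 3, c));
        }
    }
}
```

In this model a group with more consumers than queues leaves the surplus consumers with an empty share, which is why adding consumers beyond the queue count does not add throughput here.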
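
First, a DefaultMQPushConsumer object is created. As with the Producer, every Consumer must belong to a ConsumerGroup. This example uses DefaultMQPushConsumer. As the name suggests, this type of consumer works in "push" mode: once a producer has sent a message to the subscribed Topic, the registered message listener is called back automatically to consume it, and the consumer does not have to pull messages by hand. The other type, DefaultMQPullConsumer, requires the consumer to call pull() itself to fetch messages from the Topic before it can consume them.

Like the Producer, DefaultMQPushConsumer needs the nameserver address. The consume position is then specified: from the head of the queue or from the tail. Next comes the message model. This example uses clustering consumption, in which the consumers of one ConsumerGroup share the messages in a load-balanced way. In push mode the consumer must register a message listener to process messages. Here MessageConsumer implements the MessageListenerConcurrently interface, so it registers itself as the listener and handles messages in the consumeMessage() callback.

Finally, start() is called to start the consumer.

III. Send Test
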
package william.rmq.producer.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import william.rmq.producer.RocketMQProducerApplication;
import william.rmq.producer.quickstart.MessageProducer;

/**
 * @author ZhangShenao
 * @Date: 2018/9/7 12:39
 * @Description: Sends five test messages through MessageProducer
 */
@SpringBootTest(classes = RocketMQProducerApplication.class)
@RunWith(SpringRunner.class)
public class TestSendMessage {
   @Autowired
   private MessageProducer producer;

   @Test
   public void testSendMessage(){
       String message = "Message-";
       String topic = "DefaultCluster";
       String tags = "Tags";
       String keys = "Keys-";

       for (int i = 1;i <= 5;i++){
           producer.sendMessage(message + i,topic,tags,keys + i);
       }
   }
}
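
Because sendMessage() uses the asynchronous send with a SendCallback, the test method may return before every callback has fired. One common way to wait is a CountDownLatch that each callback counts down. The sketch below uses only the JDK: sendAsync() is a hypothetical stand-in for the asynchronous producer call, and Callback is a reduced, success-only shape assumed for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of waiting for asynchronous send callbacks; sendAsync is a hypothetical stand-in. */
class AsyncSendWait {
    /** Minimal callback shape, assumed for illustration (success case only). */
    interface Callback {
        void onSuccess(String result);
    }

    // Daemon threads so that an idle pool never keeps the JVM alive
    private static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    /** Stand-in for an asynchronous send: returns at once, invokes the callback later. */
    static void sendAsync(String body, Callback callback) {
        POOL.execute(() -> callback.onSuccess("SEND_OK:" + body));
    }

    /** Sends count messages, then blocks until every callback has run or the timeout expires. */
    static int sendAndWait(int count, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(count);
        AtomicInteger acknowledged = new AtomicInteger();
        for (int i = 1; i <= count; i++) {
            sendAsync("Message-" + i, result -> {
                acknowledged.incrementAndGet();
                latch.countDown();
            });
        }
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return acknowledged.get();
    }

    public static void main(String[] args) {
        System.out.println("callbacks received: " + sendAndWait(5, 1000));
    }
}
```

In the real test, the same idea would mean counting the latch down in both onSuccess() and onException(), so that a failed send does not leave the test waiting for the full timeout.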
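
Start the producer and consumer projects, then run the send test. The console output below shows that messages are produced and consumed normally.

Producer side: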
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECE70001, offsetMsgId=C0A81FFA00002A9F0000000000001064, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=11], queueOffset=1]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECED0003, offsetMsgId=C0A81FFA00002A9F0000000000000EEC, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=9], queueOffset=0]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECE70000, offsetMsgId=C0A81FFA00002A9F0000000000000FA8, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=4], queueOffset=3]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECE80002, offsetMsgId=C0A81FFA00002A9F0000000000001120, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=1], queueOffset=2]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECF50004, offsetMsgId=C0A81FFA00002A9F00000000000011DC, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=5], queueOffset=3]

Consumer side:
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECE70001,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECF50004,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECE80002,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECE70000,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECED0003,topic: DefaultCluster,tags: Tags