RocketMQ详解(4)——入门程序
时间:2022-07-24
本文章向大家介绍RocketMQ详解(4)——入门程序,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
RocketMQ详解(4)——入门程序
本节演示使用SpringBoot整合RocketMQ的入门程序,包括消息的生产端和消费端两个工程。
开发环境:JDK1.8、IntelliJ IDEA、Maven3.5.3、SpringBoot2.0.3、RocketMQ 4.3.0
首先,引入RocketMQ的客户端依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<rocketmq.version>4.3.0</rocketmq.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--RocketMQ依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
配置RocketMQ nameserver的地址:
spring:
rocketmq:
namesrvAddr: localhost:9876
一. 消息生产者
- 启动类
package william.rmq.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 11:44
* @Description:RocketMQ生产端启动类
*/
@SpringBootApplication
public class RocketMQProducerApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQProducerApplication.class);
}
}
- 消息生产者
package william.rmq.producer.quickstart;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 10:58
* @Description:RocketMQ消息生产者
*/
@Service
@Slf4j
public class MessageProducer {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private static final DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
@PostConstruct
public void start(){
try {
producer.setNamesrvAddr(namesrvAddr);
producer.start();
log.info("Message Producer Start...");
System.err.println("Message Producer Start...");
}catch (Exception e){
log.error("Message Producer Start Error!!",e);
}
}
public void sendMessage(String data, String topic, String tags, String keys) {
try {
byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET);
Message mqMsg = new Message(topic, tags, keys, messageBody);
producer.send(mqMsg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("Message Producer: Send Message Success {}", sendResult);
System.err.println("Message Producer: Send Message Success {}" + sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("Message Producer: Send Message Error ", throwable);
System.err.println("Message Producer: Send Message Error " + throwable);
}
});
} catch (Exception e) {
log.error("Message Producer: Send Message Error ", e);
}
}
}
- 首先,创建了一个DefaultMQProducer实例,并指定producerGroup消费者组。RocketMQ中的每个Producer都要属于一个producerGroup。接着,设置连接到的nameserver的地址,然后调用DefaultMQProducer的start()方法启动消费者。 在Producer启动后,创建RocketMQ的消息。消息需要知道要发往的队列topic,消息标签tags,消息标识keys和消息内容。其中,topic是一个逻辑上的概念,标识一个可发布和订阅的主题,下面会包含一个或多个Queue来实际存储消息。tags可指定消息的标签属性,可以用来进行消息的过滤。keys可以用来识别同一个topic下的不同消息。 创建好消息后,直接调用DefaultMQProducer的send()方法,就可以将消息发送到Broker上。该方法还可以设置一个SendCallback回调,来处理消息发送成功和失败的情况。
二. 消息消费者
- 启动类
package william.rmq.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 11:47
* @Description:RocketMQ消费端启动类
*/
@SpringBootApplication
public class RocketMQConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQConsumerApplication.class);
}
}
- 消息消费者
package william.rmq.consumer.quickstart;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 11:06
* @Description:RocketMQ消息消费者
*/
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
@PostConstruct
public void start() {
try {
//指定nameserver地址
consumer.setNamesrvAddr(namesrvAddr);
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//订阅主题
consumer.subscribe("DefaultCluster", "*");
//注册消息监听器
consumer.registerMessageListener(this);
//启动消费端
consumer.start();
log.info("Message Consumer Start...");
System.err.println("Message Consumer Start...");
} catch (MQClientException e) {
log.error("Message Consumer Start Error!!",e);
}
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
msgs.stream()
.forEach(msg -> {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
log.info("Message Consumer: Handle New Message: messageId:{}, topic:{}, tags:{}, keys:{}, messageBody:{}"
, msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getKeys(), messageBody);
System.err.println("Message Consumer: Handle New Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: " + msg.getTags());
} catch (Exception e) {
log.error("Consume Message Error!!", e);
}
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
- 首先,创建DefaultMQPushConsumer对象,与Producer相同,每一个Consumer也必须属于一个ConsumerGroup。本例中使用了DefaultMQPushConsumer,顾名思义,该类型的消费者属于“推消息”模式,当消费者将消息发送到订阅的Topic后,会自动回调消息监听器的方法消费消息,而不需要消费者手动拉取消息消费。相反,另一种类型DefaultMQPullConsumer则需要消费者调用pull()方法从Topic上拉取消息才能进行消费。 类似Producer,DefaultMQPushConsumer也需要设置nameserver的地址。然后指定消费的位置:从队列的头部消费或从尾部消费。接下来设置消费模式。本例采用集群消费,同一个ConsumerGroup的不同消费者会以负载均衡的方式分摊消息。在Push模式下,消费者需要注册一个消息监听器以处理消息。本例中MessageConsumer实现了MessageListenerConcurrently接口,可将自身实例注册为一个Listener,在consumeMessage()回调中处理消费消息的逻辑。 最后,调用start()方法启动消费者。 三. 发送测试
package william.rmq.producer.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import william.rmq.producer.RocketMQProducerApplication;
import william.rmq.producer.quickstart.MessageProducer;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 12:39
* @Description:
*/
@SpringBootTest(classes = RocketMQProducerApplication.class)
@RunWith(SpringRunner.class)
public class TestSendMessage {
@Autowired
private MessageProducer producer;
@Test
public void testSendMessage(){
String message = "Message-";
String topic = "DefaultCluster";
String tags = "Tags";
String keys = "Keys-";
for (int i = 1;i <= 5;i++){
producer.sendMessage(message + i,topic,tags,keys + i);
}
}
}
- 分别启动生产者、消费者工程,运行发送测试,可以看到控制台输出如下,说明消息生产和消费正常。 生产端:
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECE70001, offsetMsgId=C0A81FFA00002A9F0000000000001064, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=11], queueOffset=1]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECED0003, offsetMsgId=C0A81FFA00002A9F0000000000000EEC, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=9], queueOffset=0]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECE70000, offsetMsgId=C0A81FFA00002A9F0000000000000FA8, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=4], queueOffset=3]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECE80002, offsetMsgId=C0A81FFA00002A9F0000000000001120, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=1], queueOffset=2]
Message Producer: Send Message Success {}SendResult [sendStatus=SEND_OK, msgId=C0A81FFA05C118B4AAC223DDECF50004, offsetMsgId=C0A81FFA00002A9F00000000000011DC, messageQueue=MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=5], queueOffset=3]
- 消费端:
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECE70001,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECF50004,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECE80002,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECE70000,topic: DefaultCluster,tags: Tags
Message Consumer: Handle New Message: messageId: C0A81FFA05C118B4AAC223DDECED0003,topic: DefaultCluster,tags: Tags
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- DTS(数据库传输服务)
- R语言关联挖掘实例(购物篮分析)
- 用R语言中的神经网络预测时间序列:多层感知器和极限学习机
- 基于R语言股票市场收益的统计可视化分析
- Mac系统R语言升级后无法加载包报错 package or namespace load failed in dyn.load
- 如何从xml文件创建R语言数据框dataframe
- Matlab马尔可夫链蒙特卡罗法(MCMC)估计随机波动率(SV) 模型
- 如何从xml文件创建R语言数据框dataframe
- R语言POT超阈值模型和极值理论EVT分析
- R语言使用灰色关联分析(Grey Relation Analysis,GRA)中国经济社会发展指标
- R语言中的模拟过程和离散化:泊松过程和维纳过程
- R语言Lee-Carter模型对年死亡率建模预测预期寿命
- R语言有极值(EVT)依赖结构的马尔可夫链(MC)对洪水极值分析
- RxSwift 封装 CoreBluetooth(一) 配置
- Golang 操作Excel文件