RocketMQ详解(12)——RocketMQ的重试机制
时间:2022-07-24
本文章向大家介绍RocketMQ详解(12)——RocketMQ的重试机制,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
RocketMQ详解(12)——RocketMQ的重试机制
一. MQ的重试机制
由于MQ经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响。所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持。
RocketMQ为使用者封装了消息重试的处理流程,无需开发人员手动处理。RocketMQ支持了生产端和消费端两类重试机制。
二. 生产端重试
- 如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。
- 相关API DefaultMQProducer可以设置消息发送失败的最大重试次数,并可以结合发送的超时时间来进行重试的处理,具体API如下:
//设置消息发送失败时的最大重试次数
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
//同步发送消息,并指定超时时间
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);
}
因此,实现生产端的重试十分简单,例如下面的代码可以设置Producer如果在5s内没有发送成功,则重试5次:
//同步发送消息,如果5秒内没有发送成功,则重试5次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(5);
producer.send(msg,5000L);
三. 消费端的重试
- 消费状态: 消费者消费消息后,需要给Broker返回消费状态。以MessageListenerConcurrently监听器为例,Consumer消费完成后需要返回ConsumeConcurrentlyStatus并发消费状态。查看源码,ConsumeConcurrentlyStatus是一个枚举,共有两种状态:
public enum ConsumeConcurrentlyStatus {
//消费成功
ConsumeConcurrentlyStatus,
//消费失败,一段时间后重试
RECONSUME_LATER;
}
- Consumer端的重试包括两种情况
- 异常重试:由于Consumer端逻辑出现了异常,导致返回了RECONSUME_LATER状态,那么Broker就会在一段时间后尝试重试。
- 超时重试:如果Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。
因此,如果Consumer端正常消费成功,一定要返回ConsumeConcurrentlyStatus.ConsumeConcurrentlyStatus状态。 下面分别演示两种重试。
- 异常重试 RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 但是在大部分情况下,如果Consumer端逻辑出现异常,重试太多次也没有很大的意义,我们可以在代码中指定最大的重试次数。如下:
package william.rmq.consumer.quickstart;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import william.rmq.common.constant.RocketMQConstant;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 11:06
* @Description:RocketMQ消息消费者
*/
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
@PostConstruct
public void start() {
try {
consumer.setNamesrvAddr(namesrvAddr);
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//订阅主题
consumer.subscribe("DefaultCluster", "*");
//注册消息监听器
consumer.registerMessageListener(this);
//启动消费端
consumer.start();
log.info("Message Consumer Start...");
System.err.println("Message Consumer Start...");
} catch (MQClientException e) {
log.error("Message Consumer Start Error!!",e);
}
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt message = msgs.get(0);
try {
//逐条消费
String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);
//模拟业务异常
int i = 1 / 0;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("Consume Message Error!!", e);
//抛出异常时,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,尝试重试。当重试指定次数后返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
int reconsumeTimes = message.getReconsumeTimes();
System.err.println("Now Retry Times: " + reconsumeTimes);
if (reconsumeTimes >= RocketMQConstant.MAX_RETRY_TIMES) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
可以看到控制台打印如下:
Now Retry Times: 3
Message Consumer: Handle New Message: messageId: 0A0E096CA14618B4AAC2562C6D5B0000,topic: DefaultCluster,tags: Tags,messageBody: Message-1
Now Retry Times: 3
Message Consumer: Handle New Message: messageId: C0A81FFA7FF318B4AAC24A37C32C0007,topic: DefaultCluster,tags: Tags,messageBody: Order-2-完成
Now Retry Times: 3
Now Retry Times: 3
Message Consumer: Handle New Message: messageId: C0A81FFA7FF318B4AAC24A37C3290006,topic: DefaultCluster,tags: Tags,messageBody: Order-2-支付
Now Retry Times: 3
Now Retry Times: 3
- 消息重试指定的次数后,就返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS不再重试了。 注:只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息是不会重试的。
- 超时重试 当Consumer处理时间过长,在超时时间内没有返回给Broker消费状态,那么Broker也会自动重试
package william.rmq.consumer.quickstart;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import william.rmq.common.constant.RocketMQConstant;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 11:06
* @Description:RocketMQ消息消费者
*/
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
@PostConstruct
public void start() {
try {
consumer.setNamesrvAddr(namesrvAddr);
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//设置消费超时时间(分钟)
consumer.setConsumeTimeout(RocketMQConstant.CONSUMER_TIMEOUT_MINUTES);
//订阅主题
consumer.subscribe("DefaultCluster", "*");
//注册消息监听器
consumer.registerMessageListener(this);
//启动消费端
consumer.start();
log.info("Message Consumer Start...");
System.err.println("Message Consumer Start...");
} catch (MQClientException e) {
log.error("Message Consumer Start Error!!",e);
}
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt message = msgs.get(0);
try {
//逐条消费
String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);
//模拟耗时操作2分钟,大于设置的消费超时时间
Thread.sleep(1000L * 60 * 2);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("Consume Message Error!!", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
四. 消息的幂等去重
由于MQ的重试机制,难免会引起消息的重复消费问题。比如一个ConsumerGroup中有两个,Consumer1和Consumer2,以集群方式消费。假设一条消息发往ConsumerGroup,由Consumer1消费,但是由于Consumer1消费过慢导致超时,次数Broker会将消息发送给Consumer2去消费,这样就产生了重复消费问题。因此,使用MQ时应该对一些关键消息进行幂等去重的处理。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Android 中WallpaperManager用法实例
- Android实现屏幕各尺寸的获取的示例
- Android 中ContentProvider的实例详解
- Android Intent调用 Uri的方法总结
- Android 调用发送短信的方法
- Android 开发之Dialog中隐藏键盘的正确使用方法
- Android基于HttpUrlConnection类的文件下载实例代码
- Android ListView之setEmptyView正确使用方法
- 腾讯云Linux服务器安装Mysql8并实现远程访问
- Android实现动态改变app图标的示例代码
- 浅谈Android 的线程和线程池的使用
- 点餐系统的部署,Java点餐系统部署到腾讯云Linux服务器
- 详解Android 中AsyncTask 的使用
- 解决Android应用冷启动时出现的白屏问题的方法
- Java点餐系统+扫码点餐小程序部署文档(2020版)