RocketMQ详解(6)——Producer详解
时间:2022-07-24
本文章向大家介绍RocketMQ详解(6)——Producer详解,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
RocketMQ详解(6)——Producer详解
一. Producer的特性
- 消息过滤 对于Producer,可以对单个主题发送消息,也可以对多个主题发送消息,这种设计非常灵活。而且,可以通过Tag定义一些简单的过滤,通常已经可以满足我们90%的需求了。对于一些更复杂的过滤场景,可以使用Filter实现。
- Producer的模式
RocketMQ提供了三种不同模式的Producer:
- 普通模式:NormalProducer 这种模式自不必说,使用传统的send()方法发送消息即可。这种模式下不能保证消息的顺序一致性。
- 顺序模式:OrderProducer 这种模式可以保证消息的严格顺序消费。如果想要实现全局顺序,可以将消息发往同一个Queue;如果要保证局部顺序,则可以发往多个Queue。
- 事务模式:TransactionProducer
支持以事务的方式对消息进行提交处理,在RocketMQ中事务消息分为两个阶段:
- 第一个阶段将消息预发送给Broker,此时消息已经在队列中了,但是消费端不可见。
- 第二个阶段为本地消息回调处理,如果事务处理成功,返回COMMIT_MESSAGE,将消息正式发送且消费端可见;处理失败则返回ROLLBACK_MESSAGE,此消息直接丢弃。
二. DefaultMQProducer——普通生产者
DefaultMQProducer是一个默认的消息生产者,可以支持发送普通消息和顺序消息。
DefaultMQProducer中定义了一些发送消息相关的属性,还提供了发送消息的相关方法,可以支持同步发送和异步发送,可以发往Broker,由Broker决定具体发往的Queue,也可以指定发往的Queue。
下面简单结合源码,对其API进行介绍:
//生产者组
private String producerGroup;
//创建Topic时的topicKey,在测试时可指定Broker自增模式
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
//每个Topic中默认有4个Queue来存储消息
private volatile int defaultTopicQueueNums = 4;
//默认发送超时3000ms
private int sendMsgTimeout = 3000;
//默认情况下,当消息体字节数超过4k时,消息会被压缩(Consumer收到消息会自动解压缩)
private int compressMsgBodyOverHowmuch = 1024 * 4;
//同步发送消息时,消息发送失败后的最大重试次数。RocketMQ在消息重试机制上有很好的支持,但是重试可能会引起重复消息的问题,这需要在逻辑上进行幂等处理。
private int retryTimesWhenSendFailed = 2;
//异步发送时的最大重试次数,类似retryTimesWhenSendFailed
private int retryTimesWhenSendAsyncFailed = 2;
//如果消息发送成功,但是返回SendResult != SendStatus.SEND_OK,是否尝试发往别的Broker
private boolean retryAnotherBrokerWhenNotStoreOK = false;
//默认最大消息长度:4M,当消息长度超过限制时,RocketMQ会自动抛出异常
private int maxMessageSize = 1024 * 1024 * 4;
public DefaultMQProducer() {
this(MixAll.DEFAULT_PRODUCER_GROUP, null);
}
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
public DefaultMQProducer(final String producerGroup) {
this(producerGroup, null);
}
public DefaultMQProducer(RPCHook rpcHook) {
this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
}
//启动方法,发送消息前必须调用
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
//关闭方法,释放资源
@Override
public void shutdown() {
this.defaultMQProducerImpl.shutdown();
}
//同步发送消息,该方法会阻塞直到消息发送成功
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
//同步发送消息,指定超时时间
@Override
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);
}
//异步发送消息,并注册发送回调。该方法会立即返回,当发送完成后,Broker会回调SendCallback通知发送结果
@Override
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback);
}
//异步发送消息,并指定超时时间
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
//同步发送消息,发往指定队列
@Override
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq);
}
//异步发送消息到指定队列
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback);
}
//创建一个Topic。RocketMQ可支持Topic的自动创建,也可手动调用createTopic方法创建。创建Topic时需指定Topic的key,测试时可使用Broker自增key的方式,但是实际生产情况下应使用具有业务意义的key。创建时还可以指定Topic的name、Topic中Queue的数量等。
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
}
}
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 关于MySQL server has gone away
- PyQt5 技术篇-在clipboard.dataChanged.connect()里如何写入剪切板示例演示,pyqt5监听剪切板变动并写入剪切板内容
- 去除WordPress链接中出现的index.php
- MySQL 语法问题:You can‘t specify target table ‘xxx‘ for update in FROM clause. 原因及解决方法
- 配置 prometheus-operator 报警规则
- SQL语句查询出的数据进行字符串拼接,oracle批量删除数据库用户实例演示
- prometheus-operator 监控 k8s 外部集群
- Python+selenium 自动化-操作已启用的chrome浏览器实例演示,chrome启用调试端口方法
- JavaScript 技术篇-js检测原生对象类型实例演示,js的3种对象类型
- Python 技术篇-利用pyqt5库读取剪切板已复制数据的格式类型实例演示,python判断复制文件的文件类型
- 第36期:二叉树的遍历(小白必看)
- Python 技术篇-index()字符串倒叙匹配获取索引,字符串切片反向输出,逆向输出字符串
- JavaScript 技术篇-navigator.permissions读取chrome剪切板权限获取不生效原因:只有在https协议下使用有效。手动设置chrome页面剪切板读取权限方法
- 生产prometheus-operator 监控二进制kubernetes
- Excel 技术篇-跨页签统计某一区域下符合条件值的数量方法,COUNTIF函数、数量统计公式的用法实例演示