RocketMQ详解(6)——Producer详解

时间:2022-07-24
本文章向大家介绍RocketMQ详解(6)——Producer详解,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

RocketMQ详解(6)——Producer详解

一. Producer的特性

  1. 消息过滤 对于Producer,可以对单个主题发送消息,也可以对多个主题发送消息,这种设计非常灵活。而且,可以通过Tag定义一些简单的过滤,通常已经可以满足我们90%的需求了。对于一些更复杂的过滤场景,可以使用Filter实现。
  2. Producer的模式 RocketMQ提供了三种不同模式的Producer:
    1. 普通模式:NormalProducer 这种模式自不必说,使用传统的send()方法发送消息即可。这种模式下不能保证消息的顺序一致性。
    2. 顺序模式:OrderProducer 这种模式可以保证消息的严格顺序消费。如果想要实现全局顺序,可以将消息发往同一个Queue;如果要保证局部顺序,则可以发往多个Queue。
    3. 事务模式:TransactionProducer 支持以事务的方式对消息进行提交处理,在RocketMQ中事务消息分为两个阶段:
      1. 第一个阶段将消息预发送给Broker,此时消息已经在队列中了,但是消费端不可见。
      2. 第二个阶段为本地消息回调处理,如果事务处理成功,返回COMMIT_MESSAGE,将消息正式发送且消费端可见;处理失败则返回ROLLBACK_MESSAGE,此消息直接丢弃。

二. DefaultMQProducer——普通生产者

DefaultMQProducer是一个默认的消息生产者,可以支持发送普通消息和顺序消息。

DefaultMQProducer中定义了一些发送消息相关的属性,还提供了发送消息的相关方法,可以支持同步发送和异步发送,可以发往Broker,由Broker决定具体发往的Queue,也可以指定发往的Queue。

下面简单结合源码,对其API进行介绍:

//生产者组
private String producerGroup;

//创建Topic时的topicKey,在测试时可指定Broker自增模式
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;

//每个Topic中默认有4个Queue来存储消息
private volatile int defaultTopicQueueNums = 4;

//默认发送超时3000ms
private int sendMsgTimeout = 3000;

//默认情况下,当消息体字节数超过4k时,消息会被压缩(Consumer收到消息会自动解压缩)
private int compressMsgBodyOverHowmuch = 1024 * 4;

//同步发送消息时,消息发送失败后的最大重试次数。RocketMQ在消息重试机制上有很好的支持,但是重试可能会引起重复消息的问题,这需要在逻辑上进行幂等处理。
private int retryTimesWhenSendFailed = 2;

//异步发送时的最大重试次数,类似retryTimesWhenSendFailed
private int retryTimesWhenSendAsyncFailed = 2;

//如果消息发送成功,但是返回SendResult != SendStatus.SEND_OK,是否尝试发往别的Broker
private boolean retryAnotherBrokerWhenNotStoreOK = false;

//默认最大消息长度:4M,当消息长度超过限制时,RocketMQ会自动抛出异常
private int maxMessageSize = 1024 * 1024 * 4; 

public DefaultMQProducer() {
    this(MixAll.DEFAULT_PRODUCER_GROUP, null);
}

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

public DefaultMQProducer(final String producerGroup) {
    this(producerGroup, null);
}

public DefaultMQProducer(RPCHook rpcHook) {
    this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
}

//启动方法,发送消息前必须调用
@Override
public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
}

//关闭方法,释放资源
@Override
public void shutdown() {
    this.defaultMQProducerImpl.shutdown();
}


//同步发送消息,该方法会阻塞直到消息发送成功
@Override
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg);
}

//同步发送消息,指定超时时间
@Override
public SendResult send(Message msg,
                       long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg, timeout);
}

//异步发送消息,并注册发送回调。该方法会立即返回,当发送完成后,Broker会回调SendCallback通知发送结果
@Override
public void send(Message msg,
                 SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
    this.defaultMQProducerImpl.send(msg, sendCallback);
}

//异步发送消息,并指定超时时间
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
    throws MQClientException, RemotingException, InterruptedException {
    this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}


//同步发送消息,发往指定队列
@Override
public SendResult send(Message msg, MessageQueue mq)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg, mq);
}

//异步发送消息到指定队列
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
    throws MQClientException, RemotingException, InterruptedException {
    this.defaultMQProducerImpl.send(msg, mq, sendCallback);
}

//创建一个Topic。RocketMQ可支持Topic的自动创建,也可手动调用createTopic方法创建。创建Topic时需指定Topic的key,测试时可使用Broker自增key的方式,但是实际生产情况下应使用具有业务意义的key。创建时还可以指定Topic的name、Topic中Queue的数量等。
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
    createTopic(key, newTopic, queueNum, 0);
}


}