rabbitmq消息队列,消息发送失败,消息持久化,消费者处理失败相关

时间:2019-09-27
本文章向大家介绍rabbitmq消息队列,消息发送失败,消息持久化,消费者处理失败相关,主要包括rabbitmq消息队列,消息发送失败,消息持久化,消费者处理失败相关使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

转:https://blog.csdn.net/u014373554/article/details/92686063

项目是使用springboot项目开发的,前是代码实现,后面有分析发送消息失败、消息持久化、消费者失败处理方法和发送消息解决方法及手动确认的模式

先引入pom.xml

<!--rabbitmq-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application 配置文件

spring:
rabbitmq:
  host: IP地址
  port: 5672
  username: 用户名
  password: 密码

RabbitConfig配置文件
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;


/**
 Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
 Queue:消息的载体,每个消息都会被投到一个或多个队列。
 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
 Producer:消息生产者,就是投递消息的程序.
 Consumer:消息消费者,就是接受消息的程序.
 Channel:消息通道,在客户端的每个连接里,可建立多个channel.
*/
@Configuration
@Slf4j
public class RabbitConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    public static final String EXCHANGE_A = "my_mq_exchange_A";
    public static final String EXCHANGE_B = "my_mq_exchange_B";
    public static final String EXCHANGE_C = "my_mq_exchange_C";

    public static final String QUEUE_A="QUEUE_A";
    public static final String QUEUE_B="QUEUE_B";
    public static final String QUEUE_C="QUEUE_C";


    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
    public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); //设置发送消息失败重试
        connectionFactory.setChannelCacheSize(100);//解决多线程发送消息

        return connectionFactory;
    }
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMandatory(true); //设置发送消息失败重试
        return template;

    }
    //配置使用json转递数据
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    /*public SimpleMessageListenerContainer messageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
        adapter.setDefaultListenerMethod(new Jackson2JsonMessageConverter());
        return container;
    }*/

    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     * FanoutExchange: 将消息分发到所有的绑定队列,无 routingkey的概念
     * HeadersExchange: 通过添加属性key - value匹配
     * DirectExchange: 按照routingkey分发到指定队列
     * TopicExchange : 多关键字匹配
     * @return
     */
    @Bean
    public DirectExchange defaultExchange(){
        return new DirectExchange(EXCHANGE_A,true,false);
    }

    @Bean
    public Queue queueA(){
        return  new Queue(QUEUE_A,true);// 队列持久化
    }

    @Bean
    public Queue queueB(){
        return  new Queue(QUEUE_B,true);// 队列持久化
    }

    /**
     * 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind( queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

    @Bean
    public Binding bindingB(){
        return BindingBuilder.bind( queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

}

生成者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;

/**
 * 生产者
 */
@Component
@Slf4j
public class ProducerMessage implements  RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public ProducerMessage(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this::confirm); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
        rabbitTemplate.setReturnCallback(this::returnedMessage);
        rabbitTemplate.setMandatory(true);
    }

    public void  sendMsg (Object content){
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,RabbitConfig.ROUTINGKEY_A,content,correlationId);

    }

    /**
     * 消息发送到队列中,进行消息确认
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info(" 消息确认的id: " + correlationData);
        if(ack){
            log.info("消息发送成功");
            //发送成功 删除本地数据库存的消息
        }else{
            log.info("消息发送失败:id "+ correlationData +"消息发送失败的原因"+ cause);
            // 根据本地消息的状态为失败,可以用定时任务去处理数据

        }
    }

    /**
     * 消息发送失败返回监控
     * @param message
     * @param i
     * @param s
     * @param s1
     * @param s2
     */
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        log.info("returnedMessage [消息从交换机到队列失败]  message:"+message);

    }
}

消费者

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
 * 消费者
 */

@Slf4j
@Component

public class ComsumerMessage {

    @RabbitListener(queues = RabbitConfig.QUEUE_A)
    public void handleMessage(Message message,Channel channel) throws  IOException{
        try {
            String json = new String(message.getBody());
            JSONObject jsonObject = JSONObject.fromObject(json);
            log.info("消息了【】handleMessage" +  json);
            int i = 1/0;
            //业务处理。
            /**
             * 防止重复消费,可以根据传过来的唯一ID先判断缓存数据中是否有数据
             * 1、有数据则不消费,直接应答处理
             * 2、缓存没有数据,则进行消费处理数据,处理完后手动应答
             * 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)
             */

            //手动应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            log.error("消费消息失败了【】error:"+ message.getBody());
            log.error("OrderConsumer  handleMessage {} , error:",message,e);
            // 处理消息失败,将消息重新放回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
        }

    }

}

发送消息:调用生成的方法

import com.zz.blog.BlogApplicationTests;
import com.zz.blog.mq.ProducerMessage;
import net.sf.json.JSONObject;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.UUID;
public class Message extends BlogApplicationTests {
    @Autowired
    private ProducerMessage producerMessage;

    @Test
    public void sendMessage(){
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", UUID.randomUUID().toString());
        jsonObject.put("name","TEST");
        jsonObject.put("desc","订单已生成");
        //防止发送消息失败,将发送消息存入本地。

        producerMessage.sendMsg(jsonObject.toString());

    }
}

rabbitTemplate的发送消息流程是这样的:
1 发送数据并返回(不确认rabbitmq服务器已成功接收)
2 异步的接收从rabbitmq返回的ack确认信息
3 收到ack后调用confirmCallback函数
注意:在confirmCallback中是没有原message的,所以无法在这个函数中调用重发,confirmCallback只有一个通知的作用

在这种情况下,如果在2,3步中任何时候切断连接,我们都无法确认数据是否真的已经成功发送出去,从而造成数据丢失的问题。

最完美的解决方案只有1种:
使用rabbitmq的事务机制。
但是在这种情况下,rabbitmq的效率极低,每秒钟处理的message在几百条左右。实在不可取。

基于上面的分析,我们使用一种新的方式来做到数据的不丢失。
在rabbitTemplate异步确认的基础上
1 在本地缓存已发送的message
2 通过confirmCallback或者被确认的ack,将被确认的message从本地删除
3 定时扫描本地的message,如果大于一定时间未被确认,则重发

当然了,这种解决方式也有一定的问题
想象这种场景,rabbitmq接收到了消息,在发送ack确认时,网络断了,造成客户端没有收到ack,重发消息。(相比于丢失消息,重发消息要好解决的多,我们可以在consumer端做到幂等)。

消息存入本地:在message 发消息的写数据库中。

消息应答成功,则删除本地消息,失败更改消息状态,可以使用定时任务去处理。

消息持久化:

消费者: 

/**
 * 防止重复消费,可以根据传过来的唯一ID先判断缓存数据库中是否有数据
 * 1、有数据则不消费,直接应答处理
 * 2、缓存没有数据,则进行消费处理数据,处理完后手动应答
 * 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)
 */

原文地址:https://www.cnblogs.com/duende99/p/11597619.html