RabbitMq死信队列

时间:2020-04-10
本文章向大家介绍RabbitMq死信队列,主要包括RabbitMq死信队列使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

死信队列的作用

死信交换机有什么用呢? 在创建队列的时候 可以给这个队列附带一个交换机, 那么这个队列作废的消息就会被重新发到附带的交换机,然后让这个交换机重新路由这条消息。

死信消息产生的来源

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

死信队列处理的方式

  • 丢弃,如果不是很重要,可以选择丢弃
  • 记录死信入库,然后做后续的业务分析或处理
  • 通过死信队列,由负责监听死信的应用程序进行处理

消息超时进入死信队列

通俗的说,就是消息产生之后,因为设置了超时时间,在这段时间内消息没有被消费就会被扔到死信队列里面。

 // 交换机名称
    private static final String DESTINATION_NAME = "rabbitMq_topic";
    //消息队列
    private static final String queueName = "topic_queue";
    //routingKey
    private static final String routingKey = "topic.#";

    //配置死信队列
    private static final String dlxExchangeName = "dlx.exchange";
    private static final String dlxQueueName = "dlx.queue";
    private static final String dlxRoutingKey = "#";

    @Test
    public void producer() throws IOException, TimeoutException {
        //获取连接
        Connection connection = MQConnectionUtils.newConnection();
        //创建通道
        Channel channel = connection.createChannel();
        Map<String, Object> arguments = new HashMap<String, Object>(16);
        // 为队列设置队列交换器
        arguments.put("x-dead-letter-exchange", dlxExchangeName);
        // 设置队列中的消息 60s 钟后过期
        arguments.put("x-message-ttl", 60000);
        //正常生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
        channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
        //消费声明队列
        channel.queueDeclare(queueName, true, false, false, arguments);
        //消费者队列绑定交换机 绑定路由件 路由键
        channel.queueBind(queueName, DESTINATION_NAME, routingKey);

        String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 测试消息超时,传递到死信队列";

        // 创建死信交换器和队列
        channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
        channel.queueDeclare(dlxQueueName, true, false, false, null);
        channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);

        //生产者发送消息者
        channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        System.err.println("消息发送完成......");

    }

只监听了死信队列的消息,正常消息无需监听接收

    /**
     * 监听死信队列
     *
     * @throws IOException
     * @throws TimeoutException
     * @throws InterruptedException
     */
    @Test
    public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = MQConnectionUtils.newConnection();
        //创建通道
        Channel channel = connection.createChannel();
        System.out.println("死信消费者启动 ..........");
        Thread.sleep(65000);
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("死信队列接收到消息:" + new String(body));
                System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(dlxQueueName, consumer);
    }

消息被退回

这个我在之前的整合SpringBoot的时候有实验过。

channel.basicNack(envelope.getDeliveryTag(),false,false);

队列达到最大长度

这个和消息超时差不多,只不过是设置了队列的最大容量而已。
只需要把上面的代码修改一下就可以了。

    @Test
    public void producer() throws IOException, TimeoutException {
        //获取连接
        Connection connection = MQConnectionUtils.newConnection();
        //创建通道
        Channel channel = connection.createChannel();

        Map<String, Object> arguments = new HashMap<String, Object>(16);
        // 为队列设置队列交换器
        arguments.put("x-dead-letter-exchange", dlxExchangeName);
        //设置队列长度为3
        arguments.put("x-max-length", 3);
        //正常生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
        channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
        //消费声明队列
        channel.queueDeclare(queueName, true, false, false, arguments);
        //消费者队列绑定交换机 绑定路由件 路由键
        channel.queueBind(queueName, DESTINATION_NAME, routingKey);

        String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 测试消息超时,传递到死信队列";

        // 创建死信交换器和队列
        channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
        channel.queueDeclare(dlxQueueName, true, false, false, null);
        channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);

        //生产者发送消息者
        for (int i = 0; i < 5; i++) {
            channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());
        }
        System.out.println("消息发送完成......");
    }
 @Test
    public void consumer() throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = MQConnectionUtils.newConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //此处设置一次只消费1个,且必须是ASK之后的消息才能算
        channel.basicQos(1);
        System.out.println("消费者启动 ..........");
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("正常队列:" + new String(body));
                System.out.println("deliveryTag:" + envelope.getDeliveryTag());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, consumer);
    }

    /**
     * 监听死信队列
     *
     * @throws IOException
     * @throws TimeoutException
     * @throws InterruptedException
     */
    @Test
    public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = MQConnectionUtils.newConnection();
        //创建通道
        Channel channel = connection.createChannel();
        System.out.println("死信消费者启动 ..........");
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("死信队列接收到消息:" + new String(body));
                System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(dlxQueueName, consumer);
    }


原文地址:https://www.cnblogs.com/yangk1996/p/12674015.html