RabbitMQ-消息确认机制之消息的正确消费

时间:2019-10-31
本文章向大家介绍RabbitMQ-消息确认机制之消息的正确消费,主要包括RabbitMQ-消息确认机制之消息的正确消费使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

---恢复内容开始---

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zhuzhezhuzhe1/article/details/80709737

上节中我们讲了如何确保消息的准确发布,今天我们来看看如何确保消息的正确消费。

在之前的基础上我们对消费者(仓库服务)进行完善。

修改配置文件application.yml

消费者的ack方式默认是自动的,也就是说消息一旦被消费(无论是否处理成功),消息都会被确认,然后会从队列中删除。这就意味着当消息处理失败的时候,也会被从队列中删除,这绝对不是我们所期望的。我们希望当消息正确消费时,消息从队列中删除,否则,消息不能删除,该消息应该继续被消费,直到成功消费。

所以,首先我们将ack的方式设置为手动:

spring:
  rabbitmq:
    host: xxx.xxx.xxx.xx
    port: 5672
    username: xxxx
    password: xxxx
    listener:
      direct:
        acknowledge-mode: manual   # 配置该消费者的ack方式为手动

消费成功后手动确认

处理成功时直接确认,处理失败时,将消息重新放回队列中。

  1. package com.space.rbq.store.consumer;
  2. import com.google.gson.Gson;
  3. import com.rabbitmq.client.Channel;
  4. import com.space.rbq.store.bean.Order;
  5. import com.space.rbq.store.service.StoreService;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.amqp.core.Message;
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Component;
  11. import java.io.IOException;
  12. /**
  13. * 负责接收处理订单服务发送的消息
  14. * @author zhuzhe
  15. * @date 2018/6/7 10:09
  16. * @email 1529949535@qq.com
  17. */
  18. @Slf4j
  19. @Component
  20. public class OrderConsumer {
  21. @Autowired
  22. private StoreService storeService;
  23. /*对列名称*/
  24. public final String QUEUE_NAME1 = "first-queue";
  25. /**
  26. * queues 指定从哪个队列(queue)订阅消息
  27. * @param message
  28. * @param channel
  29. */
  30. @RabbitListener(queues = {QUEUE_NAME1})
  31. public void handleMessage(Message message,Channel channel) throws IOException {
  32. try {
  33. // 处理消息
  34. System.out.println("OrderConsumer {} handleMessage :"+message);
  35. // 执行减库存操作
  36. storeService.update(new Gson().fromJson(new String(message.getBody()),Order.class));
  37. /**
  38. * 第一个参数 deliveryTag:就是接受的消息的deliveryTag,可以通过msg.getMessageProperties().getDeliveryTag()获得
  39. * 第二个参数 multiple:如果为true,确认之前接受到的消息;如果为false,只确认当前消息。
  40. * 如果为true就表示连续取得多条消息才发会确认,和计算机网络的中tcp协议接受分组的累积确认十分相似,
  41. * 能够提高效率。
  42. *
  43. * 同样的,如果要nack或者拒绝消息(reject)的时候,
  44. * 也是调用channel里面的basicXXX方法就可以了(要指定tagId)。
  45. *
  46. * 注意:如果抛异常或nack(并且requeue为true),消息会重新入队列,
  47. * 并且会造成消费者不断从队列中读取同一条消息的假象。
  48. */
  49. // 确认消息
  50. // 如果 channel.basicAck channel.basicNack channel.basicReject 这三个方法都不执行,消息也会被确认
  51. // 所以,正常情况下一般不需要执行 channel.basicAck
  52. // channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  53. }catch (Exception e){
  54. log.error("OrderConsumer handleMessage {} , error:",message,e);
  55. // 处理消息失败,将消息重新放回队列
  56. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
  57. }
  58. }
  59. }
  60. /*
  61. * 消息的标识,false只确认当前一个消息收到,true确认consumer获得的所有消息
  62. * channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  63. *
  64. * ack返回false,并重新回到队列
  65. * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  66. *
  67. * 拒绝消息
  68. * channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  69. *
  70. */
这样,如果处理失败,handleMessage方法就会一直收到这个消息,直到成功消费。



源码:https://github.com/zhuzhegithub/rabbitmq

回到目录

转载请务必保留此出处(原作者):https://blog.csdn.net/zhuzhezhuzhe1


版权声明:本文为原创文章,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。

https://blog.csdn.net/zhuzhezhuzhe1


---恢复内容结束---

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zhuzhezhuzhe1/article/details/80709737

上节中我们讲了如何确保消息的准确发布,今天我们来看看如何确保消息的正确消费。

在之前的基础上我们对消费者(仓库服务)进行完善。

修改配置文件application.yml

消费者的ack方式默认是自动的,也就是说消息一旦被消费(无论是否处理成功),消息都会被确认,然后会从队列中删除。这就意味着当消息处理失败的时候,也会被从队列中删除,这绝对不是我们所期望的。我们希望当消息正确消费时,消息从队列中删除,否则,消息不能删除,该消息应该继续被消费,直到成功消费。

所以,首先我们将ack的方式设置为手动:

spring:
  rabbitmq:
    host: xxx.xxx.xxx.xx
    port: 5672
    username: xxxx
    password: xxxx
    listener:
      direct:
        acknowledge-mode: manual   # 配置该消费者的ack方式为手动

消费成功后手动确认

处理成功时直接确认,处理失败时,将消息重新放回队列中。

  1. package com.space.rbq.store.consumer;
  2. import com.google.gson.Gson;
  3. import com.rabbitmq.client.Channel;
  4. import com.space.rbq.store.bean.Order;
  5. import com.space.rbq.store.service.StoreService;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.amqp.core.Message;
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Component;
  11. import java.io.IOException;
  12. /**
  13. * 负责接收处理订单服务发送的消息
  14. * @author zhuzhe
  15. * @date 2018/6/7 10:09
  16. * @email 1529949535@qq.com
  17. */
  18. @Slf4j
  19. @Component
  20. public class OrderConsumer {
  21. @Autowired
  22. private StoreService storeService;
  23. /*对列名称*/
  24. public final String QUEUE_NAME1 = "first-queue";
  25. /**
  26. * queues 指定从哪个队列(queue)订阅消息
  27. * @param message
  28. * @param channel
  29. */
  30. @RabbitListener(queues = {QUEUE_NAME1})
  31. public void handleMessage(Message message,Channel channel) throws IOException {
  32. try {
  33. // 处理消息
  34. System.out.println("OrderConsumer {} handleMessage :"+message);
  35. // 执行减库存操作
  36. storeService.update(new Gson().fromJson(new String(message.getBody()),Order.class));
  37. /**
  38. * 第一个参数 deliveryTag:就是接受的消息的deliveryTag,可以通过msg.getMessageProperties().getDeliveryTag()获得
  39. * 第二个参数 multiple:如果为true,确认之前接受到的消息;如果为false,只确认当前消息。
  40. * 如果为true就表示连续取得多条消息才发会确认,和计算机网络的中tcp协议接受分组的累积确认十分相似,
  41. * 能够提高效率。
  42. *
  43. * 同样的,如果要nack或者拒绝消息(reject)的时候,
  44. * 也是调用channel里面的basicXXX方法就可以了(要指定tagId)。
  45. *
  46. * 注意:如果抛异常或nack(并且requeue为true),消息会重新入队列,
  47. * 并且会造成消费者不断从队列中读取同一条消息的假象。
  48. */
  49. // 确认消息
  50. // 如果 channel.basicAck channel.basicNack channel.basicReject 这三个方法都不执行,消息也会被确认
  51. // 所以,正常情况下一般不需要执行 channel.basicAck
  52. // channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  53. }catch (Exception e){
  54. log.error("OrderConsumer handleMessage {} , error:",message,e);
  55. // 处理消息失败,将消息重新放回队列
  56. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
  57. }
  58. }
  59. }
  60. /*
  61. * 消息的标识,false只确认当前一个消息收到,true确认consumer获得的所有消息
  62. * channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  63. *
  64. * ack返回false,并重新回到队列
  65. * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  66. *
  67. * 拒绝消息
  68. * channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  69. *
  70. */
这样,如果处理失败,handleMessage方法就会一直收到这个消息,直到成功消费。



源码:https://github.com/zhuzhegithub/rabbitmq

回到目录

转载请务必保留此出处(原作者):https://blog.csdn.net/zhuzhezhuzhe1


版权声明:本文为原创文章,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。

https://blog.csdn.net/zhuzhezhuzhe1


原文地址:https://www.cnblogs.com/blwy-zmh/p/11772398.html