RabbitMQ-消息确认机制之消息的正确消费
---恢复内容开始---
上节中我们讲了如何确保消息的准确发布,今天我们来看看如何确保消息的正确消费。
在之前的基础上我们对消费者(仓库服务)进行完善。
修改配置文件application.yml
消费者的ack方式默认是自动的,也就是说消息一旦被消费(无论是否处理成功),消息都会被确认,然后会从队列中删除。这就意味着当消息处理失败的时候,也会被从队列中删除,这绝对不是我们所期望的。我们希望当消息正确消费时,消息从队列中删除,否则,消息不能删除,该消息应该继续被消费,直到成功消费。
所以,首先我们将ack的方式设置为手动:
spring:
rabbitmq:
host: xxx.xxx.xxx.xx
port: 5672
username: xxxx
password: xxxx
listener:
direct:
acknowledge-mode: manual # 配置该消费者的ack方式为手动
消费成功后手动确认
处理成功时直接确认,处理失败时,将消息重新放回队列中。
- package com.space.rbq.store.consumer;
-
- import com.google.gson.Gson;
- import com.rabbitmq.client.Channel;
- import com.space.rbq.store.bean.Order;
- import com.space.rbq.store.service.StoreService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
- /**
- * 负责接收处理订单服务发送的消息
- * @author zhuzhe
- * @date 2018/6/7 10:09
- * @email 1529949535@qq.com
- */
- @Slf4j
- @Component
- public class OrderConsumer {
-
- @Autowired
- private StoreService storeService;
-
- /*对列名称*/
- public final String QUEUE_NAME1 = "first-queue";
-
- /**
- * queues 指定从哪个队列(queue)订阅消息
- * @param message
- * @param channel
- */
- @RabbitListener(queues = {QUEUE_NAME1})
- public void handleMessage(Message message,Channel channel) throws IOException {
- try {
- // 处理消息
- System.out.println("OrderConsumer {} handleMessage :"+message);
- // 执行减库存操作
- storeService.update(new Gson().fromJson(new String(message.getBody()),Order.class));
-
- /**
- * 第一个参数 deliveryTag:就是接受的消息的deliveryTag,可以通过msg.getMessageProperties().getDeliveryTag()获得
- * 第二个参数 multiple:如果为true,确认之前接受到的消息;如果为false,只确认当前消息。
- * 如果为true就表示连续取得多条消息才发会确认,和计算机网络的中tcp协议接受分组的累积确认十分相似,
- * 能够提高效率。
- *
- * 同样的,如果要nack或者拒绝消息(reject)的时候,
- * 也是调用channel里面的basicXXX方法就可以了(要指定tagId)。
- *
- * 注意:如果抛异常或nack(并且requeue为true),消息会重新入队列,
- * 并且会造成消费者不断从队列中读取同一条消息的假象。
- */
- // 确认消息
- // 如果 channel.basicAck channel.basicNack channel.basicReject 这三个方法都不执行,消息也会被确认
- // 所以,正常情况下一般不需要执行 channel.basicAck
- // channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
-
- }catch (Exception e){
- log.error("OrderConsumer handleMessage {} , error:",message,e);
- // 处理消息失败,将消息重新放回队列
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
- }
- }
- }
- /*
- * 消息的标识,false只确认当前一个消息收到,true确认consumer获得的所有消息
- * channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- *
- * ack返回false,并重新回到队列
- * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
- *
- * 拒绝消息
- * channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
- *
- */
这样,如果处理失败,handleMessage方法就会一直收到这个消息,直到成功消费。源码:https://github.com/zhuzhegithub/rabbitmq
转载请务必保留此出处(原作者):https://blog.csdn.net/zhuzhezhuzhe1
版权声明:本文为原创文章,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。
https://blog.csdn.net/zhuzhezhuzhe1
---恢复内容结束---
上节中我们讲了如何确保消息的准确发布,今天我们来看看如何确保消息的正确消费。
在之前的基础上我们对消费者(仓库服务)进行完善。
修改配置文件application.yml
消费者的ack方式默认是自动的,也就是说消息一旦被消费(无论是否处理成功),消息都会被确认,然后会从队列中删除。这就意味着当消息处理失败的时候,也会被从队列中删除,这绝对不是我们所期望的。我们希望当消息正确消费时,消息从队列中删除,否则,消息不能删除,该消息应该继续被消费,直到成功消费。
所以,首先我们将ack的方式设置为手动:
spring:
rabbitmq:
host: xxx.xxx.xxx.xx
port: 5672
username: xxxx
password: xxxx
listener:
direct:
acknowledge-mode: manual # 配置该消费者的ack方式为手动
消费成功后手动确认
处理成功时直接确认,处理失败时,将消息重新放回队列中。
- package com.space.rbq.store.consumer;
-
- import com.google.gson.Gson;
- import com.rabbitmq.client.Channel;
- import com.space.rbq.store.bean.Order;
- import com.space.rbq.store.service.StoreService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
- /**
- * 负责接收处理订单服务发送的消息
- * @author zhuzhe
- * @date 2018/6/7 10:09
- * @email 1529949535@qq.com
- */
- @Slf4j
- @Component
- public class OrderConsumer {
-
- @Autowired
- private StoreService storeService;
-
- /*对列名称*/
- public final String QUEUE_NAME1 = "first-queue";
-
- /**
- * queues 指定从哪个队列(queue)订阅消息
- * @param message
- * @param channel
- */
- @RabbitListener(queues = {QUEUE_NAME1})
- public void handleMessage(Message message,Channel channel) throws IOException {
- try {
- // 处理消息
- System.out.println("OrderConsumer {} handleMessage :"+message);
- // 执行减库存操作
- storeService.update(new Gson().fromJson(new String(message.getBody()),Order.class));
-
- /**
- * 第一个参数 deliveryTag:就是接受的消息的deliveryTag,可以通过msg.getMessageProperties().getDeliveryTag()获得
- * 第二个参数 multiple:如果为true,确认之前接受到的消息;如果为false,只确认当前消息。
- * 如果为true就表示连续取得多条消息才发会确认,和计算机网络的中tcp协议接受分组的累积确认十分相似,
- * 能够提高效率。
- *
- * 同样的,如果要nack或者拒绝消息(reject)的时候,
- * 也是调用channel里面的basicXXX方法就可以了(要指定tagId)。
- *
- * 注意:如果抛异常或nack(并且requeue为true),消息会重新入队列,
- * 并且会造成消费者不断从队列中读取同一条消息的假象。
- */
- // 确认消息
- // 如果 channel.basicAck channel.basicNack channel.basicReject 这三个方法都不执行,消息也会被确认
- // 所以,正常情况下一般不需要执行 channel.basicAck
- // channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
-
- }catch (Exception e){
- log.error("OrderConsumer handleMessage {} , error:",message,e);
- // 处理消息失败,将消息重新放回队列
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
- }
- }
- }
- /*
- * 消息的标识,false只确认当前一个消息收到,true确认consumer获得的所有消息
- * channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- *
- * ack返回false,并重新回到队列
- * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
- *
- * 拒绝消息
- * channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
- *
- */
这样,如果处理失败,handleMessage方法就会一直收到这个消息,直到成功消费。源码:https://github.com/zhuzhegithub/rabbitmq
转载请务必保留此出处(原作者):https://blog.csdn.net/zhuzhezhuzhe1
版权声明:本文为原创文章,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。
https://blog.csdn.net/zhuzhezhuzhe1
原文地址:https://www.cnblogs.com/blwy-zmh/p/11772398.html
- .Net多线程编程—任务Task
- 学会WCF之试错法——安全配置报错分析
- 生物信息学技能面试题(第5题)-根据GTF画基因的多个转录本结构
- 学会WCF之试错法——超时
- 学会WCF之试错法——客户端调用基础
- 【直播】我的基因组58:用R包SNPRelate来对我的基因型跟hapmap计划数据比较
- 生物信息学技能面试题(第4题)-多个同样的行列式文件合并起来
- PHP 底层的运行机制与原理
- asp.net web api 版本控制
- 如何编写更好的SQL查询:终极指南(上)
- asp.net web api 异常捕获
- asp.net web api 文件上传
- 使用MySQL正则表达式 __MySQL必知必会
- 史上最好用的免费翻蔷利器
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Docker 入门到实战教程(七)安装Redis
- Docker 入门到实战教程(八)安装Mysql
- Docker 入门到实战教程(九)安装Nginx
- Docker教程(九)部署Spring Boot项目
- Docker 入门到实战教程(十一)部署Vue+SpringBoot 前后端分离项目
- Docker 入门到实战教程(十二)ELK+Filebeat搭建日志分析系统
- Docker 入门到实战教程(十三)Docker Compose
- 解决IDEA2020.1版本的lombok插件问题
- 工具系列 | 视频监控RTSP转HLS解决方案
- Redis系列 |(一)六种基本数据结构
- 工具系列 | Jenkins 构建伟大,无所不能
- 工具系列 | H5自定义视频播放器实现
- 前端系列 |原生JS和jQuery循环遍历函数
- 工具系列 | H5如何实现人脸识别
- 形式化分析工具(六):HLPSL Tutorial(Example3)