rabbitmq学习笔记、、安装、配置、docker、springboot整合
时间:2021-08-04
本文章向大家介绍rabbitmq学习笔记、、安装、配置、docker、springboot整合,主要包括rabbitmq学习笔记、、安装、配置、docker、springboot整合使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
## 1 前言 |
---|
### 什么是消息中间件 |
#### 简介: |
1.消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统 |
2.消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。 |
#### 作用: |
1.可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。 |
① 异步解耦 |
② 流量削峰 |
③ 降低耦合度 |
## 2 消息中间件的应用场景 |
- 1.异步发送短信 |
- 2.异步调用第三方接口 |
- 3.异步接收订单状态变化 |
- 4.异步发送优惠券 |
- 5.开通会员 |
## 3 消息中间件名词 |
- Producer 生产者:投递消息到MQ服务器端 |
- Consumer 消费者:从MQ服务器端获取消息处理业务逻辑 |
- Broker MQ服务器端 |
- Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题 |
- Queue 存放消息模型 队列 先进先出 后进后出原则 数组/链表 |
- Message 生产者投递消息报文:json |
## 4 主流消息中间件对比 |
## 5 消息的可靠性 |
``` |
生产者投递消息给MQ服务器端,MQ服务器端需要缓存该消息 |
如果mq服务器端宕机之后,消息如何保证不丢失 |
1.持久化机制 |
如果mq接收到生产者投递消息,如果消费者不在的情况下,该消息是否会丢失? |
不会丢失,消息确认机制 必须要消费者消费该消息成功之后,在通知给mq服务器端 |
删除该消息。 |
Mq服务器端将该消息推送消费者: |
消费者已经和mq服务器保持长连接。 |
消费者主动拉取消息: |
消费者第一次刚启动的时候 |
Mq如何实现抗高并发思想 |
Mq消费者根据自身能力情况 ,拉取mq服务器端消息消费。 |
默认的情况下是取出一条消息。 |
缺点:存在延迟的问题 |
需要考虑mq消费者提高速率的问题: |
如何消费者提高速率:消费者实现集群、消费者批量获取消息即可。 |
``` |
## 6 RabbitMQ常用信息 |
``` |
RabbitMQ 管理平台地址 http://127.0.0.1:15672 |
默认账号:guest/guest 用户可以自己创建新的账号 |
``` |
``` |
Virtual Hosts: |
像mysql有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢? |
RabbitMQ也有类似的权限管理。在RabbitMQ中可以虚拟消息服务器VirtualHost,每 |
个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 |
默认的端口15672:rabbitmq管理平台端口号 |
默认的端口5672: rabbitmq消息中间内部通讯的端口 |
默认的端口号25672 rabbitmq集群的端口号 |
``` |
## 7 docker搭建RabbitMQ |
``` |
下载 |
docker pull rabbitmq:management |
docker images |
创建实例并启动 |
docker run -d --hostname rabbit-crazyfur --name crazyfur-rabbitmq --restart=always \ |
-p 15672:15672 -p 5672:5672 -p 25672:25672 rabbitmq:management |
``` |
## 8 快速入门RabbitMQ简单队列 |
### Maven依赖 |
``` |
``` |
### 控制台创建虚拟主机和队列 |
### 1、创建链接 |
``` |
package com.example.rabbitmqdemo.hello; |
import com.rabbitmq.client.Connection; |
import com.rabbitmq.client.ConnectionFactory; |
import java.io.IOException; |
import java.util.concurrent.TimeoutException; |
/** |
* @author xiaomao |
* @version 1.0 |
* @des 描述 |
* @date 2021/8/3 0003 |
* //TODO |
*/ |
public class HelloRabbitMq { |
/** |
* 获取连接 |
* @return |
* @throws IOException |
* @throws TimeoutException |
*/ |
public static Connection getConnection() throws IOException, TimeoutException { |
// 1.创建连接 |
ConnectionFactory connectionFactory = new ConnectionFactory(); |
// 2.设置连接地址 |
connectionFactory.setHost("127.0.0.1"); |
// 3.设置端口号: |
connectionFactory.setPort(5672); |
// 4.设置账号和密码 |
connectionFactory.setUsername("guest"); |
connectionFactory.setPassword("guest"); |
// 5.设置VirtualHost |
connectionFactory.setVirtualHost("/study"); |
return connectionFactory.newConnection(); |
} |
} |
``` |
### 2、创建生产者 |
``` |
package com.example.rabbitmqdemo.hello; |
import com.rabbitmq.client.Channel; |
import com.rabbitmq.client.Connection; |
import java.io.IOException; |
import java.util.concurrent.TimeoutException; |
/** |
* @author xiaomao |
* @version 1.0 |
* @des 描述 |
* @date 2021/8/3 0003 |
* //TODO |
*/ |
public class Producer { |
private static final String QUEUE_NAME = "test"; |
public static void main(String[] args) throws IOException, TimeoutException { |
// 1.创建连接 |
Connection connection = HelloRabbitMq.getConnection(); |
// 2.设置通道 |
Channel channel = connection.createChannel(); |
// 3.设置消息 |
String msg = "我是消息"; |
System.out.println("msg:" + msg); |
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); |
channel.close(); |
connection.close(); |
} |
} |
``` |
### 3、创建消费者 |
``` |
package com.example.rabbitmqdemo.hello; |
import com.rabbitmq.client.*; |
import java.io.IOException; |
import java.util.concurrent.TimeoutException; |
/** |
* @author xiaomao |
* @version 1.0 |
* @des 描述 |
* @date 2021/8/3 0003 |
* //TODO |
*/ |
public class Consumer { |
private static final String QUEUE_NAME = "test"; |
public static void main(String[] args) throws IOException, TimeoutException { |
// 1.创建连接 |
Connection connection = HelloRabbitMq.getConnection(); |
// 2.设置通道 |
Channel channel = connection.createChannel(); |
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { |
@Override |
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { |
String msg = new String(body, "UTF-8"); |
System.out.println("消费者获取消息:" + msg); |
} |
}; |
// 3.监听队列 |
channel.basicConsume(QUEUE_NAME, true, defaultConsumer); |
} |
} |
``` |
> 先启动生产者,再启动消费者测试 |
## 9 RabbitMQ如何保证消息不丢失 |
``` |
1.生产者角色 确保生产者投递消息到MQ服务器端成功。Ack 消息确认机制同步或者异步的形式方式1:Confirms方式2:事务消息2.消费者角色 在rabbitmq情况下: 必须要将消息消费成功之后,才会将该消息从mq服务器端中移除。 在kafka中的情况下: 不管是消费成功还是消费失败,该消息都不会立即从mq服务器端移除。3.Mq服务器端 在默认的情况下 都会对队列中的消息实现持久化持久化硬盘。 |
``` |
``` |
1.使用消息确认机制+持久技术A.消费者确认收到消息机制 channel.basicConsume(QUEUE_NAME, false, defaultConsumer);注:第二个参数值为false代表关闭RabbitMQ的自动应答机制,改为手动应答。在处理完消息时,返回应答状态,true表示为自动应答模式。channel.basicAck(envelope.getDeliveryTag(), false);B.生产者确认投递消息成功 使用Confirm机制 或者事务消息 |
``` |
### 2.RabbitMQ默认创建是持久化的 |
``` |
参数名称详解:durable是否持久化 durable为持久化、 Transient 不持久化autoDelete 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 |
``` |
### 使用rabbitmq事务消息(生产者确认-事务) |
``` |
channel.txSelect();channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());// int i = 5 / 0;channel.txCommit(); |
``` |
### 生产者投递确认(同步) |
``` |
public class Producer { private static final String QUEUE_NAME = "test"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.创建一个新连接 Connection connection = RabbitMQConnection.getConnection(); //2.设置channel Channel channel = connection.createChannel(); //3.发送消息 String msg = "生产者投递确认"; |
``` |
### 消费者消费确认 |
``` |
public class Consumer { private static final String QUEUE_NAME = "test"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接 Connection connection = HelloRabbitMq.getConnection(); // 2.设置通道 Channel channel = connection.createChannel(); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @SneakyThrows @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msg); // Integer deliveryMode = properties.getDeliveryMode(); long deliveryTag = envelope.getDeliveryTag(); // 签收 channel.basicAck(deliveryTag,false); channel.close(); connection.close(); } }; // 3.监听队列 channel.basicConsume(QUEUE_NAME, false, defaultConsumer); }} |
``` |
## 10 RabbitMQ五种消息模式 |
### RabitMQ工作队列 |
默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。 |
![img](file:///C:\Users\Administrator\AppData\Local\Temp\ksohtml16488\wps1.jpg) |
采用工作队列 |
在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。 |
channel.basicQos(1) |
### RabbitMQ交换机类型 |
- Direct exchange(直连交换机) |
- Fanout exchange(扇型交换机) |
- Topic exchange(主题交换机) |
- Headers exchange(头交换机) |
Virtual Hosts---区分不同的团队 |
----队列 存放消息 |
----交换机 路由消息存放在那个队列中 类似于nginx |
---路由key 分发规则 |
### RabbitMQ Fanout 发布订阅 |
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者。 |
![img](file:///C:\Users\Administrator\AppData\Local\Temp\ksohtml16488\wps2.jpg) |
![img](file:///C:\Users\Administrator\AppData\Local\Temp\ksohtml16488\wps3.jpg) |
原理: |
1. 需要创建两个队列 ,每个队列对应一个消费者 |
2. 队列需要绑定我们交换机 |
3. 生产者投递消息到交换机中,交换机在将消息分配给两个队列中都存放起来; |
4. 消费者从队列中获取这个消息 |
#### 案列 |
##### 创建交换机 |
##### 创建队列 |
##### 生产者代码 |
``` |
import com.example.rabbitmqdemo.ysj.hello.HelloRabbitMq;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author xiaomao * @version 1.0 * @des 描述 * @date 2021/8/3 0003 * //TODO /public class ProducerFanout { /* * 定义交换机的名称 */ private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { // 创建Connection Connection connection = HelloRabbitMq.getConnection(); // 创建Channel Channel channel = connection.createChannel(); // 通道关联交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); String msg = "6666666"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); channel.close(); connection.close(); }} |
``` |
##### 消费者代码 |
``` |
import com.example.rabbitmqdemo.ysj.hello.HelloRabbitMq;import com.rabbitmq.client.*;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.concurrent.TimeoutException;/** * @author xiaomao * @version 1.0 * @des 描述 * @date 2021/8/3 0003 * //TODO /public class MailConsumer { /* * 定义邮件队列 / private static final String QUEUE_NAME = "fanout_email_queue"; /* * 定义交换机的名称 */ private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("邮件消费者..."); // 创建我们的连接 Connection connection = HelloRabbitMq.getConnection(); // 创建我们通道 final Channel channel = connection.createChannel(); // 关联队列消费者关联队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("邮件消费者获取消息:" + msg); } }; // 开始监听消息 自动签收 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); }} |
``` |
``` |
import com.example.rabbitmqdemo.ysj.hello.HelloRabbitMq;import com.rabbitmq.client.*;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.concurrent.TimeoutException;/** * @author xiaomao * @version 1.0 * @des 描述 * @date 2021/8/3 0003 * //TODO /public class SmsConsumer { /* * 定义短信队列 / private static final String QUEUE_NAME = "fanout_email_sms"; /* * 定义交换机的名称 */ private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消费者..."); // 创建我们的连接 Connection connection = HelloRabbitMq.getConnection(); // 创建我们通道 final Channel channel = connection.createChannel(); // 关联队列消费者关联队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("短信消费者获取消息:" + msg); } }; // 开始监听消息 自动签收 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); }} |
``` |
### Direct路由模式 |
当交换机类型为direct类型时,根据队列绑定的路由建转发到具体的队列中存放消息 |
![img](file:///C:\Users\Administrator\AppData\Local\Temp\ksohtml16488\wps4.jpg) |
### Topic主题模式 |
当交换机类型为topic类型时,根据队列绑定的路由建模糊转发到具体的队列中存放。 |
#号表示支持匹配多个词 |
*号表示只能匹配一个词 |
![img](file:///C:\Users\Administrator\AppData\Local\Temp\ksohtml16488\wps5.jpg) |
## 11 SpringBoot整合RabbitMQ |
### Maven依赖 |
``` |
``` |
### 配置 |
``` |
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;/** * @author xiaomao * @version 1.0 * @des 描述 * @date 2021/8/3 0003 * //TODO /@Componentpublic class RabbitMQConfig { /* * 定义交换机 / private String EXCHANGE_SPRINGBOOT_NAME = "/mayikt_ex"; /* * 短信队列 / private String FANOUT_SMS_QUEUE = "fanout_sms_queue"; /* * 邮件队列 / private String FANOUT_EMAIL_QUEUE = "fanout_email_queue"; /* * 配置smsQueue * * @return / @Bean public Queue smsQueue() { return new Queue(FANOUT_SMS_QUEUE); } /* * 配置emailQueue * * @return / @Bean public Queue emailQueue() { return new Queue(FANOUT_EMAIL_QUEUE); } /* * 配置fanoutExchange * * @return */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME); } // 绑定交换机 sms @Bean public Binding bindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(smsQueue).to(fanoutExchange); } // 绑定交换机 email @Bean public Binding bindingEmailFanoutExchange(Queue emailQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(emailQueue).to(fanoutExchange); }} |
``` |
### 生产者 |
``` |
import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @author xiaomao * @version 1.0 * @des 描述 * @date 2021/8/3 0003 * //TODO /@RestControllerpublic class FanoutProducer { @Autowired private AmqpTemplate amqpTemplate; /* * 发送消息 * * @return / @RequestMapping("/sendMsg") public String sendMsg(String msg) { /* * 1.交换机名称 * 2.路由key名称 * 3.发送内容 */ amqpTemplate.convertAndSend("/mayikt_ex", "", msg); return "success"; }} |
``` |
### 短信消费者 |
``` |
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @author xiaomao * @version 1.0 * @des 描述 * @date 2021/8/3 0003 * //TODO */@Slf4j@Component@RabbitListener(queues = "fanout_sms_queue")public class FanoutSmsConsumer { @RabbitHandler public void process(String msg) { log.info(">>短信消费者消息msg:{}<<", msg); }} |
``` |
### 邮件消费者 |
``` |
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * @author xiaomao * @version 1.0 * @des 描述 * @date 2021/8/3 0003 * //TODO */@Slf4j@Component@RabbitListener(queues = "fanout_email_queue")public class FanoutEmailConsumer { @RabbitHandler public void process(String msg) { log.info(">>邮件消费者消息msg:{}<<", msg); }} |
``` |
## 生产者如何获取消费结果 |
``` |
1.根据业务来定消费者消费成功结果:1.能够在数据库中插入一条数据2.Rocketmq 自带全局消息id,能够根据该全局消息获取消费结果原理: 生产者投递消息到mq服务器,mq服务器端在这时候返回一个全局的消息id,当我们消费者消费该消息成功之后,消费者会给我们mq服务器端发送通知标记该消息消费成功。生产者获取到该消息全局id,每隔2s时间调用mq服务器端接口查询该消息是否有被消费成功。1.异步返回一个全局id,前端使用ajax定时主动查询;2.在rocketmq中,自带根据消息id查询是否消费成功 |
``` |
## 12 RabbitMQ实战解决方案 |
### RabbitMQ死信队列 |
#### 死信队列产生的背景 |
``` |
RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。 |
``` |
#### 产生死信队列的原因 |
``` |
1. 消息投递到MQ中存放 消息已经过期 消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。2. 队列达到最大的长度 (队列容器已经满了)3. 消费者消费多次消息失败,就会转移存放到死信队列中 |
``` |
![img](file:///C:\Users\Administrator\AppData\Local\Temp\ksohtml16488\wps6.jpg) |
#### 死信队列的架构原理 |
``` |
死信队列和普通队列区别不是很大普通与死信队列都有自己独立的交换机和路由key、队列和消费者。区别:1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到普通队列中缓存起来,普通队列对应有自己独立普通消费者。2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。 |
``` |
#### 死信队列应用场景 |
``` |
1.30分钟订单超时设计A. Redis过期key :B. 死信延迟队列实现:采用死信队列,创建一个普通队列没有对应的消费者消费消息,在30分钟过后就会将该消息转移到死信备胎消费者实现消费。备胎死信消费者会根据该订单号码查询是否已经支付过,如果没有支付的情况下则会开始回滚库存操作。 |
``` |
### RabbitMQ消息幂等问题 |
#### RabbitMQ消息自动重试机制 |
``` |
1. 当我们消费者处理执行我们业务代码的时候,如果抛出异常的情况下在这时候mq会自动触发重试机制,默认的情况下rabbitmq是无限次数的重试。需要人为指定重试次数限制问题2. 在什么情况下消费者需要实现重试策略? A.消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。B.消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。可以将日志存放起来,后期通过定时任务或者人工补偿形式。如果是重试多次还是失败消息,需要重新发布消费者版本实现消费可以使用死信队列 |
``` |
``` |
Mq在重试的过程中,有可能会引发消费者重复消费的问题。Mq消费者需要解决 幂等性问题幂等性 保证数据唯一 方式1:生产者在投递消息的时候,生成一个全局唯一id,放在我们消息中。消费者获取到我们该消息,可以根据该全局唯一id实现去重复。全局唯一id 根据业务来定的 订单号码作为全局的id实际上还是需要再db层面解决数据防重复。方式2:业务逻辑是在做insert操作 使用唯一主键约束方式2:业务逻辑是在做update操作 使用乐观锁 |
``` |
1. 当消费者业务逻辑代码中,抛出异常自动实现重试 (默认是无数次重试) |
2. 应该对RabbitMQ重试次数实现限制,比如最多重试5次,每次间隔3s;重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿 |
#### 如何合理选择消息重试 |
1. 消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试 ? |
2. 消费者获取消息后,应该代码问题抛出数据异常,是否需要重试? |
总结:如果消费者处理消息时,因为代码原因抛出异常是需要从新发布版本才能解决的,那么就不需要重试,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期人工实现补偿。 |
#### Rabbitmq如何开启重试策略 |
``` |
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /meite_rabbitmq listener: simple: retry: ####开启消费者(程序出现异常的情况下会)进行重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 3000 |
``` |
#### 消费者重试过程中,如何避免幂等性问题 |
重试的过程中,为了避免业务逻辑重复执行,建议提前全局**id提前查询,如果存在的情况下,就无需再继续做该流程。 |
重试的次数最好有一定间隔次数,在数据库底层层面保证数据唯一性,比如加上唯一id |
#### SpringBoot开启消息确认机制 |
``` |
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /meiteVirtualHosts listener: simple: retry: ####开启消费者(程序出现异常的情况下会)进行重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 3000 acknowledge-mode: manual |
``` |
##### 消费者ack代码 |
``` |
@Slf4j@Component@RabbitListener(queues = "fanout_order_queue")public class FanoutOrderConsumer { @Autowired private OrderManager orderManager; @Autowired private OrderMapper orderMapper; @RabbitHandler public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {// try { log.info(">>orderEntity:{}<<", orderEntity.toString()); String orderId = orderEntity.getOrderId(); if (StringUtils.isEmpty(orderId)) { log.error(">>orderId is null<<"); return; } OrderEntity dbOrderEntity = orderMapper.getOrder(orderId); if (dbOrderEntity != null) { log.info(">>该订单已经被消费过,无需重复消费!<<"); // 无需继续重试 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } int result = orderManager.addOrder(orderEntity); log.info(">>插入数据库中数据成功<<"); if (result >= 0) { // 开启消息确认机制 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }// int i = 1 / 0;// } catch (Exception e) {// // 将失败的消息记录下来,后期采用人工补偿的形式// } }} |
``` |
## 死信队列配置 |
``` |
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /meiteVirtualHostsserver: port: 8080###模拟演示死信队列mayikt: dlx: exchange: mayikt_dlx_exchange queue: mayikt_order_dlx_queue routingKey: dlx ###备胎交换机 order: exchange: mayikt_order_exchange queue: mayikt_order_queue routingKey: mayikt.order |
``` |
``` |
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;import java.util.HashMap;import java.util.Map;@Componentpublic class DeadLetterMQConfig { /** * 订单交换机 / @Value("${mayikt.order.exchange}") private String orderExchange; /* * 订单队列 / @Value("${mayikt.order.queue}") private String orderQueue; /* * 订单路由key / @Value("${mayikt.order.routingKey}") private String orderRoutingKey; /* * 死信交换机 / @Value("${mayikt.dlx.exchange}") private String dlxExchange; /* * 死信队列 / @Value("${mayikt.dlx.queue}") private String dlxQueue; /* * 死信路由 / @Value("${mayikt.dlx.routingKey}") private String dlxRoutingKey; /* * 声明死信交换机 * * @return DirectExchange / @Bean public DirectExchange dlxExchange() { return new DirectExchange(dlxExchange); } /* * 声明死信队列 * * @return Queue / @Bean public Queue dlxQueue() { return new Queue(dlxQueue); } /* * 声明订单业务交换机 * * @return DirectExchange / @Bean public DirectExchange orderExchange() { return new DirectExchange(orderExchange); } /* * 声明订单队列 * * @return Queue / @Bean public Queue orderQueue() { // 订单队列绑定我们的死信交换机 Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", dlxExchange); arguments.put("x-dead-letter-routing-key", dlxRoutingKey); return new Queue(orderQueue, true, false, false, arguments); } /* * 绑定死信队列到死信交换机 * * @return Binding / @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()) .to(dlxExchange()) .with(dlxRoutingKey); } /* * 绑定订单队列到订单交换机 * * @return Binding */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(orderExchange()) .with(orderRoutingKey); }} |
``` |
原文地址:https://www.cnblogs.com/crazyfur/p/15098052.html
- WebAssembly详解及其使用案例
- 30分钟精通快应用
- BZOJ2440: [中山市选2011]完全平方数(莫比乌斯+容斥原理)
- Dapper扩展之~~~Dapper.Contrib
- JVM活学活用——GC算法 垃圾收集器
- BZOJ1101: [POI2007]Zap(莫比乌斯反演)
- freeRTOS事件组学习
- 洛谷P2522 [HAOI2011]Problem b(莫比乌斯反演)
- JVM活学活用——优化springboot
- .Net中的AOP读书笔记系列之AOP介绍
- 洛谷P3327 [SDOI2015]约数个数和(莫比乌斯反演)
- .Net中的AOP系列之构建一个汽车租赁应用(上)
- Modbus RTU驱动程序开发指引
- freeRTOS任务创建
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- PyQt5 技巧篇-解决相对路径无法加载图片问题,styleSheet通过"相对"路径加载图片,python获取当前运行文件的绝对路径。
- 基于MMM搭建MySQL Replication集群高可用架构
- Python 技术篇-按任意格式灵活获取日期、时间、年月日、时分秒。日期格式化。
- 当删库时如何避免跑路
- Python 句法错误:"SyntaxError: invalid character in identifier",原因及解决方法
- Python3 多线程问题:ModuleNotFoundError: No module named 'thread',原因及解决办法。
- 文件传输和秒传
- 关于数据库的各种备份与还原姿势详解
- Python 技术篇-多线程的2种创建方法,多线程的简单用法,快速上手。
- Python 技术篇-调用浏览器访问指定网页,一行代码实现。非Selenium。
- 数据库热备份神器 - XtraBackup
- Python 技术篇-读取文件,将内容保存dict字典中。去掉字符串中的指定字符方法。dict字典的遍历。
- PyQt5 技术篇-plainTextEdit控件获得文本内容方法、设置文本内容方法。
- PyQt5 技术篇-鼠标移动控件显示提示,Qt Designer控件提示设置方法。
- PyQt5 技术篇-窗口名、窗口图标的设置方法。