RabbitMq死信队列
时间:2020-04-10
本文章向大家介绍RabbitMq死信队列,主要包括RabbitMq死信队列使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
死信队列的作用
死信交换机有什么用呢? 在创建队列的时候 可以给这个队列附带一个交换机, 那么这个队列作废的消息就会被重新发到附带的交换机,然后让这个交换机重新路由这条消息。
死信消息产生的来源
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
死信队列处理的方式
- 丢弃,如果不是很重要,可以选择丢弃
- 记录死信入库,然后做后续的业务分析或处理
- 通过死信队列,由负责监听死信的应用程序进行处理
消息超时进入死信队列
通俗的说,就是消息产生之后,因为设置了超时时间,在这段时间内消息没有被消费就会被扔到死信队列里面。
// 交换机名称
private static final String DESTINATION_NAME = "rabbitMq_topic";
//消息队列
private static final String queueName = "topic_queue";
//routingKey
private static final String routingKey = "topic.#";
//配置死信队列
private static final String dlxExchangeName = "dlx.exchange";
private static final String dlxQueueName = "dlx.queue";
private static final String dlxRoutingKey = "#";
@Test
public void producer() throws IOException, TimeoutException {
//获取连接
Connection connection = MQConnectionUtils.newConnection();
//创建通道
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>(16);
// 为队列设置队列交换器
arguments.put("x-dead-letter-exchange", dlxExchangeName);
// 设置队列中的消息 60s 钟后过期
arguments.put("x-message-ttl", 60000);
//正常生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
//消费声明队列
channel.queueDeclare(queueName, true, false, false, arguments);
//消费者队列绑定交换机 绑定路由件 路由键
channel.queueBind(queueName, DESTINATION_NAME, routingKey);
String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 测试消息超时,传递到死信队列";
// 创建死信交换器和队列
channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
channel.queueDeclare(dlxQueueName, true, false, false, null);
channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
//生产者发送消息者
channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.err.println("消息发送完成......");
}
只监听了死信队列的消息,正常消息无需监听接收
/**
* 监听死信队列
*
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
@Test
public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = MQConnectionUtils.newConnection();
//创建通道
Channel channel = connection.createChannel();
System.out.println("死信消费者启动 ..........");
Thread.sleep(65000);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("死信队列接收到消息:" + new String(body));
System.err.println("deliveryTag:" + envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(dlxQueueName, consumer);
}
消息被退回
这个我在之前的整合SpringBoot的时候有实验过。
channel.basicNack(envelope.getDeliveryTag(),false,false);
队列达到最大长度
这个和消息超时差不多,只不过是设置了队列的最大容量而已。
只需要把上面的代码修改一下就可以了。
@Test
public void producer() throws IOException, TimeoutException {
//获取连接
Connection connection = MQConnectionUtils.newConnection();
//创建通道
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>(16);
// 为队列设置队列交换器
arguments.put("x-dead-letter-exchange", dlxExchangeName);
//设置队列长度为3
arguments.put("x-max-length", 3);
//正常生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
//消费声明队列
channel.queueDeclare(queueName, true, false, false, arguments);
//消费者队列绑定交换机 绑定路由件 路由键
channel.queueBind(queueName, DESTINATION_NAME, routingKey);
String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 测试消息超时,传递到死信队列";
// 创建死信交换器和队列
channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
channel.queueDeclare(dlxQueueName, true, false, false, null);
channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
//生产者发送消息者
for (int i = 0; i < 5; i++) {
channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());
}
System.out.println("消息发送完成......");
}
@Test
public void consumer() throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = MQConnectionUtils.newConnection();
//创建通道
Channel channel = connection.createChannel();
//此处设置一次只消费1个,且必须是ASK之后的消息才能算
channel.basicQos(1);
System.out.println("消费者启动 ..........");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("正常队列:" + new String(body));
System.out.println("deliveryTag:" + envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, consumer);
}
/**
* 监听死信队列
*
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
@Test
public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = MQConnectionUtils.newConnection();
//创建通道
Channel channel = connection.createChannel();
System.out.println("死信消费者启动 ..........");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("死信队列接收到消息:" + new String(body));
System.err.println("deliveryTag:" + envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(dlxQueueName, consumer);
}
原文地址:https://www.cnblogs.com/yangk1996/p/12674015.html
- git 常用命令
- Windows系统性能分析
- 在Android中调用WebService
- SQL:数据与运算的融合体
- Windows Server AppFabric Caching支持大数据量的配置
- Python函数之匿名函数
- 配置Windows 2008 R2 防火墙允许远程访问SQL Server 2008 R2
- ASP.NET BBcode
- JavaScript 常用算法
- SVG 使用
- 小程能走网店模式?如何做分销模式的小程序?
- IIS 6 下配置以 FastCGI 跑 PHP
- IIS 7.x Application Request Routing(ARR) 502错误的解决方法
- 移动前端头部标签(HTML5 meta)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 缓冲流---为字节流和字符流复制文件增加缓冲流
- 字符流---输入输出与复制文本文件
- Spring 中的几个 PostProcessor 的区别与联系
- Spring 中基于注解的事务控制及原理分析
- Spring 中根据环境切换配置 @Profile
- Spring 中的属性赋值
- Spring 中控制 Bean 生命周期的几种方式及 BeanPostProcessor 执行原理
- Spring 中的 @Import 注解及向容器中添加 Bean 的几种方式
- Spring 注解开发之 @Conditional
- JDBC 技术应用实践:网上书城后台管理系统
- 思科模拟器:路由器基本配置
- 思科模拟器:实验五——高级路由实验
- 从零开始重新认识 SpringMVC
- 字节跳动Android实习面试凉凉经,两轮面试我被完虐了...
- 高效、简单、方便管理与维护的开源运维工单系统