Node.js结合RabbitMQ高级特性Prefetch实现消费端限流策略
应用范围为服务访问量突然剧增,原因可能有多种外部的调用或内部的一些问题导致消息积压,对服务的访问超过服务所能处理的最大峰值,导致系统超时负载从而崩溃。
业务场景
举一些我们平常生活中的消费场景,例如:火车票、机票、门票等,通常来说这些服务在下单之后,后续的出票结果都是异步通知的,如果服务本身只支持每秒1000访问量,由于外部服务的原因突然访问量增加到每秒2000并发,这个时候服务接收者因为流量的剧增,超过了自己系统本身所能处理的最大峰值,如果没有对消息做限流措施,系统在这段时间内就会造成不可用,在生产环境这是一个很 严重
的问题,实际应用场景不止于这些,本文通过RabbitMQ来讲解如果对消费端做限流措施。
消费端限流机制
RabbitMQ提供了服务质量保证 ( QOS
) 功能,对channel(通道)预先设置一定的消息数目,每次发送的消息条数都是基于预先设置的数目,如果消费端一旦有未确认的消息,这时服务端将不会再发送新的消费消息,直到消费端将消息进行完全确认,注意:此时消费端不能设置自动签收,否则会无效。
在 RabbitMQv3.3.0
之后,放宽了限制,除了对channel设置之外,还可以对每个消费者进行设置。
以下为 Node.js 开发语言 amqplib
库对于限流实现提供的接口方法 prefetch
export interface Channel extends events.EventEmitter { prefetch(count: number, global?: boolean): Promise<Replies.Empty>; ...}
prefetch 参数说明:
- number:每次推送给消费端 N 条消息数目,如果这 N 条消息没有被ack,生产端将不会再次推送直到这 N 条消息被消费。
- global:在哪个级别上做限制,ture 为 channel 上做限制,false 为消费端上做限制,默认为 false。
建立生产端
生产端没什么变化,和正常声明一样,关于源码参见 https://github.com/Q-Angelo/project-training/tree/master/nodejs/rabbitmq-prefetch
const amqp = require('amqplib');
async function producer() { // 1. 创建链接对象 const connection = await amqp.connect('amqp://localhost:5672');
// 2. 获取通道 const channel = await connection.createChannel();
// 3. 声明参数 const exchangeName = 'qosEx'; const routingKey = 'qos.test001'; const msg = 'Producer:';
// 4. 声明交换机 await channel.assertExchange(exchangeName, 'topic', { durable: true });
for (let i=0; i<5; i++) { // 5. 发送消息 await channel.publish(exchangeName, routingKey, Buffer.from(`${msg} 第${i}条消息`)); }
await channel.close();}
producer();
建立消费端
const amqp = require('amqplib');
async function consumer() { // 1. 创建链接对象 const connection = await amqp.connect('amqp://localhost:5672');
// 2. 获取通道 const channel = await connection.createChannel();
// 3. 声明参数 const exchangeName = 'qosEx'; const queueName = 'qosQueue'; const routingKey = 'qos.#';
// 4. 声明交换机、对列进行绑定 await channel.assertExchange(exchangeName, 'topic', { durable: true }); await channel.assertQueue(queueName); await channel.bindQueue(queueName, exchangeName, routingKey);
// 5. 限流参数设置 await channel.prefetch(1, false);
// 6. 限流,noAck参数必须设置为false await channel.consume(queueName, msg => { console.log('Consumer:', msg.content.toString());
// channel.ack(msg); }, { noAck: false });}
consumer();
- 未确认消息情况测试
在 consumer 中我们暂且将 channel.ack(msg)
注释掉,分别启动生产者和消费者,看看是什么情况?
如上图所示,总共5条消息按照预先设置的发送了一条消息,因为我将 channel.ack(msg)
注释掉了,服务端在未得到 ack 确认,将不会在发送剩下已 Ready 消息。
- 确认消息测试
修改 consumer 代码,打开确认消息注释,重新启动消费端进行测试
await channel.consume(queueName, msg => { console.log('Consumer:', msg.content.toString());
channel.ack(msg); // 打开注释}, { noAck: false });
如上图所示,Unacked 为0,消息已全部消费成功。
RabbitMQ限流使用总结
限流在我们的实际工作中还是很有意义的,在使用上生产端没有变化,重点在消费端,着重看以下两点:
- 限流情况 ack 不能设置自动签收,修改
{noAck:false}
- 增加限流参数设置
channel.prefetch(1,false)
资料
- 个人博客: https://www.nodejs.red/
- RabbitMQ系列:RabbitMQ高级消息队列系列文章不断更新中
作者:五月君 链接:https://www.imooc.com/article/287046 来源:慕课网
- Linux进程间通信(IPC)机制总览
- 负载均衡 - 综述
- 浅谈ASP.NET的Postback
- WCF版的PetShop之一:PetShop简介[提供源代码下载]
- 维吉尼亚密码及程序实现
- 迪菲-赫尔曼密钥交换
- 分布式系统组件之配置中心
- github 提交报403 forbidden的错误解决
- Apache thrift - 使用,内部实现及构建一个可扩展的RPC框架
- redis主从集群搭建及容灾部署(哨兵sentinel)
- 浅谈ASP.NET的Postback
- redis配置详解(中英文)
- 我所理解的Remoting(1):Marshaling & Activation[下篇]
- 5分钟短视频,带你走进日志易SPL,助你日志分析更容易
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 详解I/O多路转接之poll&epoll
- 如何科学修改 node_modules 里的文件
- C++之继承相关问题——菱形继承&继承
- 【redis】来吧,展示一下redis 发布-订阅模式
- rsync 用法教程
- 【译】构建RESTful API的13种最佳实践
- 如何优雅的实现消息通信?
- koa框架实现微信公众号回复用户小程序卡片
- frida实战笔记
- 让项目效果更酷!ThingJS地图新功能——3D线条渲染
- 重磅来袭:腾讯云ClickHouse支持数据均衡服务
- 聊聊claudb的set command
- Flutter通过BasicMessageChannel与Android iOS 的双向通信
- MySQL案例:binlog_row_image如何取舍
- “青柠日报”小程序