《深入RabbitMQ》笔记

时间:2022-07-25
本文章向大家介绍《深入RabbitMQ》笔记,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

三大抽象组件: 交换器、队列、抽象 AMQP(高级消息队列协议):

  • 消息帧的构造,其中前面三个是消息帧头: |帧类型|信道编号|帧大小| 帧有效载荷|标记结束位置 |
  • 帧有五种类型: 协议头帧、方法帧、内容头帧、消息体帧、心跳帧

当消息与任一绑定的队列符合匹配标准时,RabbitMQ服务器将以FIFO的顺序将消息放入队列中。放入队列数据结构中的并不是实际消息,而是消息的引用

默认情况下,只要没有消费者正在监听队列,消息就会被存储在队列中。当添加更多消息时,队列的大小也会随之增加。RabbitMQ可以将这些消息保存在内存中或写入磁盘,具体取决于消息Basic.Properties中指定的delivery-mode属性。

提示 当你为生产环境编写发布者应用程序时,请使用JSON或XML等数据序列化格式以便消费者可以轻松地反序列化消息,这样在解决可能出现的任何问题时更易于阅读。

如果仔细观察,你可能已经注意到,尽管在发布消息时没有指定message_id或timestamp属性,但从消费者打印出来的每条消息中都有它们。如果不指定它们,rabbitpy客户端库将自动为你填充这些属性。另外,如果你发送了一个Python dict结构作为消息,rabbitpy会自动将数据序列化为JSON格式,并将content-type属性设置为application/json。

Basic.Properties几个重要属性:

  • cotent-type
  • content-encoding
  • Message-id: unique-id
  • correlation-id :虽然在AMQP规范中没有关于correlation-id的正式定义,但它的一个用途是指定该消息是另一个消息的响应,通过携带关联消息的message-id可以做到这一点。另一种选择是使用它来传送关联消息的事务ID或其他类似数据
  • expiration:已经过期的消息发布到服务器,则该消息不会被路由到任何队列,而是直接被丢弃
  • x-message-ttl:
  • delivery-mode: 1表示非持久化 2表示持久化
  • user-id 标志已登录用户
  • app-id 标识应用的相关信息
  • type 消息的类型名称,自定义约定格式等可以用,如谷歌的protobuff
  • reply-to: 使用reply-to可以构建一个用来回复消息的私有响应队列
  • headers: RabbitMQ可以根据headers表中填充的值路由消息,而不需要依赖于路由键
  • priority: 优先级0-9,值越小优先级越大
  • cluster-id不应该使用,rabbitMQ没有实现该属性的行为

Basic.Publish:

  • mandatory: mandatory标志是一个与Basic.PublishRPC命令一起传递的参数,告诉RabbitMQ如果消息不可路由,它应该通过Basic.Return RPC命令将消息返回给发布者(见图4.3)。设置mandatory标志可以被认为是开启故障检测模式;它只会让RabbitMQ向你通知失败,而不会通知成功。如果消息路由正确,你的发布者将不会收到通知

发布者确认作为事务的轻量级替代方法 如rabbitpy里实现的channel.enable_publisher_confirms(),是异步响应

事物机制

使用HA(高可用)队列避免节点故障 它允许队列在多个服务器上拥有冗余副本

rabbitmq提供了相关的管理API查询状态

消费消息:

在简单的消息速度测试中,使用Basic.Consume至少是使用Basic.Get的两倍,速度不同的最明显原因是使用Basic.Get会导致每条消息都会产生与RabbitMQ同步通信的开销,这一过程由发送请求帧的客户端应用程序和发送应答的RabbitMQ组成

当一个客户端发出一个Basic.Consume时,一旦有消息可用时RabbitMQ就会进行发送,直到客户端发出一个Basic.Cancel为止

优化消费者性能

  • 使用no-ack模式实现更快的吞吐量 [最大的吞吐量,速度最快,可靠性最低] ,如果有可用消息,RabbitMQ将会持续向消费者发送它们直到套接字缓冲区被填满为止,linux系统默认的是128KB,对于大多数场景而言,可以设置为16MB(16777216)

大多数linux发型版可以在/etc/sysctl.conf中设置,或手动通过以下命令更改:

echo 16777216 > /proc/sys/net/core/rmem_default
echo 16777216 > /proc/sys/net/core/rmem_max
  • 通过服务质量设置控制消费者预取 QOS(服务质量),使用单个消费者对一个简单消息进行基准测试,图5.7表明在这种情况下,预取总量为2500是消息速率达到峰值的最佳设置。QOS可以允许一次确认多个消息,即message.ack(all_previous=True),缺点是确认多个存在一定的风险性。
  • 消费者使用事务 注意:事务不适用于已禁用确认的消费者。
  • 消息拒绝 reject()只适用于单个,如果你不确定是消息本身还是消费者的其他原因引发了错误,那么检查redelivered标志是一个好方法,可以帮你在碰到问题时决定是否应该拒绝那些要重新发送或丢弃的消息;如果是在rabbitMQ中拒绝多个的话,需要basic.Nack,这个是RMQ对AMQP协议的改良。

死信交换器 注意 死信交换器与第4章讨论的备用交换器不同。过期或被拒绝的消息通过死信交换器进行投递,而备用交换器则路由那些无法由RabbitMQ路由的信息。

队列的类型

临时队列

  • 自动删除的队列。应用场景:聊天室,每个用户分配一个自动删除的队列
#! /usr/bin/env python3
# -*- coding:utf-8 -*
# product
import rabbitpy

url = 'amqp://guest:guest@localhost:5672/%2F'
with rabbitpy.Connection(url) as conn:
    with conn.channel() as channel:
        exchange = rabbitpy.Exchange(channel,'ad-example')
        exchange.declare()
        # queue = rabbitpy.Queue(channel,'ad_example',auto_delete=True)
        # queue.declare()
        # binded = queue.bind(exchange,'ad-example-routing-key')
        message = rabbitpy.Message(channel,
        'this_is delete',
        {'content_type':'text/plain'},
        opinionated=True
        )
        rtl = message.publish(exchange,'ad-example-routing-key')
        print(rtl)
#! /usr/bin/env python3
# -*- coding:utf-8 -*
# 自动删除的comsumer
import rabbitpy
import time

url = 'amqp://guest:guest@localhost:5672/%2F'
with rabbitpy.Connection(url) as conn:
    with conn.channel() as channel:
        exchange = rabbitpy.Exchange(channel,'ad-example')
        exchange.declare()
        queue = rabbitpy.Queue(channel,'ad_example_steve',auto_delete=True)
        queue.declare()
        binded = queue.bind(exchange,'ad-example-routing-key')
        while(True):
            if(len(queue)<=0):
                time.sleep(2)
                continue
            message = queue.get()
            message.pprint()
            message.ack()
  • 只允许单个消费者:exclusive
  • 自动过期队列: ■ 队列只有在没有消费者的情况下才会过期。如果你有连接着消费者的队列,则只有在发出Basic.Cancel请求或断开连接之后才会自动将其删除。 ■ 队列只有在TTL周期之内没有收到Basic.Get请求时才会到期。一旦一个Basic.Get请求中已经包含了一个具有过期值的队列,那么过期设置无效,该队列将不会被自动删除。 ■ 与任何其他队列一样,不能重新声明或更改x-expires的设置和参数。如果能够重新声明队列,然后用x-expires参数的值延长过期时间,那么你将违反AMQP规范中的硬性规则,即客户端不得尝试用不同的设置重新声明队列。■ RabbitMQ不保证删除过期队列这一过程的时效性。

永久队列

创建队列时声明:durable=True

队列消息自动过期: 创建队列时,设置arguments={"x-message-ttl":1000} 声明队列时同时指定死信交换器和TTL值将导致该队列中已到期的消息成为死信消息

最大长度队列: 从RabbitMQ 3.1.0开始,可以在声明队列时指定最大长度arguments={"x-max-length":1000}

消息路由模式

exchange_type:

  • direct。根据路由key全匹配。
  • fanout。Direct交换器使得队列能够接收特定目的的消息。不同于此,fanout交换器并不作区分。所有发往fanout交换器的消息会被投递到所有绑定到该交换器上的队列中
  • topic。路由key模式匹配,通过使用星号(*)和井号(#)字符,你可以在同一时刻匹配路由键的特定部分,甚至是多个部分。星号将会匹配路由键中下一个句点前的所有字符,而井号键将会匹配接下来所有的字符,包括句点
  • headers交换器。第四种內建交换器类型是headers交换器。它通过采用消息属性中的headers表支持任意的路由策略。绑定至headers交换器的队列会向Queue.Bind参数中传入键值对数组以及x-match参数。x-match参数是字符串类型,可以设置为any或者all。如果将其设置为any,同时headers表中的值匹配了任何一个绑定值的话,消息就会被路由过去。如果将x-match设置为all的话,那么所有传入Queue.Bind中的参数值必须全部匹配才行。这并不排除消息在headers表中拥有额外的键值对。传统观点认为,由于额外的计算复杂性,headers交换器比其他交换类型要慢得多。但是在本章基准测试中,我发现在headers属性中使用相同数量的值时,所有内置交换机之间在性能方面并没有显着差异
  • 一致性哈希交换器。它将数据分发给绑定的队列上。它可以为用于接收消息的队列做负载均衡,基于路由键或者消息属性中的headers表的哈希值来分发消息至绑定队列。

交换器间路由,将一个exchange绑定到另外一个exchange

使用替代协议

MQTT

原因:

AMQP 0-9-1这一健壮的协议可以满足大多数应用程序与RabbitMQ的通信需求。而对于特殊使用场景,我们有更好的选择。举例来说,移动设备由于其高延迟、不可靠的网络通信会给AMQP带来诸多问题。相对而言,某些应用场景下,客户端应用程序不愿维护长连接,但是却想高速发送消息。这时,基于状态的AMQP就显得过于复杂了。此外,一些应用程序可能已经支持消息通信了,但却没有采用AMQP协议MQTT被设计用来在资源约束的设备以及低带宽的环境下使用,而不必牺牲消息通信的可靠性

应用场景:

MQTT协议适用于移动端应用。STOMP相对于AMQP来说更为简单。Web版的STOMP协议被设计用于Web浏览器。statelessd适用于高速消息发送。

消息队列遥测传输(MQ Telemetry Transpor,即MQTT)协议是一种轻量级的消息通信协议,在移动端应用中应用广泛。RabbitMQ通过插件机制来支持它。

评估是否适合用mqtt协议,架构是否能从mqtt的最后遗愿功能中获益,(LWT使得客户端能够在无意间断开连接时,发送一条指定的消息)。也许你会触及到MQTT的最大消息长度:256MB。。

优缺点:发送小型消息如应用状态等,MQTT比http更合适,但如果是传输图片视频等信息的话,http更合适(http支持快文件上传)。

amq.topic交换器是默认的交换器,MQTT客户端会将消息发往该交换器上。在发布消息时,MQTT插件会自动将MQTT topic名称中的正斜杠变更为句点,用作AMQP路由键。

当通过MQTT连接RabbitMQ来订阅消息时,RabbitMQ将创建新的队列。队列名称将采用mqtt-subscriber-[NAME]qos[N]的格式。其中[NAME]是唯一的客户端名称,[N]是客户端连接设置的QoS等级。举例来说,一个名为mqtt-subscriber-facebookqos0的队列,代表订阅者名称为facebook,并且QoS设置为了0。一旦为订阅请求创建队列之后,那么该队列将会采用AMQP点分路由键的语义,被绑定到topic交换器上

对于MQTT客户端发送消息来说,加号符号(+)是用于路由键的单层匹配,而非星号(*)

STOMP

STOMP专门设计用于基于流的处理,STOMP帧是UTF-8文本,由命令和命令对应的载荷组成,并以null(0x00)字节结束。不同于AMQP和MQTT协议,STOMP是可读的,而且不需要二进制位封装信息来定义STOMP消息帧和内容,是一种人类可读的文本协议,采用utf-8编码。 STOMP协议是一种人类可读的基于文本的流协议,其设计简单易用。尽管AMQP和MQTT等二进制协议可能更高效,STOMP协议通过使用更少的数据来传输相同的消息也颇具优势,特别是在使用STOMP插件和RabbitMQ时。队列创建和绑定行为需要较少的代码,但也需要付出代价。由STOMP插件创建的代理AMQP连接,在与RabbitMQ通信进行时需要对STOMP数据进行翻译,这相对于直接使用AMQP连接来说会有额外的开销。

statelessd

个人觉得类似于一个维护连接状态的中间件。

出现背景:当我们开始使用RabbitMQ时,我们立刻发现对于我们的PHP应用程序栈来说,有状态的AMQ协议非常昂贵。我们发现PHP无法维持跨客户端请求的开放连接和信道状态。为了发布消息,PHP应用程序在处理每个请求时,都需要与RabbitMQ建立新的连接。为了解决这个问题,我们最终创建了HTTP转AMQP的发布网关statelessd。它需要接收高速的HTTP请求,同时管理用于消息发布所需的连接栈。另外,它不会成为性能瓶颈,并且能可靠地将消息发送到RabbitMQ。