如何使用RabbitMQ和Python的Puka为多个用户提供消息

时间:2022-06-06
本文章向大家介绍如何使用RabbitMQ和Python的Puka为多个用户提供消息,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

准备

RabbitMQ

只有在安装和配置软件后,才能使用RabbitMQ发送和接收消息,安装教程可以参考CentOS安装RabbitMQ的教程

Puka Python库

本文中的所有示例都是使用Python语言提供的,该语言使用处理AMQP消息传递协议的puka库进行备份。但由于AMQP是一种广泛采用的协议,因此任何其他编程语言都可以实现类似的目标。

可以使用 Python pip包管理器快速安装puka

pip install puka

pip并不总是与Linux发行版捆绑在一起。在基于Debian的发行版(包括Ubuntu)上,可以使用以下命令轻松安装:

apt-get install python-pip

基于RHEL,如CentOS:

yum install python-setuptools
easy_install pip

RabbitMQ简介及其术语

消息传递特指RabbitMQ介绍了一些描述消息代理及其机制的基本原理的术语。

  • 生产者发送消息的一方,因此发送消息意味着生产者正在创建消息。
  • 消费者接收消息的一方,因此接收消息意味着消费消息。
  • 队列是一个缓冲区,其中存储已发送的消息并准备接收。单个队列可以容纳多少条消息没有限制。对于有多少生产者可以向队列发送消息也没有限制,也没有多少消费者可以尝试访问它。当消息命中现有队列时,它会在那里等待,直到消费者访问该特定队列为止。当消息命中不存在的队列时,它将被丢弃。
  • Exchange是驻留在生产者和队列之间的实体。生产者永远不会直接向队列发送消息。它将消息发送到交换机,交换机又将消息放置到一个或多个队列中,具体取决于所使用的交换实体。举例子来说,交换就像邮递员:它处理邮件,以便将邮件传递到正确的队列(邮箱),消费者可以从中收集邮件。
  • 绑定是队列和交换之间的连接。Exchange提供特定exchange绑定的队列。究竟如何取决于exchange本身。

本文将使用上述五个术语。还有一个与puka python库严格相关的库,其被作为首选库。这可以理解为对AMQP服务器的同步请求,可以保证请求的执行(无论是否成功)以及决定在完成请求之前所等待的客户端。

虽然puka可以异步工作,但在我们的示例中,puka将用作同步库。这意味着在每次请求(承诺)之后,puka将持续等待直到下一步执行前。关于RabbitMQ的更多基本概念详情请参考腾讯云+社区

使用简单示例测试RabbitMQ和Puka

要测试消息代理和puka是否工作正常,并掌握发送和接收消息在实践中的工作方式,请创建一个名为的示例python脚本 rabbit_test.py

vim rabbit_test.py

粘贴脚本内容:

import puka

# declare send and receive clients, both connecting to the same server on local machine
producer = puka.Client("amqp://localhost/")
consumer = puka.Client("amqp://localhost/")

# connect sending party
send_promise = producer.connect()
producer.wait(send_promise)

# connect receiving party
receive_promise = consumer.connect()
consumer.wait(receive_promise)

# declare queue (queue must exist before it is being used - otherwise messages sent to that queue will be discarded)
send_promise = producer.queue_declare(queue='rabbit')
producer.wait(send_promise)

# send message to the queue named rabbit
send_promise = producer.basic_publish(exchange='', routing_key='rabbit', body='Droplet test!')
producer.wait(send_promise)

print "Message sent!"

# start waiting for messages, also those sent before (!), on the queue named rabbit
receive_promise = consumer.basic_consume(queue='rabbit', no_ack=True)

print "Starting receiving!"

while True:
    received_message = consumer.wait(receive_promise)
    print "GOT: %r" % (received_message['body'],)
    break

:wq保存文件并退出。

运行脚本会打印脚本并发送到RabbitMQ队列,测试程序会在之后立即收到消息。

输出应如下所示:

root@rabbitmq:~# python rabbit_test.py
Message sent!
Starting receiving!
GOT: 'Droplet test!'
root@rabbitmq:~#

让我们来看一下此代码中发生的情况:

  1. 消费者和生产者都被创建并连接到驻留在localhost的同一个RabbitMQ服务器上
  2. 生产者声明一个队列,以确保在生成消息时它存在。如果不是这样的话,则队列可能不存在,因此消息可能会立即被丢弃。
  3. 生产者使用路由密钥将消息发送到nameless_exchange,路由密钥指定预先创建的队列。之后,消息将命中exchange,然后exchange将其置于“rabbit”队列中。然后该消息就在那里等待,直到有人接收它。
  4. 消费者访问“rabbit”队列并开始接收存储在那里的消息。因为有一条消息在等待,所以它会立即发送。它被接收后,意味着它将不再留在队列中。
  5. 接收的消息将打印在屏幕上。

Fanout Exchange

在前面的示例中,无名exchange将消息传递到名为“rabbit”的特定队列。无名exchange需要队列名称才能工作,这意味着它只能将消息传递给单个队列。

RabbitMQ中还有其他类型的交换,其中一个是fanout,这是我们在本文中的主要关注点。fanout交换是一种简单的blind工具,可以将消息传递给它所知道的所有队列。通过fanout交换,不需要提供特定的队列名称。在生成消息之前,将发送到该类交换的消息传递到绑定到交换的所有队列。可以连接到交换机的队列数量没有限制。

发布/订阅模式

通过fanout交换,我们可以轻松创建发布/订阅模式。生产者定期向他们可能不知道的用户发送消息(制作消息并将其发送到fanout exchange)。新订阅者订阅业务通讯(将自己的队列绑定到同一个简报fanout),从业务通讯fanout交换将向所有注册用户(队列)发送消息。

虽然一对一的消息传递非常简单,开发人员经常使用其他通信手段,一对多(其中“多”是不明确的,可以之间的任何批次)是一种非常流行的方案,其中的消息代理可以提供巨大的帮助。

生产者应用

生产者应用程序的唯一作用是创建一个fanout exchange,并为该交换产生周期性消息(每隔几秒)。其将自动生成消息。此应用程序将充当业务通讯。

创建一个名为newsletter_produce.py的python脚本

vim newsletter_produce.py

并粘贴脚本内容:

import puka
import datetime
import time

# declare and connect a producer
producer = puka.Client("amqp://localhost/")
connect_promise = producer.connect()
producer.wait(connect_promise)

# create a fanout exchange
exchange_promise = producer.exchange_declare(exchange='newsletter', type='fanout')
producer.wait(exchange_promise)

# send current time in a loop
while True:
    message = "%s" % datetime.datetime.now()

    message_promise = producer.basic_publish(exchange='newsletter', routing_key='', body=message)
    producer.wait(message_promise)

    print "SENT: %s" % message

    time.sleep(1)

producer.close()

让我们一步一步地用例子来解释代码中发生的事情。

  1. 生成器客户端已创建并连接到本地RabbitMQ实例。从现在开始,它可以自由地与RabbitMQ通信。
  2. newsletter创建一个命名的fanout exchange。在该步骤之后,交换存在于RabbitMQ服务器上,可用于将队列绑定到它并通过它发送消息。
  3. 在无限循环中,将向newsletter交换所生成具有当前时间的消息。请注意,它的routing_key是空的,这意味着没有指定特定的队列。交换机将进一步向正确的队列传递消息。

该应用程序在运行时会将当前时间通知所有的业务订阅者。

接收者应用

接收者应用程序将创建一个临时队列并将其绑定到命名的fanout交换。之后,它将开始等待消息。在将队列绑定到交换机之后,由此消费者接收由之前创建的生产者发送的每条消息。此应用程序将充当订阅者- 可以一次多次运行应用程序,但仍然所有实例都将接收广播消息。

创建一个名为newsletter_consume.py的python脚本

vim newsletter_consume.py

并粘贴脚本内容:

import puka

# declare and connect a consumer
consumer = puka.Client("amqp://localhost/")
connect_promise = consumer.connect()
consumer.wait(connect_promise)

# create temporary queue
queue_promise = consumer.queue_declare(exclusive=True)
queue = consumer.wait(queue_promise)['queue']

# bind the queue to newsletter exchange
bind_promise = consumer.queue_bind(exchange='newsletter', queue=queue)
consumer.wait(bind_promise)

# start waiting for messages on the queue created beforehand and print them out
message_promise = consumer.basic_consume(queue=queue, no_ack=True)

while True:
    message = consumer.wait(message_promise)
    print "GOT: %r" % message['body']

consumer.close()

接收者代码比生产者代码复杂一点。让我们一步一步地看一下:

  1. 接收者客户端已创建并连接到本地RabbitMQ实例。
  2. 创建临时队列。临时意味着没有提供名称,RabbitMQ将自动生成队列名称。此外,在客户端断开连接后,此类队列将被销毁。这是一种创建队列的常用方法,这些队列只存在于绑定到其中一个交换机而没有其他特殊用途的情况下。由于要创建一个队列来接收任何内容,因此避免考虑队列名称是一种方便的方法。
  3. 创建的队列绑定到newsletter exchange。从那一刻起,fanout exchange将把每条消息传递到该队列。
  4. 在无限循环中,接收者在队列上等待,接收到达队列并将其打印在屏幕上的每条消息。

该应用程序在运行时会从业务通讯处收到时间通知。它可以一次执行多次,此应用程序的每个实例都将获得当前时间。

测试两个应用程序

要测试业务通讯及其使用者,请打开与虚拟服务器的多个SSH会话(如果在本地计算机上工作,打开多个终端窗口)。

在其中一个窗口中运行生产者应用程序。

root@rabbitmq:~# python newsletter_produce.py

它将开始显示当前时间:

SENT: 2014-02-11 17:24:47.309000
SENT: 2014-02-11 17:24:48.310000
SENT: 2014-02-11 17:24:49.312000
SENT: 2014-02-11 17:24:50.316000
...

在每个其他窗口中运行接收者应用程序:

root@rabbitmq:~# python newsletter_consume.py

此应用程序的每个实例都将收到生产者广播的时间通知:

GOT: 2014-02-11 17:24:47.309000
GOT: 2014-02-11 17:24:48.310000
GOT: 2014-02-11 17:24:49.312000
GOT: 2014-02-11 17:24:50.316000
...

这意味着RabbitMQ正确注册了fanout交换,将订户队列绑定到此交换,并将发送的消息传递到正确的队列。换句话说,RabbitMQ正在按预期工作。

进一步阅读

发布/订阅是一种简单的(在概念上和实现中)消息传递模式,通常可以派上用场; 但RabbitMQ可以做到更多。有许多方法可以使用RabbitMQ来解决消息传递问题,包括高级消息路由,消息确认,安全性或持久性。

本文的主要目标是使用简单的示例介绍基本的消息传递概念


参考文献:《How To Use RabbitMQ and Python's Puka to Deliver Messages to Multiple Consumers》