RabbitMQ 消息队列 学习笔记(2)

时间:2019-08-30
本文章向大家介绍RabbitMQ 消息队列 学习笔记(2),主要包括RabbitMQ 消息队列 学习笔记(2)使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

exchange:发给RabbitMQ的消息其实都是发给exchange,exchange是一个中间人,将收到的消息处理转发。

exchangge类型:
(1)fanout:所有bind到此exchange的queue都可以接收消息(全广播)
(2)direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
(3)topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

    表达式符号说明:#代表一个或多个字符,*代表任何字符
        例:#.a会匹配a.a,aa.a,aaa.a等
            *.a会匹配a.a,b.a,c.a等

 

fanout 简单实现:

  fanout实际上是多个消费者将自己的队列绑定到生产者声明的exchange中

producor ;
(1)建立socket连接 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
(2)建立管道 channel = connection.channel()
(3)声明exchange, channel.exchange_declare(exchange='logs',exchange_type='fanout')
(4)发送消息 channel.basic_publish(exchange='logs', routing_key= '', body=message,)


cosumer ;
(1)建立socket连接 :connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
(2)建立管道: channel = connection.channel()
(3)声明exchange : channel.exchange_declare(exchange='logs',exchange_type='fanout')
(4)声明队列 ,设置队列自动删除: result = channel.queue_declare(queue='',exclusive= True)
(5)获取队列名 : queue_name= result.method.queue
(6)绑定exchange:channel.queue_bind(exchange='logs',queue=queue_name
(4)接收广播: channel.basic_consume(queue='',
on_message_callback=recv ,
auto_ack= True)

代码区:
 1 #7512
 2 '''
 3 exchange用法,
 4 fanout广播,广播是实时的
 5 
 6 '''
 7 import pika,sys
 8 
 9 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
10 channel = connection.channel()
11 # channel.queue_declare('msg')
12 
13 
14 #fanout广播
15 channel.exchange_declare(exchange='logs',
16                          exchange_type='fanout')
17 
18 message = ' '.join(sys.argv[1:]) or "info :哈哈"
19 # message ="哈哈"
20 channel.basic_publish(exchange='logs',
21                       routing_key= '',
22                       body=message,
23                       )
24 
25 print(" [x] Sent 'Hello World!'")
26 connection.close()
fanout生产者
 1 #7512
 2 '''
 3 广播接收端
 4 
 5 '''
 6 import pika,time
 7 
 8 connection  = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
 9 channel = connection.channel()
10 
11 channel.exchange_declare(exchange='logs',
12                          exchange_type='fanout')
13 
14 
15 # exclusive排他,唯一的,不指定queue名字,rabbit会随机分配一个名字
16 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
17 result = channel.queue_declare(queue='',
18                                exclusive= True)
19 
20 print("result",result,"xxx",result.method.queue)
21 #取queue的名字
22 queue_name= result.method.queueresult.method.queue
23 
24 #绑定交换器exchange
25 channel.queue_bind(exchange='logs',
26                    queue=queue_name)
27 
28 
29 def recv(ch ,methrod ,proporties ,body):
30     print("recv msg : [%s]"%body.decode())
31     # time.sleep(20)
32     # ch.basic_ack(delivery_tag = methrod.delivery_tag)
33 
34 
35 # channel.basic_qos(prefetch_count=1)#最多能处理队列中的1条消息
36 channel.basic_consume(queue='',
37                       on_message_callback=recv ,
38                       auto_ack= True)
39 
40 print(' [*] Waiting for messages. To exit press CTRL+C')
41 channel.start_consuming()
fanout消费者

 

direct 简单实现:

  direct实际上是在fanout的基础上实现的,在fanhout基础上加多了一个routing_key来过滤发消息的目标。

代码区:

 1 #7512
 2 
 3 import pika
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='direct_logs',
 9                          exchange_type="direct")
10 
11 severity = input(">>:")  or "info"
12 massage = input(">>:")  or "哈哈"
13 
14 channel.basic_publish(exchange="direct_logs",
15                       routing_key=severity,
16                       body = massage)
17 
18 print("[x] Send to %r : %r "%(severity,massage))
19 connection.close()
direct生产者
 1 #7512
 2 import pika,sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='direct_logs',
 8                          exchange_type="direct")
 9 
10 result = channel.queue_declare(queue="",
11                       exclusive=True)
12 
13 queue_name = result.method.queue
14 
15 serverities = sys.argv[1:]
16 if not serverities:
17     sys.exit()
18 print(serverities)
19 
20 for i in serverities:
21     channel.queue_bind(exchange='direct_logs',
22                  queue=queue_name,
23                  routing_key= i )
24 
25 
26 def callback(ch, method, properties, body):
27     print("ch:",ch, "\nmethod:",method,"\nproperties" ,properties)
28     print(" [x] Received %r" % body.decode())
29     ch.basic_ack(delivery_tag=method.delivery_tag)#手动回应
30 
31 channel.basic_consume(queue=queue_name,
32                       on_message_callback=callback,
33                       auto_ack=False)
34 
35 channel.start_consuming()
direct消费者

 

topic简单实现:

  

 1 #7512
 2 
 3 import pika,sys
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
 6 
 7 channel = connection.channel()
 8 channel.exchange_declare(exchange="topic_logs",
 9                          exchange_type="topic")
10 
11 key = sys.argv[1:]  if len(sys.argv) > 1 else input(">>:")
12 
13 message =  input(">>:")
14 channel.basic_publish(exchange='topic_logs',
15                       routing_key=key,
16                       body=message
17                       )
18 
19 print("[x] sent [%s] : a msg is [%s]"%(key,message))
20 channel.start_consuming()
topic生产者
 1 #7512
 2 '''
 3 #收所有
 4 '''
 5 import pika,sys
 6 
 7 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
 8 channel = connection.channel()
 9 
10 channel.exchange_declare(exchange="topic_logs",
11                          exchange_type="topic")
12 
13 result = channel.queue_declare(queue='',
14                       exclusive=True)
15 
16 queue_name = result.method.queue
17 
18 massage = sys.argv[1:] or input(">>:").split()
19 for msg in massage:
20     channel.queue_bind(queue=queue_name,
21                        exchange="topic_logs",
22                        routing_key= msg)
23 
24 def callback(ch, method, properties, body):
25     print(" [x] Received %r" % body.decode())
26     ch.basic_ack(delivery_tag=method.delivery_tag)#手动回应
27 
28 channel.basic_consume(queue=queue_name,
29                       on_message_callback=callback,
30                       auto_ack=False)
31 
32 print("[x]  wait the msg...")
33 channel.start_consuming()
topic消费者

 

原文地址:https://www.cnblogs.com/gtq7512/p/11433900.html