RabbitMQ 消息队列 学习笔记(2)
时间:2019-08-30
本文章向大家介绍RabbitMQ 消息队列 学习笔记(2),主要包括RabbitMQ 消息队列 学习笔记(2)使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
exchange:发给RabbitMQ的消息其实都是发给exchange,exchange是一个中间人,将收到的消息处理转发。
exchangge类型:
(1)fanout:所有bind到此exchange的queue都可以接收消息(全广播)
(2)direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
(3)topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
fanout 简单实现:
fanout实际上是多个消费者将自己的队列绑定到生产者声明的exchange中
producor ;
(1)建立socket连接 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
(2)建立管道 channel = connection.channel()
(3)声明exchange, channel.exchange_declare(exchange='logs',exchange_type='fanout')
(4)发送消息 channel.basic_publish(exchange='logs', routing_key= '', body=message,)
cosumer ;
(1)建立socket连接 :connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
(2)建立管道: channel = connection.channel()
(3)声明exchange : channel.exchange_declare(exchange='logs',exchange_type='fanout')
(4)声明队列 ,设置队列自动删除: result = channel.queue_declare(queue='',exclusive= True)
(5)获取队列名 : queue_name= result.method.queue
(6)绑定exchange:channel.queue_bind(exchange='logs',queue=queue_name
(4)接收广播: channel.basic_consume(queue='',
on_message_callback=recv ,
auto_ack= True)
代码区:
1 #7512
2 '''
3 exchange用法,
4 fanout广播,广播是实时的
5
6 '''
7 import pika,sys
8
9 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
10 channel = connection.channel()
11 # channel.queue_declare('msg')
12
13
14 #fanout广播
15 channel.exchange_declare(exchange='logs',
16 exchange_type='fanout')
17
18 message = ' '.join(sys.argv[1:]) or "info :哈哈"
19 # message ="哈哈"
20 channel.basic_publish(exchange='logs',
21 routing_key= '',
22 body=message,
23 )
24
25 print(" [x] Sent 'Hello World!'")
26 connection.close()
1 #7512
2 '''
3 广播接收端
4
5 '''
6 import pika,time
7
8 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
9 channel = connection.channel()
10
11 channel.exchange_declare(exchange='logs',
12 exchange_type='fanout')
13
14
15 # exclusive排他,唯一的,不指定queue名字,rabbit会随机分配一个名字
16 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
17 result = channel.queue_declare(queue='',
18 exclusive= True)
19
20 print("result",result,"xxx",result.method.queue)
21 #取queue的名字
22 queue_name= result.method.queueresult.method.queue
23
24 #绑定交换器exchange
25 channel.queue_bind(exchange='logs',
26 queue=queue_name)
27
28
29 def recv(ch ,methrod ,proporties ,body):
30 print("recv msg : [%s]"%body.decode())
31 # time.sleep(20)
32 # ch.basic_ack(delivery_tag = methrod.delivery_tag)
33
34
35 # channel.basic_qos(prefetch_count=1)#最多能处理队列中的1条消息
36 channel.basic_consume(queue='',
37 on_message_callback=recv ,
38 auto_ack= True)
39
40 print(' [*] Waiting for messages. To exit press CTRL+C')
41 channel.start_consuming()
direct 简单实现:
direct实际上是在fanout的基础上实现的,在fanhout基础上加多了一个routing_key来过滤发消息的目标。
代码区:
1 #7512
2
3 import pika
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='direct_logs',
9 exchange_type="direct")
10
11 severity = input(">>:") or "info"
12 massage = input(">>:") or "哈哈"
13
14 channel.basic_publish(exchange="direct_logs",
15 routing_key=severity,
16 body = massage)
17
18 print("[x] Send to %r : %r "%(severity,massage))
19 connection.close()
1 #7512
2 import pika,sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
5 channel = connection.channel()
6
7 channel.exchange_declare(exchange='direct_logs',
8 exchange_type="direct")
9
10 result = channel.queue_declare(queue="",
11 exclusive=True)
12
13 queue_name = result.method.queue
14
15 serverities = sys.argv[1:]
16 if not serverities:
17 sys.exit()
18 print(serverities)
19
20 for i in serverities:
21 channel.queue_bind(exchange='direct_logs',
22 queue=queue_name,
23 routing_key= i )
24
25
26 def callback(ch, method, properties, body):
27 print("ch:",ch, "\nmethod:",method,"\nproperties" ,properties)
28 print(" [x] Received %r" % body.decode())
29 ch.basic_ack(delivery_tag=method.delivery_tag)#手动回应
30
31 channel.basic_consume(queue=queue_name,
32 on_message_callback=callback,
33 auto_ack=False)
34
35 channel.start_consuming()
topic简单实现:
1 #7512
2
3 import pika,sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
6
7 channel = connection.channel()
8 channel.exchange_declare(exchange="topic_logs",
9 exchange_type="topic")
10
11 key = sys.argv[1:] if len(sys.argv) > 1 else input(">>:")
12
13 message = input(">>:")
14 channel.basic_publish(exchange='topic_logs',
15 routing_key=key,
16 body=message
17 )
18
19 print("[x] sent [%s] : a msg is [%s]"%(key,message))
20 channel.start_consuming()
1 #7512
2 '''
3 #收所有
4 '''
5 import pika,sys
6
7 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
8 channel = connection.channel()
9
10 channel.exchange_declare(exchange="topic_logs",
11 exchange_type="topic")
12
13 result = channel.queue_declare(queue='',
14 exclusive=True)
15
16 queue_name = result.method.queue
17
18 massage = sys.argv[1:] or input(">>:").split()
19 for msg in massage:
20 channel.queue_bind(queue=queue_name,
21 exchange="topic_logs",
22 routing_key= msg)
23
24 def callback(ch, method, properties, body):
25 print(" [x] Received %r" % body.decode())
26 ch.basic_ack(delivery_tag=method.delivery_tag)#手动回应
27
28 channel.basic_consume(queue=queue_name,
29 on_message_callback=callback,
30 auto_ack=False)
31
32 print("[x] wait the msg...")
33 channel.start_consuming()
原文地址:https://www.cnblogs.com/gtq7512/p/11433900.html
- overlayfs存储驱动的使用以及技术探究
- 分页解决方案 之 分页算法——Pager_SQL的详细使用方法和注意事项
- 利用虚拟硬盘(把内存当作硬盘)来提高数据库的效率(目前只针对SQL Server 2000)可以提高很多
- 分页解决方案 之 分页算法——Pager_SQL的思路和使用方法
- 让你的笔记本更快一点——我的笔记本的性能测试和虚拟硬盘(把内存当成硬盘)的使用感觉
- 分页解决方案 之 数据访问函数库——另类的思路、另类的写法,造就了不一样的发展道路。
- 分页解决方案 之 QuickPager的使用方法(在UserControl里面使用分页控件的方法)
- 分页解决方案 之 QuickPager的使用方法(URL分页、自动获取数据)
- 分页解决方案 之 QuickPager的使用方法(PostBack分页、自定义获取数据)
- QuickPager asp.net 分页控件、表单控件等自定义控件下载 和介绍 【2009.09.07更新】
- 分页解决方案 之 QuickPager的使用方法(PostBack分页、自动获取数据)
- 【自然框架】之鼠标点功能现(二):表单控件的“应用”—— 代码?只写需要的!
- 基于Docker环境中源码部署容器Nginx
- 使用Ansible playbooks快速构建etcd集群
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 三歪吐血总结了各个中间件是如何实现持久化的
- ThreadPoolExecutor源码学习
- Docker六脉神剑(四) 使用Docker-Compose进行服务编排搭建lnmp环境
- 干的想喝水,一篇文章带你读懂硬盘工作原理!
- 微信小程序开发实战(11):滚动组件(picker)
- Docker六脉神剑 (五) Docker Swarm集群搭建及基础服务部署
- 思科模拟器GNS3将路由器变成交换机的方法
- docker安装nginx并配置https
- Docker Swarm集群部署lnmp+redis
- Maven快速入门
- TomCat安装及快速部署
- SpringCloud+MyBatis分页处理(前后端分离)
- 手把手教你搭建SpringCloud项目
- SpringCloud的@Value注解及GitLab配置使用
- 使用 cdk8s 与 Argo CD 进行 GitOps 实践