python中RabbitMQ的使用(工作队列)
消息可以理解为任务,消息发送者可以看成任务派送者(sender),消息接收者可以看成工作者(worker)。
当工作者接收到一个任务,还没完任务时分配者又发一个任务,此时需要多个工作者来共同处理这些任务。
任务分派结构图如下:
注:此时有一个任务派送人P,两个工作接收者C1和C2。
现在我们来模拟该情况:
1.首先打开三个终端:
2.分别在前两个终端运行receive1.py
3.在第三个终端多次运行send1.py
此时将会轮流向worker1和worker2分派任务。
问题:
在以上任务分配和完成情况中,有几个问题将会产生:
1.工作者任务是否完成?
2.工作者挂掉后,如何防止未完成的任务丢失,并且如何处理这些任务?
3.RabbitMQ自身出现问题,此时如何防止任务丢失?
4.任务有轻重之分,如何实现公平调度?
方案:
1.消息确认(Message acknowledgment)
当任务完成后,工作者(receiver)将消息反馈给RabbitMQ:
1 def callback(ch, method, properties, body): 2 print " [x] Received %r" % (body,) 3 #停顿5秒,方便ctrl+c退出 4 time.sleep(5) 5 print " [x] Done" 6 #当工作者完成任务后,会反馈给rabbitmq 7 ch.basic_ack(delivery_tag=method.delivery_tag)
2.保留任务(no_ack=False)
当工作者挂掉后,防止任务丢失:
# 去除no_ack=True参数或者设置为False后可以实现 # 一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。 channel.basic_consume(callback, queue='task_queue', no_ack=False)
3.消息持久化存储(Message durability)
声明持久化存储:
# durable=True即声明持久化存储 channel.queue_declare(queue='task_queue', durable=True)
在发送任务时,用delivery_mode=2来标记任务为持久化存储:
1 # 用delivery_mode=2来标记任务为持久化存储: 2 channel.basic_publish(exchange='', 3 routing_key='task_queue', 4 body=message, 5 properties=pika.BasicProperties( 6 delivery_mode=2, 7 ))
4.公平调度(Fair dispatch)
使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务
channel.basic_qos(prefetch_count=1)
完整代码如下:
receive1.py
1 #!/usr/bin/env python3 2 # -*- coding: utf-8 -*- 3 import pika 4 import time 5 6 hostname = '192.168.1.133' 7 parameters = pika.ConnectionParameters(hostname) 8 connection = pika.BlockingConnection(parameters) 9 10 # 创建通道 11 channel = connection.channel() 12 # durable=True后将任务持久化存储,防止任务丢失 13 channel.queue_declare(queue='task_queue', durable=True) 14 15 16 # ch.basic_ack为当工作者完成任务后,会反馈给rabbitmq 17 def callback(ch, method, properties, body): 18 print " [x] Received %r" % (body,) 19 time.sleep(5) 20 print " [x] Done" 21 ch.basic_ack(delivery_tag=method.delivery_tag) 22 23 # basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务, 24 # 即只有工作者完成任务之后,才会再次接收到任务。 25 channel.basic_qos(prefetch_count=1) 26 27 # 去除no_ack=True参数或者设置为False后可以实现 28 # 一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。 29 channel.basic_consume(callback, queue='task_queue', no_ack=False) 30 # 开始接收信息,按ctrl+c退出 31 print ' [*] Waiting for messages. To exit press CTRL+C' 32 channel.start_consuming()
send1.py
1 #!/usr/bin/env python3 2 # -*- coding: utf-8 -*- 3 import pika 4 import random 5 6 hostname = '192.168.1.133' 7 parameters = pika.ConnectionParameters(hostname) 8 connection = pika.BlockingConnection(parameters) 9 10 # 创建通道 11 channel = connection.channel() 12 # 如果rabbitmq自身挂掉的话,那么任务会丢失。所以需要将任务持久化存储起来,声明持久化存储: 13 channel.queue_declare(queue='task_queue', durable=True) 14 15 number = random.randint(1, 1000) 16 message = 'hello world:%s' % number 17 18 # 在发送任务的时候,用delivery_mode=2来标记任务为持久化存储: 19 channel.basic_publish(exchange='', 20 routing_key='task_queue', 21 body=message, 22 properties=pika.BasicProperties( 23 delivery_mode=2, 24 )) 25 print " [x] Sent %r" % (message,) 26 connection.close()
示例如下:
首先启动三个终端,两个先执行receive1.py,第三个多次执行rend1.py:
终端3:
此时分配三个任务,33分配给worker1,170分配给worker2,262分配给worker1
终端1:
worker1完成任务33后,开始任务262,我们在任务完成前使用(CRTL+C)使worker1挂掉
终端2:
worker2完成任务170,本来没有任务,但是worker1挂掉,此时接收他的任务262
原文地址:https://www.cnblogs.com/Hale-wang/p/11821157.html
- CentOS环境下Docker私有仓库搭建
- spark三种连接join
- 用firebug给firefox添加信任链接
- Elasticsearch索引别名、Filtered索引别名、Template
- 追本溯源:Oracle 只读表空间的探索实践
- firefox查看微信公众平台的数据分析时就出现不信任链接怎么办?
- spark-streaming集成Kafka处理实时数据
- 使用spark与MySQL进行数据交互的方法
- python分布式环境下的限流器
- commons-pool与commons-pool2连接池(Hadoop连接池)
- Glusterfs 文件系统
- 时过境迁:Oracle跨平台迁移之XTTS方案与实践
- Kazoo Python Zookeeper 选主
- Linux Redis集群搭建与集群客户端实现
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- redis学习(十一)
- 互联网软件常见开发方法
- 利用Vue实现简易tab切换效果
- 第3天:最近笔试编程题汇总
- 设计模式学习(四)-UML中的类图及类图之间的关系
- 数据结构与算法(一)——学习工具的推荐
- sparc v8架构的异常处理
- 设计模式(五)-工厂方法模式
- html 的div或其他元素监听 resize事件不生效的解决办法
- 基于docker部署skywalking实现全链路监控
- 数据结构与算法(二)——十大排序算法
- VUE项目使用.env文件配置全局环境变量
- 设计模式学习(六)-抽象工厂模式
- 彻底完美解决安卓苹果手机点击输入框网页页面自动放大缩小
- 第22天:NLP实战(六)——基于PaddleHub的疫情期间网民情绪识别