分布式任务队列Celery的使用
一、简介
Celery是由Python开发、简单、灵活、可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。特点:
- 简单:熟悉celery的工作流程后,配置使用简单
- 高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务
- 快速:一个单进程的celery每分钟可处理上百万个任务
- 灵活:几乎celery的各个组件都可以被扩展及自定制
应用场景举例:
1.web应用:当用户在网站进行某个操作需要很长时间完成时,我们可以将这种操作交给Celery执行,直接返回给用户,等到Celery执行完成以后通知用户,大大提好网站的并发以及用户的体验感。
2.任务场景:比如在运维场景下需要批量在几百台机器执行某些命令或者任务,此时Celery可以轻松搞定。
3.定时任务:向定时导数据报表、定时发送通知类似场景,虽然Linux的计划任务可以帮我实现,但是非常不利于管理,而Celery可以提供管理接口和丰富的API。
二、架构&工作原理
Celery由以下三部分构成:消息中间件(Broker)、任务执行单元Worker、结果存储(Backend),如下图:
工作原理:
- 任务模块Task包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往消息队列,而定时任务由Celery Beat进程周期性地将任务发往消息队列;
- 任务执行单元Worker实时监视消息队列获取队列中的任务执行;
- Woker执行完任务后将结果保存在Backend中;
消息中间件Broker
消息中间件Broker官方提供了很多备选方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推荐RabbitMQ。
任务执行单元Worker
Worker是任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心。
结果存储Backend
Backend结果存储官方也提供了诸多的存储方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch。
三、安装使用
这里我使用的redis作为消息中间件,redis安装可以参考https://www.cnblogs.com/wdliu/p/9360286.html。
Celery安装:
pip3 install celery
简单使用
目录结构:
project/ ├── __init__.py ├── config.py └── tasks.py
各目录文件说明:
__init__.py:初始化Celery以及加载配置文件
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from celery import Celery app = Celery('project') # 创建 Celery 实例 app.config_from_object('project.config') # 加载配置模块
config.py: Celery相关配置文件,更多配置参考:http://docs.celeryproject.org/en/latest/userguide/configuration.html
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件 CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间 CELERY_TIMEZONE='Asia/Shanghai' # 时区配置 CELERY_IMPORTS = ( # 指定导入的任务模块,可以指定多个 'project.tasks', )
tasks.py :任务定义文件
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app @app.task def show_name(name):
启动Worker:
celery worker -A project -l debug
各个参数含义:
worker: 代表第启动的角色是work当然还有beat等其他角色;
-A :项目路径,这里我的目录是project
-l:启动的日志级别,更多参数使用celery --help查看
查看日志输出,会发现我们定义的任务,以及相关配置:
虽然启动了worker,但是我们还需要通过delay或apply_async来将任务添加到worker中,这里我们通过交互式方法添加任务,并返回AsyncResult对象,通过AsyncResult对象获取结果:
AsyncResult除了get方法用于常用获取结果方法外还提以下常用方法或属性:
- state: 返回任务状态;
- task_id: 返回任务id;
- result: 返回任务结果,同get()方法;
- ready(): 判断任务是否以及有结果,有结果为True,否则False;
- info(): 获取任务信息,默认为结果;
- wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;
- successfu(): 判断任务是否成功,成功为True,否则为False;
四、进阶使用
对于普通的任务来说可能满足不了我们的任务需求,所以还需要了解一些进阶用法,Celery提供了诸多调度方式,例如任务编排、根据任务状态执行不同的操作、重试机制等,以下会对常用高阶用法进行讲述。
定时任务&计划任务
Celery的提供的定时任务主要靠schedules来完成,通过beat组件周期性将任务发送给woker执行。在示例中,新建文件period_task.py,并添加任务到配置文件中:
period_task.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.schedules import crontab @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): sender.add_periodic_task(10.0, add.s(1,3), name='1+3=') # 每10秒执行add sender.add_periodic_task( crontab(hour=16, minute=56, day_of_week=1), #每周一下午四点五十六执行sayhai sayhi.s('wd'),name='say_hi' ) @app.task def add(x,y): print(x+y) return x+y @app.task def sayhi(name): return 'hello %s' % name
config.py
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件 CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间 CELERY_TIMEZONE='Asia/Shanghai' # 时区配置 CELERY_IMPORTS = ( # 指定导入的任务模块,可以指定多个 'project.tasks', 'project.period_task', #定时任务 )
taskproj/taskproj/__init__.py:
from __future__ import absolute_import, unicode_literals from .celery import app as celery_app __all__ = ['celery_app']
我们可以观察worker日志:
还可以通过配置文件方式指定定时和计划任务,此时的配置文件如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.schedules import crontab BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件 CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间 CELERY_TIMEZONE='Asia/Shanghai' # 时区配置 CELERY_IMPORTS = ( # 指定导入的任务模块,可以指定多个 'project.tasks', 'project.period_task', ) app.conf.beat_schedule = { 'period_add_task': { # 计划任务 'task': 'project.period_task.add', #任务路径 'schedule': crontab(hour=18, minute=16, day_of_week=1), 'args': (3, 4), }, 'add-every-30-seconds': { # 每10秒执行 'task': 'project.period_task.sayhi', #任务路径 'schedule': 10.0, 'args': ('wd',) }, }
此时的period_task.py只需要注册到woker中就行了,如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app @app.task def add(x,y): print(x+y) return x+y @app.task def sayhi(name): return 'hello %s' % name
原文地址:https://www.cnblogs.com/harryblog/p/11607597.html
- Nginx 配置文件安全分析
- 一个写SQL语句的便利工具
- OpenFlow网络中的路由服务
- Python中萌新不知道的小魔法(一)
- 基于Scrapy的全球最大成人网站PornHub爬虫
- Python标准库笔记(7) — copy模块
- Python项目实战——开发网易云音乐插件
- 将已有项目代码通过命令行方式上传到github,简易傻瓜教程(图文)将已有项目代码通过命令行方式上传到github,傻瓜教程(图文)1. 创建一个github项目2. 在Repository name
- mac执行git命令出现xcrun: error: invalid active developer path解决方法
- centos修改主机名
- SSH免密登录,RSA认证登录
- Zookeeper安装部署调试命令
- Facebook 直播是如何承受海量压力的?
- 将 Redis 作为图数据库
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- JAVABEAN EJB POJO区别
- @Component和@Bean以及@Autowired、@Resource
- mybatis generator and 和or条件
- 『.Net反射』ILGenerator.Emit 动态MSIL 编程
- Spring通过XML配置文件以及通过注解形式来AOP 来实现前置,后置,环绕,异常通知
- 切面编程(环绕通知与前后置通知区别)
- Spring在代码中获取bean的几种方式
- Spring 一个接口多个实现类怎么注入
- ASP.NET MVC Controller的激活
- js 逗号表达式
- spring动态调用方法
- Spring AOP动态代理原理与实现方式
- 基于注解多数据源解决方案
- Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
- 你需要实现一个高效的缓存,它允许多个用户读,但只允许一个用户写,以此来保持它的完整性,你会怎样去实现它?