celery
时间:2019-03-13
本文章向大家介绍celery,主要包括celery使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
Celery介绍
"""
1.application(task producer) 生产者
2.broker(task queue) 任务队列 可以使用redis
3.celery beat (task scheduler) 可以是任务调度器
3.worker(task consumer) 也可以是普通的worker,
4.也可以使多个worker,worker可以结果result存储在数据库当中
"""
# 使用场景
1.异步任务,耗时操作交给celery,比如发送短信/邮件,消息推送,音视频处理等等
2.定时任务,类似于crontab,比如每日数据统计
安装配置
首先
安装reids 数据库
然后
>>> pip install celery
# amqp-2.3.2 billiard-3.5.0.5 celery-4.2.1 kombu-4.2.2 pytz-2018.7 vine-1.1.4
>>> pip install celery[redis]
# redis-3.0.1
# 消息中间件 RabbitMQ/Redis
------------------------------------------------
# 安装redis 访问redis.io 官方网站
"""
$ wget http://download.redis.io/releases/redis-5.0.3.tar.gz
$ tar xzf redis-5.0.3.tar.gz
$ cd redis-5.0.3
$ make
"""
# make install
# /root/redis-5.0.3/src/redis-server
# 运行redis
# src/redis-server
# 交互
# src/redis-cli
celery基本使用
普通
# app.py
# 正常使用
import time
def add(x,y):
print("enter call func....")
time.sleep(4)
return x + y
if __name__ == "__main__":
print("start task......")
result= add(2,8)
print("end task......")
print(result)
celery异步调用
# tasks.py
import time
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2' # 存储结果用的
app = Celery("my_task",broker=broker,backend=backend)
@app.task
def add(x,y):
print("enter call func....")
time.sleep(4)
return x + y
# app.py 中调用
import time
if __name__ == "__main__":
print("start task......")
result= add.delay(2,8) #异步调用
print("end task......")
print(result)
"""
但是这个只是将任务发出去了,但是worker 并没有启动,也不会处理任务和返回结果
"""
启动worker
celery worker -A tasks -l INFO
# -A tasks
# -l INFO 指定日志级别
API
from tasks import add
result= add.delay(2,6) #异步调用
result.ready() #任务是否执行完毕,不会阻塞,只是判断
result.get() # 拿到任务执行结果,会阻塞
celery项目配置
celery_app 文件夹
.
└── __init__.py
├── celeryconfig.py
└── task2.py
└── task2.py
# __init__.py
from celery import Celery
app=Celery("demo")
app.config_from_object("celery_app.celeryconfig")#通过Celery实例 加载配置模块
#celeryconfig.py
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_TIMEZONE="Asia/Shanghai"
# 导入指定的任务模块
CELERY_IMPORTS=(
'celery_app.task1',
'celery_app.task2',
)
# task1.py
import time
from celery_app import app
@app.task
def add(x,y):
time.sleep(2)
return x+y
# task2.py
import time
from celery_app import app
@app.task
def multiply(x,y):
time.sleep(3)
return x*xy
# 启动
celery worker -A celery_app -l INFO
# app.py 调用
from celery_app import task1
from celery_app import task2
task1.add.delay(2,4)
# task1.add.apply_async(2.4) 这种方法也可以
task2.multiply.delay(4,5)
# celery worker -A celery_app -l INFO
# python app.py 看一下结果
定时任务
# celeryconfig.py 增加
from datetime import timedelta
from celery.schedules import crontab
CELERY_TIMEZONE="Asia/Shanghai"
CELERY_SCHEDULE = {
'task1':{
'task':'celery_app.task1.add',
'schedule':timedelta(seconds=10),
'args':(2,8)
}
'task2':{
'task':'celery_app.task1.multiply',
'schedule':crontab(hour=19,minute=28),#每天19点28分执行
'args':(4,5)
}
}
# celery beat -A celery_app -l INFO #启动beat
# celery worker -A celery_app -l INFO #启动worker
# 4.1.0 有bug,回退
# celery==4.0.2
# 一条命令启动worker beat
# celery -B -A celery_app worker -l INFO
django-celery
>>> pip install django-celery
>>> python manage.py celery worker -Q queue
# 新建项目中创建一个 celeryconfig.py
# 新建app sourse,在app中创建tasks.py
import djcelery
djcelery.setup_loader()
# 设置队列,防止worker和beat混在一起
CELERY_QUEUES = {
'beat_tasks':{
'exchange':'beat_tasks',
'exchange_type':'direct',
'binding_key':'beat_tasks'
},
'worker_tasks':{
'exchange':'worker_tasks',
'exchange_type':'direct',
'binding_key':'worker_tasks'
},
}
# 如果不指定,则默认为work_queue
CELERY_DEFAULT_QUEUE = 'work_queue'
CELERY_IMPORTS=(
'course.tasks',
)
# 有些情况下 可以防止死锁
CELERYD_FORCE_EXECV = True
# 设置并发的worker数量
CELERYD_CONCURRENCY = 4
# 任务失败,允许重试
CELERY_ACKS_LATE = True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD = 100
#单个任务的最大允许时间
CELERYD_TASK_TIME_LIMIT=12*30
-------------------------------------------------
# 在django settings.py 设置
from .celeryconfig import *
BROKER_BACKEND='redis'
BROKER_URL='redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
# 在APPS中 注册 加入 'djcelery'
python manage.py celery worker -Q queue
# 补充 celeryconfig.py 中加入定时任务并指定队列
from datetime import timedelta
CELERY_SCHEDULE = {
'task1':{
'task':'celery_app.task1.add',
'schedule':timedelta(seconds=10),
'args':(2,8),
'options':{
'queue':'beat_tasks',
}
}
}
# 启动
# python manage.py celery beat -l INFO
# 手动指定队列
CourseTask.apply_async(args=('hello',),queue='work_queue')
监控工具
# 安装
>>> pip install flower
# 启动的话,
>>> celery flower --address=0.0.0.0 --port=5555 --broker=xxxx --basic_auth=sun:sun
>>> python manage.py celery flower
#--basic_auth=sun:sun 是认证
进程管理工具 supervisor
安装配置
"""
Install : pip install supervisor
Start : supervisord -c /etc/supervisord.conf
Tool : supervisorctl
"""
# 创建一个conf目录
mkdir conf
# 重定向
echco_supervisord_conf > conf/supervisord.conf
# /root/anaconda3/envs/supv/bin/echo_supervisord_conf > conf/supervisord.conf
supervisord.conf
supervisord.conf
[unix_http_server]
file=/tmp/supervisor.sock ; the path to the socket file
;chmod=0700 ; socket file mode (default 0700)
;chown=nobody:nogroup ; socket file uid:gid owner
;username=user ; default is no username (open server)
;password=123 ; default is no password (open server)
; 这个是web端的管理见面,想要开启去掉分号
[inet_http_server] ; inet (TCP) server disabled by default
port=127.0.0.1:9001 ; ip_address:port specifier, *:port for all iface
username=admin ; default is no username (open server)
password=1234 ; default is no password (open server)
[supervisord]
logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10 ; # of main logfile backups; 0 means none, default 10
loglevel=info ; log level; default info; others: debug,warn,trace
pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=false ; start in foreground if true; default false
minfds=1024 ; min. avail startup file descriptors; default 1024
minprocs=200 ; min. avail process descriptors;default 200
;umask=022 ; process file creation umask; default 022
;user=chrism ; default is current user, required if root
;identifier=supervisor ; supervisord identifier, default is 'supervisor'
;directory=/tmp ; default is not to cd during start
;nocleanup=true ; don't clean up tempfiles at start; default false
;childlogdir=/tmp ; 'AUTO' child log dir, default $TEMP
;environment=KEY="value" ; key value pairs to add to environment
;strip_ansi=false ; strip ansi escape codes in logs; def. false
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
; 将serverurl=http://127.0.0.1:9001 开启
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket
serverurl=http://127.0.0.1:9001 ; use an http:// url to specify an inet socket
;username=chris ; should be same as in [*_http_server] if set
;password=123 ; should be same as in [*_http_server] if set
; 配置进程配置项的
;[program:theprogramname]
;command=/bin/cat ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
; 把其他配置文件包含进来,开启
[include]
files = *.ini
xxx.ini
xxx.ini
[program:django-lip-celery-worker]
command=python manage.py celery worker -l INFO #启动命令
directory=/root/工作目录
enviroment=PATH="xxxx/bin" #环境变量
stdout_logfile=/root/工作目录/logs/celery_worker.log
stderr_logfile=/root/工作目录/logs/celery_worker.log
autostart=true #自动启动
autorestart=true # 自动重启
startsecs=10
stopwatisecs=60
priority=998 #优先级,优先级大的先启动
使用
supervisord -c conf/supervisord.conf
supervisorctl
supervisor>
# version statis update
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Linux创建进程达到65535的方法
- SSH 上传文件及文件夹到linux服务器的方法
- apache tika检测文件是否损坏的方法
- Linux下二进制编译安装MySql centos7的教程
- Linux 6 修改ssh默认远程端口号的操作步骤
- 基于python的Linux系统指定进程性能监控思路详解
- ubuntu下的虚拟环境中安装Django的操作方法
- 详解linux下umask的使用
- Linux下设置每天自动备份数据库的方法
- Linux常用命令之chmod修改文件权限777和754
- 解决CentOS 7升级Python到3.6.6后yum出错问题总结
- Linux下如何挂载磁盘的方法示例
- centos7 PHP环境搭建 GD库 等插件安装方法
- CentOS服务器环境下MySQL主从同步配置方法
- awk命令