celery

时间:2019-03-13
本文章向大家介绍celery,主要包括celery使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Celery介绍

"""
1.application(task producer) 生产者
2.broker(task queue) 任务队列  可以使用redis
3.celery beat (task scheduler) 可以是任务调度器
4.worker(task consumer) 也可以是普通的worker,
5.也可以是多个worker,worker可以将结果result存储在数据库当中
"""

# 使用场景
1.异步任务,耗时操作交给celery,比如发送短信/邮件,消息推送,音视频处理等等
2.定时任务,类似于crontab,比如每日数据统计

安装配置

首先
安装 redis 数据库

然后

>>> pip install celery
#  amqp-2.3.2 billiard-3.5.0.5 celery-4.2.1 kombu-4.2.2 pytz-2018.7 vine-1.1.4
>>> pip install celery[redis]
#  redis-3.0.1
#   消息中间件 RabbitMQ/Redis

------------------------------------------------


# 安装redis  访问redis.io 官方网站
"""
$ wget http://download.redis.io/releases/redis-5.0.3.tar.gz
$ tar xzf redis-5.0.3.tar.gz
$ cd redis-5.0.3
$ make
"""
# make install

# /root/redis-5.0.3/src/redis-server
# 运行redis
# src/redis-server
# 交互
# src/redis-cli

celery基本使用

普通

# app.py 
# 正常使用
import time


def add(x, y):
    """Return x + y after a 4-second artificial delay (simulates slow work)."""
    print("enter call func....")
    time.sleep(4)
    return x + y


if __name__ == "__main__":
    # Synchronous call: the caller blocks for the full 4 seconds.
    print("start task......")
    result = add(2, 8)
    print("end task......")
    print(result)

celery异步调用

# tasks.py
import time

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'   # message broker (the task queue)
backend = 'redis://127.0.0.1:6379/2'  # where task results are stored
app = Celery("my_task", broker=broker, backend=backend)


@app.task
def add(x, y):
    """Celery task: return x + y after a 4-second delay."""
    print("enter call func....")
    time.sleep(4)
    return x + y

# app.py -- producer side: only enqueues the task
from tasks import add  # NOTE: this import was missing in the original snippet

if __name__ == "__main__":
    print("start task......")
    result = add.delay(2, 8)  # async call: returns an AsyncResult immediately
    print("end task......")
    print(result)
"""
但是这个只是将任务发出去了,但是worker 并没有启动,也不会处理任务和返回结果
"""

启动worker

celery worker -A tasks -l INFO
# -A tasks 指定任务所在的应用模块
# -l INFO 指定日志级别

API

# Inspecting an async result from a shell / another process
from tasks import add
result= add.delay(2,6) # async call: returns an AsyncResult handle immediately
result.ready() # non-blocking: just checks whether the task has finished
result.get() # blocks until the task result is available, then returns it

celery项目配置


celery_app 文件夹 
.
├── __init__.py
├── celeryconfig.py
├── task1.py
└── task2.py
# __init__.py
from celery import Celery

app=Celery("demo")
app.config_from_object("celery_app.celeryconfig")  # load the config module through the Celery instance

#celeryconfig.py
BROKER_URL = 'redis://127.0.0.1:6379/1'  # message broker (task queue)
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'  # result store
CELERY_TIMEZONE="Asia/Shanghai"
# Import these modules so their @app.task functions get registered
CELERY_IMPORTS=(
	'celery_app.task1',
    'celery_app.task2',
)


# task1.py
import time

from celery_app import app


@app.task
def add(x, y):
    """Celery task: return x + y after a 2-second delay."""
    time.sleep(2)
    return x + y


# task2.py
import time

from celery_app import app


@app.task
def multiply(x, y):
    """Celery task: return the product x * y after a 3-second delay."""
    time.sleep(3)
    # BUG FIX: original read `return x*xy`, which raised NameError
    # because no variable `xy` exists.
    return x * y



# 启动
celery worker -A celery_app -l INFO

# app.py -- enqueue tasks from both task modules
from celery_app import task1
from celery_app import task2

task1.add.delay(2,4)
# task1.add.apply_async(args=(2, 4)) is the equivalent explicit form
# (note: apply_async takes args as a tuple via the `args=` keyword)
task2.multiply.delay(4,5)
# celery worker -A celery_app -l INFO
# python app.py  -> run this to see the result

定时任务

# additions to celeryconfig.py
from datetime import timedelta
from celery.schedules import crontab

CELERY_TIMEZONE="Asia/Shanghai"

# NOTE: the setting celery beat actually reads is CELERYBEAT_SCHEDULE;
# the original name CELERY_SCHEDULE is silently ignored.
CELERYBEAT_SCHEDULE = {
    'task1':{
        'task':'celery_app.task1.add',
        'schedule':timedelta(seconds=10),  # run every 10 seconds
        'args':(2,8)
    },  # <- the missing comma here made the original a SyntaxError

    'task2':{
        # multiply is defined in task2.py (original pointed at task1)
        'task':'celery_app.task2.multiply',
        'schedule':crontab(hour=19,minute=28),  # run daily at 19:28
        'args':(4,5)
    }
}

# celery beat -A celery_app -l INFO    #启动beat
# celery worker -A celery_app -l INFO  #启动worker 
# 4.1.0 有bug,回退
# celery==4.0.2
# 一条命令启动worker beat
# celery -B -A celery_app worker -l INFO

django-celery

>>> pip install django-celery
>>> python manage.py celery worker -Q queue

# 新建项目中创建一个 celeryconfig.py
# 新建app sourse,在app中创建tasks.py
import djcelery
djcelery.setup_loader()
# Separate queues so beat-scheduled tasks and normal worker tasks don't mix
CELERY_QUEUES = {
    'beat_tasks':{
        'exchange':'beat_tasks',
        'exchange_type':'direct',
        'binding_key':'beat_tasks'
    },
    'worker_tasks':{
        'exchange':'worker_tasks',
        'exchange_type':'direct',
        'binding_key':'worker_tasks'
    },
}
# Tasks that don't specify a queue go to this default queue
CELERY_DEFAULT_QUEUE = 'work_queue'

CELERY_IMPORTS=(
	'course.tasks',
)
# Can help prevent deadlocks in some situations
CELERYD_FORCE_EXECV = True

# Number of concurrent worker processes
CELERYD_CONCURRENCY = 4
# Acknowledge only after the task completes, so failed tasks can be retried
CELERY_ACKS_LATE = True
# Recycle each worker after 100 tasks, to guard against memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 100
# Hard time limit (in seconds) for a single task
CELERYD_TASK_TIME_LIMIT=12*30

-------------------------------------------------

# settings in Django's settings.py
from .celeryconfig import *
BROKER_BACKEND='redis'
BROKER_URL='redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'

# 在APPS中 注册 加入 'djcelery'
python manage.py celery worker -Q queue
# 补充 celeryconfig.py 中加入定时任务并指定队列

from datetime import timedelta

# NOTE: djcelery/beat reads CELERYBEAT_SCHEDULE; the original name
# CELERY_SCHEDULE is silently ignored.
CELERYBEAT_SCHEDULE = {
    'task1':{
        # NOTE(review): in this Django project the task path is probably
        # under 'course.tasks' -- verify against the registered task name.
        'task':'celery_app.task1.add',
        'schedule':timedelta(seconds=10),  # run every 10 seconds
        'args':(2,8),
        'options':{
            'queue':'beat_tasks',  # route scheduled runs to the beat queue
        }
    }
}
# 启动
# python manage.py celery beat -l INFO
# 手动指定队列
CourseTask.apply_async(args=('hello',),queue='work_queue')

监控工具

# 安装
>>> pip install flower
# 启动的话,
>>> celery flower --address=0.0.0.0 --port=5555 --broker=xxxx --basic_auth=sun:sun 
>>> python manage.py celery flower 
#--basic_auth=sun:sun 是认证

进程管理工具 supervisor

安装配置

"""
Install : pip install supervisor
Start : supervisord -c /etc/supervisord.conf
Tool : supervisorctl
"""

# 创建一个conf目录
mkdir conf
# 重定向
echo_supervisord_conf > conf/supervisord.conf
# /root/anaconda3/envs/supv/bin/echo_supervisord_conf > conf/supervisord.conf

supervisord.conf

supervisord.conf
[unix_http_server]
file=/tmp/supervisor.sock   ; the path to the socket file
;chmod=0700                 ; socket file mode (default 0700)
;chown=nobody:nogroup       ; socket file uid:gid owner
;username=user              ; default is no username (open server)
;password=123               ; default is no password (open server)

; Web-based management UI; enabled here (it is disabled by default --
; prefix the lines with ';' to turn it off again)
[inet_http_server]         ; inet (TCP) server disabled by default
port=127.0.0.1:9001        ; ip_address:port specifier, *:port for all iface
username=admin              ; default is no username (open server)
password=1234              ; default is no password (open server)

[supervisord]
logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB        ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10           ; # of main logfile backups; 0 means none, default 10
loglevel=info                ; log level; default info; others: debug,warn,trace
pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=false               ; start in foreground if true; default false
minfds=1024                  ; min. avail startup file descriptors; default 1024
minprocs=200                 ; min. avail process descriptors;default 200
;umask=022                   ; process file creation umask; default 022
;user=chrism                 ; default is current user, required if root
;identifier=supervisor       ; supervisord identifier, default is 'supervisor'
;directory=/tmp              ; default is not to cd during start
;nocleanup=true              ; don't clean up tempfiles at start; default false
;childlogdir=/tmp            ; 'AUTO' child log dir, default $TEMP
;environment=KEY="value"     ; key value pairs to add to environment
;strip_ansi=false            ; strip ansi escape codes in logs; def. false

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface


; Use the http serverurl (matches [inet_http_server] above).
; FIX: the unix serverurl is commented out -- a section must not define
; the same option twice, or the config fails to parse.
[supervisorctl]
;serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL  for a unix socket
serverurl=http://127.0.0.1:9001 ; use an http:// url to specify an inet socket
;username=chris              ; should be same as in [*_http_server] if set
;password=123                ; should be same as in [*_http_server] if set

; Template for per-program process sections
;[program:theprogramname]
;command=/bin/cat              ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)

; Pull in the per-program .ini files
[include]
files = *.ini

xxx.ini

xxx.ini
[program:django-lip-celery-worker]
; FIX: the inline '#' comments that followed the values in the original
; would be parsed as part of the values (e.g. passed to the command line),
; so all comments are now on their own lines.
; startup command
command=python manage.py celery worker -l INFO
; working directory (placeholder path)
directory=/root/工作目录
; FIX: was "enviroment", which supervisord rejects as an unknown option
environment=PATH="xxxx/bin"
stdout_logfile=/root/工作目录/logs/celery_worker.log
stderr_logfile=/root/工作目录/logs/celery_worker.log
; start automatically with supervisord
autostart=true
; restart automatically if the process exits
autorestart=true
startsecs=10
; FIX: was "stopwatisecs" (typo)
stopwaitsecs=60
; NOTE: per supervisord docs, LOWER priority values start first
; (the original comment claimed the opposite)
priority=998

使用

supervisord -c conf/supervisord.conf

supervisorctl
supervisor>
# 常用命令: version / status / update