Celery-分布式任务队列学习笔记

时间:2022-07-25
本文章向大家介绍Celery-分布式任务队列学习笔记,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。 它是一个专注于实时处理的任务队列,同时也支持任务调度。 以上是celery自己官网的介绍

celery的应用场景很广泛

  • 处理异步任务
  • 任务调度
  • 处理定时任务
  • 分布式调度

好处也很多,尤其在使用python构建的应用系统中,无缝衔接,使用相当方便。

Celery

安装

安装Celery

推荐使用pip安装,如果你使用的是虚拟环境,请在虚拟环境里安装

$ pip install celery

安装消息中间件

Celery 支持 RabbitMQ、Redis 甚至其他数据库系统作为其消息代理中间件

你希望用什么中间件和后端就请自行安装,一般都使用redis或者RabbitMQ

安装Redis

在Ubuntu系统下使用apt-get命令就可以

$ sudo apt-get install redis-server

如果你使用redis作为中间件,还需要安装redis支持包,同样使用pip安装即可

$ pip install redis

能出现以下结果即为成功

redis 127.0.0.1:6379>

其他的redis知识这里不左介绍,如果有兴趣,可以自行了解

如果你使用RabbitMQ,也请安装RabbitMQ

安装RabbitMQ

$ sudo apt-get install rabbitmq-server

使用Celery

简单直接使用

可以在需要的地方直接引入Celery,直接使用即可。最简单的方式只需要配置一个任务和中间人即可

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/3')

@app.task
def add(x, y):
    return x + y

我这里使用了redis作为中间件,这是可以按自己的习惯替换的

由于默认的配置不是最切合我们的项目实际需要,一般来说我们都需要按我们自己的要求配置一些, 但是由于需要将项目解耦,也好维护,我们最好使用单独的一个文件编写配置。

单独配置配置文件

比上面的稍微复杂一点,我们需要创建两个文件,一个为config.py的celery配置文件,在其中填写适合我们项目的配置,在创建一个tasks.py文件来编写我们的任务。文件的名字可以按你的喜好自己命名。

config.py内容为:

# coding=utf-8
# 配置文件同一配置celery
BROKER_URL = 'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# 把“脏活”路由到专用的队列:
CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}

# 限制任务的速率,这样每分钟只允许处理 10 个该类型的任务:
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

配置好以后可以用以下命令检查配置文件是否正确(config为配置文件名)

$ python -m config

tasks.py内容为:

# coding=utf-8
from celery import Celery

app = Celery()
# 参数为配置文件的文件名
app.config_from_object('config')

@app.task
def add(x, y):
    return x + y

还有一种同一设置配置的方式,不是很推荐

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

在app使用前先需要用以上方法批量更新配置文件。

在应用上使用

工程目录结构为

proj/
    __init__.py
    # 存放配置和启动celery代码
    celery.py
    # 存放任务
    tasks.py

celery.py为:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/3',
             backend='redis://localhost:6379/4',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

tasks.py为:

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

启动celery只需要在proj同级目录下:

$ celery -A proj worker -l info

在django中使用celery

我们的django的项目的目录结构一般如下

proj/
    manage.py
    myapp/
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

想要在django项目中使用celery,我们首先需要在django中配置celery

我们需要在与工程名同名的子文件夹中添加celery.py文件 在本例中也就是proj/proj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
# 第二个参数为工程名.settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

# 括号里的参数为工程名
app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
# 配置文件需要写在setting.py中,并且配置项需要使用`CELERY_`作为前缀
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 能够自动加载所有在django中注册的app,也就是setting.py中的INSTALLED_APPS
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

然后我们需要在同级目录下的init.py文件中配置如下内容 proj/proj/init.py

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

然后我们就可以把需要的任务放到需要的app下的tasks.py中,现在项目目录结构如下

proj/
    manage.py
    myapp1/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    myapp2/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

可能的一个tasks.py文件内容如下: myapp1/tasks.py为:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time


@shared_task
def add(x, y):
    # 为了测试是否是异步,特意休眠5s,观察是否会卡主主进程
    time.sleep(5)
    print(x+y)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

@shared_task修饰器可以让你创建task不需要app实体

在需要的地方调用相关任务即可,例如在myapp1/views.py中调用

from django.shortcuts import render
from .tasks import add


def index(request):
    # 测试celery任务
    add.delay(4,5)
    return render(request,'index.html')

然后就可以启动项目,celery需要单独启动,所以需要开两个终端,分别

启动web应用服务器

$ python manage.py runserver

启动celery

$ celery -A proj worker -l info

然后访问浏览器就可以在启动celery的终端中看到输出

扩展

  • 如果你的项目需要在admin中管理调度,请使用django-celery-beat
  1. 使用pip安装django-celery-beat
$ pip install django-celery-beat 

不要在使用django-celery,这个项目已经停止更新好好多年。。。。

  1. 在settings.py中添加这个app
INSTALLED_APPS = (     ...,     'django_celery_beat', ) 
  1. 同步一下数据库
$ python manage.py migrate 
  1. 设置celery beat服务使用django_celery_beat.schedulers:DatabaseScheduler scheduler
$ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler 

然后在就可以admin界面看到了。

  • 如果你想使用Django-ORM或者Django Cache作为后端,需要安装django-celery-results扩展(笔者不建议)
  1. 使用pip安装django-celery-results
$ pip install django-celery-results 

不要在使用django-celery,这个项目已经停止更新好好多年。。。。

  1. 在settings.py中添加这个app
INSTALLED_APPS = (     ...,     'django_celery_results', ) 
  1. 同步一下数据库
$ python manage.py migrate django_celery_results 
  1. 配置后端,在settings.py中配置
# 使用数据库作为结果后端 CELERY_RESULT_BACKEND = 'django-db'  # 使用缓存作为结果后端 CELERY_RESULT_BACKEND = 'django-cache' 

基本使用大概就是上述这些,其他具体配置和使用还需自己研读官方文档

注:

  • 上述环境在ubuntu16.04 lts django1.9中搭建测试成功
  • 上述文字皆为个人看法,如有错误或建议请及时联系我