django集成celery之callback方式link_error和on_failure

时间:2022-05-02
本文章向大家介绍django集成celery之callback方式link_error和on_failure,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

 在使用django集成celery进行了异步调度任务之后,如果想对失败的任务进行跟踪或者告警,怎么做?

这里提供一个亲测的方法。

1、任务callback

假如你想在任务执行失败的时候,打印错误信息并且发出报警,该怎么搞。有两个方法:

(1)link_error

(2)on_failure/on_success

link_error的方法比较爽,但是我没有亲测过,on_failure的方式,是当任务抛出异常的时候,会触发一些事件,提供给大家代码:

定义一个新类重写Task里的on_success和on_failure方法:

from celery.app.task import Task

class CallbackTask(Task):

    def __init__(self):
        super(CallbackTask, self).__init__()

def on_success(self, retval, task_id, args, kwargs):
        try:
            item_param= json.loads(args[0])
            logger.info('[task_id] %s, [task_type] %s, finished successfully.' % (task_id, item_param.get('task_type')))
        except Exception, ex:
            logger.error(traceback.format_exc())

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        try:
            item_param = json.loads(args[0])
            logger.error(('Task {0} raised exception: {1!r}n{2!r}'.format(
                    task_id, exc, einfo.traceback)))
        except Exception, ex:
            logger.error(traceback.format_exc())

装饰器使用新类作为baseClass

from celery import task
from common.callback import CallbackTask

logger = logging.getLogger(__name__)

@task(base=CallbackTask)
def quota_check(item_param):
    logger.info('start')
    return