让运维更简单的7种定时任务实现方式

时间:2022-07-22
本文章向大家介绍让运维更简单的7种定时任务实现方式,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

什么是自动化任务

自动化任务就是将一系列可以手动去实现的操作的集合,在特定的条件下自动去执行,比较常见的场景:

  • 运行监控报警,监测要素的变化,触发阈值条件时执行一系列通知邮件、短信的发送,也可以是更复杂的触发
  • 自动化田间灌溉控制,对土壤含水量、天气预报和植物生长周期分析需水量来调节灌溉水量
  • 自动驾驶,根据行驶路线、实时路况信息分析进行自动的驾驶
  • 自动化测试,对目标系统有已知的预期,按照配置的流程去验证每一个功能的结果
  • 定时爬虫,比如我们需要长期的爬取招聘信息,来进行数据分析,就需要定时的去爬取某段时间的增量信息
  • 还有些统计类问题,比如我们需要对请求的IP进行判断是否是一个坏的IP(同一个IP下有一定比例的用户从事违规操作时,判定为非法IP),我们可以在用户请求是实时去计算坏IP的列表,但当用户量非常大的时候就不是一个很好的选择了,我们可以选择定期执行这个坏IP列表的计算任务,因为对于不同用户的坏IP列表是同一个

以上这些任务都可以认为是自动化任务,我们今天主要针对其中的定时任务进行讨论。

定时任务

定时任务的实现方式又有N多种,我就了解的几种实现举例说明:

  • Windows操作系统的任务计划程序
  • Linux操作系统的 Crontab 定时任务
  • Python的 循环+sleep
  • Python的 Timer
  • Python的 schedule
  • Python的 APScheduler
  • Python的 Celery

我以两个场景来设计实例代码

  • 每30秒打印一下当前时间
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间

1. Windows任务计划程序

当前版本win10

  • 右键“计算机”->选择“管理”;
  • 然后就可以打开 “计算机管理”界面;
  • 在界面的左侧有点击“系统工具”->任务计划程序 ;
  • 可以看到右侧有“操作”“任务计划程序”; 有创建基本任务、创建任务、导入任务;

我以两个场景来设计实例代码

  • 每30秒打印一下当前时间
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间

任务1:由于任务计划程序最小执行周期是“5分钟”,是在高级设置中设置重复任务间隔,并选择持续时间“无限制” 无法实现30秒的打印 任务2:设置中选择一次,开始时间设置2020-06-21 00:00:00 (具体时刻)可以满足需求

2. Linux Crontab定时任务

Crontab定时任务,在每个任务周期中执行一次特定任务,固无法控制执行次数 时间格式如下:

*    *    *    *    *  program
-    -    -    -    -    -
|    |    |    |    |    |----- 要执行的程序
|    |    |    |    +----- 星期中星期几 (0 - 7) (星期天 为0) 默认*
|    |    |    +---------- 月份 (1 - 12) 默认*
|    |    +--------------- 一个月中的第几天 (1 - 31) 默认*
|    +-------------------- 小时 (0 - 23) 默认*
+------------------------- 分钟 (0 - 59) 默认*

我以两个场景来设计实例代码

  • 每30秒打印一下当前时间;
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间 ;

任务1:* * * * * echo $(date +%F%n%T)含义每分钟打印当前时间,最小执行周期每分钟 无法实现30秒的打印 任务2:* * 21 6 * echo $(date +%F%n%T)含义每年的6月21日打印当前时间 无法实现只执行一次的打印

3. Python 循环+sleep

以下代码使用的 python3.7

我以两个场景来设计实例代码

  • 每30秒打印一下当前时间;
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间 ;

任务1: 可以实现30秒的打印

借助循环和等待时间来实现任务执行,但是在一个线程组阻塞时不能进行其他操作,可以使用协程优化

import time
import datetime

# 每30秒执行
while 1:
    time.sleep(30)
    print(datetime.datetime.now())

任务2: 可以实现特定时间的打印

每次循环判断当前时刻和目标是否一致,需要注意执行一次就需要跳出点前时刻的判断,不然在同一时刻会重复打印很多次

import time
import datetim

# 特定时间执行
while 1:
    now = datetime.datetime.now()
    if now.year == 2020 and now.month == 6 and now.day == 21:
        print(now)
        break
    time.sleep(1)

4. 线程Timer

以下代码使用的 python3.7

看下源码的描述

class Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=None, kwargs=None)
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

在指定的秒数后调用函数 因为创建的定时是异步执行,所以不存在等待顺序执行问题。

  • 创建定时器 Timer(interval, function, args=None, kwargs=None);
  • 取消定时器cancel();
  • 使用线程方式执行start();
  • 等待线程执行结束join(self, timeout=None);
  • 定时器只能执行一次,如果需要重复执行,需要重新添加任务;

我以两个场景来设计实例代码

  • 每30秒打印一下当前时间
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间

任务1: 可以实现30秒的打印 虽然是可以实现每30秒一次的打印,但是定时器只能执行一次,如果需要重复添加任务,创建线程又比较浪费系统资源

from threading import Timer
import datetime
import time

def run_time(seconds, flag=False):
    t = Timer(seconds, run_time, [seconds, True])
    t.start()
    if flag:
        print(datetime.datetime.now())
        time.sleep(3)
    t.join()

run_time(30)

任务2: 实现特定时间的打印 需要计算当前时间到目标时间间的秒数来实现

from threading import Timer
import datetime
import time

def run_time(seconds, flag=False):
    if seconds > 0:
        t = Timer(seconds, run_time, [seconds, True])
        t.start()
        if flag:
            print(datetime.datetime.now())
            time.sleep(3)
        t.join()

def get_seconds(run_date):
    start = datetime.datetime.now()
    end = datetime.datetime.strptime(run_date, "%Y-%m-%d %H:%M:%S")
    if end > start:
        return (end-start).seconds
    else:
        return -1

seconds = get_seconds("2020-06-21 00:00:00")
run_time(seconds)

5. schedule模块

以下代码使用的 python3.7

首先看下源码的描述

Python job scheduling for humans.

github.com/dbader/schedule

An in-process scheduler for periodic jobs that uses the builder pattern
for configuration. Schedule lets you run Python functions (or any other
callable) periodically at pre-determined intervals using a simple,
human-friendly syntax.

Inspired by Addam Wiggins' article "Rethinking Cron" [1] and the
"clockwork" Ruby module [2][3].

Features:
    - A simple to use API for scheduling jobs.
    - Very lightweight and no external dependencies.
    - Excellent test coverage.
    - Tested on Python 2.7, 3.5 and 3.6

Usage:
    >>> import schedule
    >>> import time

    >>> def job(message='stuff'):
    >>>     print("I'm working on:", message)

    >>> schedule.every(10).minutes.do(job)
    >>> schedule.every(5).to(10).days.do(job)
    >>> schedule.every().hour.do(job, message='things')
    >>> schedule.every().day.at("10:30").do(job)

    >>> while True:
    >>>     schedule.run_pending()
    >>>     time.sleep(1)

任务1: 可以实现30秒的打印 可以实现每30秒一次的打印,还可以同时创建多个定时任务同时去执行,由于执行过程是顺序执行,pn休眠2秒,循环任务查询休眠1秒,会存在实际间隔时间并不是设定的30秒,当工作任务回非常耗时就会影响其他任务的触发时间

import datetime
import schedule
import time

def pn():
    print(datetime.datetime.now())
    time.sleep(2)

schedule.clear()
schedule.every(30).seconds.do(pn)

while 1:
    schedule.run_pending()
    time.sleep(1)

任务2: 实现特定时间的打印 需要计算当前时间到目标时间间的秒数来实现

import datetime
import schedule
import time

def pn():
    print(datetime.datetime.now())
    time.sleep(2)

def get_seconds(run_date):
    start = datetime.datetime.now()
    end = datetime.datetime.strptime(run_date, "%Y-%m-%d %H:%M:%S")
    if end > start:
        return (end - start).seconds
    else:
        return -1

schedule.clear()
seconds = get_seconds("2020-06-21 00:00:00")
if seconds > 0:
    schedule.every(seconds).seconds.do(pn)

while 1:
    schedule.run_pending()
    time.sleep(1)

6. APScheduler定时任务框架

以下代码使用的 python3.7

APScheduler是Python的一个定时任务框架,用于执行周期或者定时任务, 可以基于日期、时间间隔,及类似于Linux上的定时任务crontab类型的定时任务; 该该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,使用起来非常方便。

  • triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器;
  • job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,支持存储到MongoDB,Redis数据库中;
  • executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行;
  • schedulers(调度器):调度器是将其它部分联系在一起,对使用者提供接口,进行任务添加,设置,删除。 add_job 创建任务

创建任务支持三种类型触发器dateintervalcron

date触发器
  • run_date 具体的日期执行,时间参数run_date,可以是dete/time格式的字符串,当不写时分秒是 默认是 00:00:00
interval触发器
  • weeks 参数weeks 每n周后执行,int类型
  • days 参数days每n天后执行,int类型
  • hours: 参数hours每n小时后执行,int类型
  • minutes:参数minutes每n分钟后执行,int类型
  • seconds: 参数seconds每n秒后执行,int类型
  • start_date 参数start_date,可以是dete/time格式的字符串,控制执行循环的时间范围
  • end_date 参数 end_date,可以是dete/time格式的字符串,控制执行循环的时间范围
cron触发器

使用方式与 Crontab定时任务类似

  • year int类型 取值范围1970~9999, 默认*
  • month int类型 取值范围1~12, 默认1
  • day int类型 取值范围1~31, 默认1
  • week 一年中的第几周,int类型 取值范围1~53, 默认*
  • day_of_week一周中的第几天,int类型 取值范围0~6, 默认*
  • hour int类型 取值范围0~23, 默认0
  • minute int类型 取值范围0~59, 默认0
  • second int类型 取值范围0~59, 默认0

任务1: 可以实现30s的打印 使用interval类型触发器添加定时任务 可以实现每30秒一次的打印,还可以同时创建多个定时任务同时去执行,由于执行过程是顺序执行,pn休眠3秒,会存在实际间隔时间并不是设定的30秒,当工作任务回非常耗时就会影响其他任务的触发时间

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import time


def pn():
    print(datetime.datetime.now())
    time.sleep(3)

def run_time(seconds):
    scheduler = BlockingScheduler()
    scheduler.add_job(pn, 'interval', seconds=seconds, id=str(time.time()))
    scheduler.start()

run_time(30)

任务2: 可以实现特定时间的打印 使用date类型或cron类型触发器添加定时任务

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import time

def pn():
    print(datetime.datetime.now())
    time.sleep(3)

def run_time():
    scheduler = BlockingScheduler()
    scheduler.add_job(pn, 'cron', year=2020, month=6, day=21, id=str(time.time()))
    scheduler.add_job(pn, 'date', run_date="2020-06-21", id=str(time.time()))
    scheduler.start()

run_time()

7. Celery分布式任务调度的框架

Celery是实时处理的异步任务调度的框架,需要借助中间件完成调度任务,也可以很好的用来做定时任务服务。

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,由三部分组成:

  • 消息中间件(message broker):Celery本身不提供消息服务,依赖于中间件RabbitMQ, Redis等
  • 任务执行单元(worker): 是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中
  • 任务执行结果存储(task result store): 用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis等

以下代码使用版本 python2.7 Celery 3.2

目录结构

celery_app_init_.py
# -*- coding: utf-8 -*-
# 拒绝隐式引入,如果celery.py和celery模块名字一样,避免冲突,需要加上这条语句
# 该代码中,名字是不一样的,最好也要不一样
from __future__ import absolute_import
from celery import Celery

app = Celery('tasks')
app.config_from_object('celery_app.celeryconfig')
celery_appceleryconfig.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from celery.schedules import crontab
from datetime import timedelta

# 使用redis存储任务队列
BROKER_URL = 'redis://127.0.0.1:6379/1'
# 使用redis存储结果
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

# 时区设置
CELERY_TIMEZONE = 'Asia/Shanghai'

# 导入任务所在文件
CELERY_IMPORTS = [
    'celery_app.celery_task.tasks',
]

# 需要执行任务的配置
CELERYBEAT_SCHEDULE = {
    'np-seconds-10': {
        # 具体需要执行的函数
        # 该函数必须要使用@app.task装饰
        'task': 'celery_app.celery_task.tasks.pn',
        # 定时时间
        # 每30秒分钟执行一次
        'schedule': timedelta(seconds=30),
        'args': ()
    },
    'np-month-day': {
        'task': 'celery_app.celery_task.tasks.pn',
        # 每年6月21日执行一次
        'schedule': crontab(day_of_month=21, month_of_year=6),
        'args': ()
    },
}
celery_appcelery_tasktasks.py
# -*- coding: utf-8 -*-
from .. import app
import datetime
import time


@app.task
def pn():
    print(datetime.datetime.now())
    time.sleep(3)

我以两个场景来设计实例代码

  • 每30秒打印一下当前时间
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间

任务1: 可以实现30秒的打印

    'np-seconds-10': {
        # 具体需要执行的函数
        # 该函数必须要使用@app.task装饰
        'task': 'celery_app.celery_task.tasks.pn',
        # 定时时间
        # 每30秒分钟执行一次
        'schedule': timedelta(seconds=30),
        'args': ()
    },

任务2: 不可以实现指定时间的打印 一般指定时间也没有意义,每年的某个月日的任务比较多 使用方式与 Crontab定时任务类似

    'np-month-day': {
        'task': 'celery_app.celery_task.tasks.pn',
        # 每年6月21日执行一次
        'schedule': crontab(day_of_month=21, month_of_year=6),
        'args': ()
    },

启动定时任务

由于是分布式服务,我们需要启动 中间件,触发器和处理模块

启动Redis

celery_app 统计目录下执行

命令行执行命令来启动 发布任务

celery -A celery_app beat

命令行执行命令来启动 执行任务

celery worker -A celery_app -l info

总结

简单总结上面七种定时任务实现:

  • Windows任务计划程序合适重启服务,定时执行bat脚本,清理日志等操作;
  • Linux Crontab定时任务适合执行shell脚本或者一系列Linux命令,大部分任务都可以使用,但对于部分项目需要在用户界面添加任务的情况支持不友善;
  • 循环+sleep方式适合简单测试;
  • Timer可以实现定时任务,但是对定点任务来说,需要检查当前时间点;
  • schedule可以定点定时执行,但是需要在循环中检测任务,而且存在阻塞,执行时间不能预期;
  • APScheduler框架功能相对完善,可以满足大多数定时任务;
  • Celery框架分布式调度,可与Django、Flask等Web框架完美结合使用,对于大任务量的服务支持比较好。

end