Schedule—简单实用的 Python 周期任务调度工具

注意:Schedule主要作用是替代linux系统自带的Crontab命令,用于周期性的执行某个命令。

科普一下任务调度系统

任务调度系统的地位

任务调度系统在数据平台中算是非常核心的组件,相当于指挥部,指挥各个平台上各个组件的运行,并监督任务的运行情况。大数据平台中通常会有很多任务运行,有一个好的任务调度系统可以极大提高效率。

最简单的任务调度系统莫过于linux系统自带的Crontab工具。随着调度任务的增多,相互之间又有着依赖,crontab就远远满足不了开发的需求了。

调度系统架构种类

  • 定时分片类作业调度系统:数据分片,将大任务拆分成小任务,分布式运行。定时准时的触发。例如:Elastic-Job、Easy Scheduler

  • DAG工作流类调度系统:主要解决了任务之间的依赖关系,要有丰富灵活的任务触发机制,例如时间、外部任务完成度等, 注意工作流调度需要注意失败是否会丢失整个工作流。例如:Oozie、Azkaban、Apache Airflow

Schedule与Crontab对比

两者都属于定时周期任务调度, 没有分片功能, 也没有工作流功能

Crontab 具有以下缺点:

  • 不方便执行秒级任务
  • 任务几百上千个时,管理不方便

Schedule 具有以下优点:

  • 解决crontab的缺点,且容纳了crontab的所有基本功能
  • 轻量级、不依赖外部库

基本使用教程

# 基本使用教程
import schedule
import time
# 任务内容
def job1():
    print("I'm working...")
def job2(name):
    print("I'm working..." + name)
# 每十分钟执行任务
schedule.every(10).minutes.do(job1)
# 每个小时执行任务,并在2030-01-01 18:33后停止执行作业
schedule.every().hour.until("2030-01-01 18:33").do(job1)
# 每天的10:30执行任务, 给任务传递参数
schedule.every().day.at("10:30").do(job2, name="longfei") 
# 每个月执行任务
schedule.every().monday.do(job1)
# 每个星期三的13:15分执行任务
schedule.every().wednesday.at("13:15").do(job1)
# 每分钟的第17秒执行任务
schedule.every().minute.at(":17").do(job1)
# 死循环调用
while True:
    schedule.run_pending()
    time.sleep(1)
# 如果你想只运行一次任务的话,可以这么配
import schedule
import time
def job_that_executes_once():
    # 此处编写的任务只会执行一次...
    return schedule.CancelJob # 关键代码
schedule.every().day.at('22:30').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)
# 标签功能
import schedule
def greet(name):
    print('Hello {}'.format(name))
# .tag 打标签
schedule.every().day.do(greet, 'Andrea').tag('任务标签1', '标签1')
schedule.every().hour.do(greet, 'John').tag('任务标签2', '标签1')
schedule.every().hour.do(greet, 'Monica').tag('任务标签1', '标签2')
schedule.every().day.do(greet, 'Derek').tag('任务标签3', '标签3')
# get_jobs(标签):可以获取所有该标签的任务
friends = schedule.get_jobs('标签1')
# 取消所有 daily-tasks 标签的任务
schedule.clear('任务标签1')
# 多线程并行执行
import threading
import time
import schedule
# 任务列表
def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())
# 创建线程
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
# 定时任务
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
# 循环
while True:
    schedule.run_pending()
    time.sleep(1)

开源地址

此处评论已关闭