Python 官方文档:入门教程 => 点击学习
目录一、基本概念1.1、 触发器:triggers1.2、作业存储器:job stores1.3、执行器 executors1.4、调度器 schedulers二、调度器详解2.1、
APScheduler
全称Advanced python Scheduler
作用为在指定的时间规则执行指定的作业。
用于设定触发任务的条件:
触发器包含调度逻辑。每个任务都有自己的触发器,用于确定何时应该运行作业。除了初始配置之外,触发器完全是无状态的
用于存放任务,把任务存放在内存或数据库中
用于执行任务,可以设定执行模式为单线程或线程池:
任务会被执行器放入线程池或进程池去执行,执行完毕后,执行器会通知调度器。
把上方三个组件作为参数,通过创建调度器实例来运行:
一个调度器由上方三个组件构成,一般来说,一个程序只要有一个调度器就可以了。开发者也不必直接操作任务储存器、执行器以及触发器,因为调度器提供了统一的接口,通过调度器就可以操作组件,比如任务的增删改查。
BlockingScheduler
: 阻塞式调度器:适用于只跑调度器的程序。BackgroundScheduler
: 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行AsynciOScheduler
: Asyncio调度器,适用于应用使用AsnycIO的情况。GeventScheduler
: Gevent调度器,适用于应用通过Gevent的情况。TornadoScheduler
: Tornado调度器,适用于构建Tornado应用。TwistedScheduler
:Twisted调度器,适用于构建Twisted应用。QtScheduler
: Qt调度器,适用于构建Qt应用。date
:日期:触发任务运行的具体日期interval
: 间隔:触发任务运行的时间间隔cron
: 周期:触发任务运行的周期id
:启动任务的ID具有唯一性name
: 设置启动任务的名称coalesce
:当由于某种原因导致某个job积攒了好几次没有实际运行(比如说系统挂了5分钟后恢复,有一个任务是每分钟跑一次的,按道理说这5分钟内本来是“计划”运行5次的,但实际没有执行),如果coalesce为True,下次这个job被submit给executor时,只会执行1次,也就是最后这次,如果为False,那么会执行5次(不一定,因为还有其他条件,看后面misfire_grace_time的解释)max_instance
: 就是说同一个job同一时间最多有几个实例再跑,比如一个耗时10分钟的job,被指定每分钟运行1次,如果我们max_instance值为5,那么在第6~10分钟上,新的运行实例不会被执行,因为已经有5个实例在跑了misfire_grace_time
:设想和上述coalesce类似的场景,如果一个job本来14:00有一次执行,但是由于某种原因没有被调度上,现在14:01了,这个14:00的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的30秒限制,那么这个运行实例不会被执行。replace_existing
: 如果调度的job在一个持久化的存储器里,当初始化应用程序时,必须要为job定义一个显示的ID并使用replace_existing=True
, 否则每次应用程序重启时都会得到那个job的一个新副本date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:
参数 | 说明 |
---|---|
run_date (datetime 或 str) | 作业的运行日期或时间 |
timezone (datetime.tzinfo 或 str) | 指定时区 |
from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
print(text)
scheduler = BlockingScheduler()
# 在 2019-8-30 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=date(2022, 4, 9), args=['text1'], id="1", coalesce=True, max_instances=1)
# 在 2019-8-30 01:00:00 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=datetime(2022, 4, 9, 17, 40, 58), args=['text2'], id="2", coalesce=True, max_instances=1)
# 在 2019-8-30 01:00:01 运行一次 job 方法
scheduler.add_job(job, 'date', run_date='2022-4-9 17:41:00', args=['text3'], id="3", coalesce=True, max_instances=1)
scheduler.start()
参数 | 说明 |
---|---|
weeks (int) | 间隔几周 |
days (int) | 间隔几天 |
hours (int) | 间隔几小时 |
minutes (int) | 间隔几分钟 |
seconds (int) | 间隔多少秒 |
start_date (datetime 或 str) | 开始日期 |
end_date (datetime 或 str) | 结束日期 |
timezone (datetime.tzinfo 或str) | 时区 |
@sched.scheduled_job(
"interval", id=spider_job_name + "_bg_data", coalesce=True, max_instances=1, minutes=20
)
def tick_rzjg_detail_xq():
"""
快速完成
:return:
"""
each = "rzjg_bg_data"
cmd_str = f"cd {ROOT} && bash run_spider.sh {each} --loglevel=INFO"
print(cmd_str)
os.system(cmd_str)
def func():
print("Press Ctrl+C to exit")
# 直接触发一次
tick_rzjg_detail_xq()
try:
sched.start()
except (KeyboardInterrupt, SystemExit):
pass
if __name__ == "__main__":
func()
它是功能最强大的触发器
参数 | 说明 |
---|---|
year (int 或 str) | 年,4位数字 |
month (int 或 str) | 月 (范围1-12) |
day (int 或 str) | 日 (范围1-31) |
week (int 或 str) | 周 (范围1-53) |
day_of_week (int 或 str) | 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) |
hour (int 或 str) | 时 (范围0-23) |
minute (int 或 str) | 分 (范围0-59) |
second (int 或 str) | 秒 (范围0-59) |
start_date (datetime 或 str) | 最早开始日期(包含) |
end_date (datetime 或 str) | 最晚结束时间(包含) |
timezone (datetime.tzinfo 或str) | 指定时区 |
表达式 | 参数类型 | 描述 |
---|---|---|
* | 所有 | 通配符。例:minutes=*即每分钟触发 |
*/a | 所有 | 可被a整除的通配符 |
a-b | 所有 | 范围a-b触发 |
a-b/c | 所有 | 范围a-b,且可被c整除时触发 |
xth y | 日 | 第几个星期几触发。x为第几个,y为星期几 |
last x | 日 | 一个月中,最后个星期几触发 |
last | 日 | 一个月最后一天触发 |
x,y,z | 所有 | 组合表达式,可以组合确定值或上方的表达式 |
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('{} --- {}'.fORMat(text, t))
scheduler = BlockingScheduler()
# 在每天22点,每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1'])
# 在每天22和23点的25分,运行一次 job 方法
scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])
# 在每天 8 点,运行一次 job 方法
scheduler.add_job(job, 'cron', hour='8', args=['job2'])
# 在每天 8 点 20点,各运行一次 job 方法 设置最大运行实例数
scheduler.add_job(job, 'cron', hour='8, 20', minute=30, max_instances=4)
scheduler.start()
到此这篇关于Python APScheduler 定时任务详解的文章就介绍到这了,更多相关Python APScheduler 定时任务内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
--结束END--
本文标题: 最新Python APScheduler 定时任务详解
本文链接: https://lsjlt.com/news/118325.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-03-01
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0