apscheduler定时任务
作者:互联网
apscheduler是一款使用python 语言开发的定时任务工具,提供了非常丰富而且简单易用的定时任务接口
#安装
pip3 install apscheduler
apscheduler的四大组件
- triggers 触发器 可以按照日期、时间间隔或者 contab 表达式三种方式触发
- job stores 作业存储器 指定作业存放的位置,默认保存在内存,也可以保存在各种数据库中
- executors 执行器 将指定的作业提交到线程池或者进程池中运行
- schedulers 作业调度器 常用的有 BackgroundScheduler(后台运行)和 BlockingScheduler (阻塞式)
import time
from apscheduler.schedulers.background import BlockingScheduler # 导入阻塞调度器
from apscheduler.triggers.interval import IntervalTrigger # 导入间隔触发器
def my_job():
print(f'my_job{time.ctime()}')
if __name__ == '__main__':
# 初始化调度器
schedulers = BlockingScheduler()
# 触发器间隔时间为1秒,可以使用minutes、hours、days、weeks等
intervalTrigger = IntervalTrigger(seconds=1)
# # 将任务添加到调度器当中,给当前任务自定义一个ID,方便后续管理(非必选)
schedulers.add_job(func=my_job, trigger=intervalTrigger, id='my_job_id')
# 开始运行
schedulers.start()
backgroundScheduler
backgroundScheduler可以在后台运行,不会阻塞主线程的执行
import time
from apscheduler.schedulers.background import BackgroundScheduler # 导入后台运行调度器
from apscheduler.triggers.interval import IntervalTrigger # 导入间隔触发器
def my_job():
print(f'my_job{time.ctime()}')
if __name__ == '__main__':
schedulers = BackgroundScheduler()
intervalTrigger = IntervalTrigger(seconds=1)
schedulers.add_job(func=my_job, trigger=intervalTrigger)
schedulers.start()
while True:
time.sleep(1)
指定日期执行
import time
from apscheduler.schedulers.background import BlockingScheduler # 导入调度器
from apscheduler.triggers.date import DateTrigger # 导入日期触发器
def my_job():
print(f'my_job{time.ctime()}')
if __name__ == '__main__':
schedulers = BlockingScheduler()
# 设置触发日期
intervalTrigger = DateTrigger(run_date='2022-03-07 14:19:30')
schedulers.add_job(func=my_job, trigger=intervalTrigger)
schedulers.start()
CronTrigger指定周期执行
import time
from apscheduler.schedulers.background import BlockingScheduler # 导入调度器
from apscheduler.triggers.cron import CronTrigger # 导入CronTrigger
def my_job():
print(f'my_job{time.ctime()}')
if __name__ == '__main__':
schedulers = BlockingScheduler()
# 每天的19:30:01 执行
# intervalTrigger = CronTrigger(hour=19, minute=30, second=1)
# 每年的10月1日 12点执行
# intervalTrigger = CronTrigger(month=10, day=1, hour=12)
# 第一秒执行
intervalTrigger = CronTrigger(second=1)
schedulers.add_job(func=my_job, trigger=intervalTrigger)
schedulers.start()
executors执行器
默认情况下,apscheduler也是使用了ThreadPoolExecutor,线程池的大小是10
关于ThreadPoolExecutor和ProcessPoolExecutor的选择问题,这里有一个原则,如果是cpu密集型的作业,使用ProcessPoolExecutor,其它的使用ThreadPoolExecutor,当然ThreadPoolExecutor和ProcessPoolExecutor也是可以混用的
import time
from apscheduler.schedulers.background import BlockingScheduler # 导入调度器
from apscheduler.triggers.interval import IntervalTrigger # 导入CronTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
def my_job():
print(f'my_job{time.ctime()}')
if __name__ == '__main__':
# 将线程池修改为20
executors = {
'default': ThreadPoolExecutor(20)
}
# 将执行器传递给调度器,启动线程池为20
schedulers = BlockingScheduler(executors=executors)
intervalTrigger = IntervalTrigger(seconds=1)
schedulers.add_job(func=my_job, trigger=intervalTrigger)
schedulers.start()
将数据存储到数据库
import time
from apscheduler.schedulers.background import BlockingScheduler # 导入调度器
from apscheduler.triggers.interval import IntervalTrigger # 导入CronTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
def my_job():
print(f'my_job{time.ctime()}')
jobstores = {
'default':SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
if __name__ == '__main__':
executors = {
'default': ThreadPoolExecutor(20)
}
schedulers = BlockingScheduler(executors=executors,jobstores=jobstores)
intervalTrigger = IntervalTrigger(seconds=1)
schedulers.add_job(func=my_job, trigger=intervalTrigger)
schedulers.start()
标签:__,schedulers,任务,apscheduler,job,import,定时,my 来源: https://www.cnblogs.com/Mickey-7/p/16315977.html