其他分享
首页 > 其他分享> > apscheduler定时任务

apscheduler定时任务

作者:互联网

apscheduler是一款使用python 语言开发的定时任务工具,提供了非常丰富而且简单易用的定时任务接口
#安装
pip3 install apscheduler
  apscheduler的四大组件   BlockingScheduler  BlockingScheduler,它是阻塞式的,执行之后,由于阻塞当前线程,后续代码不会执行,直到当前任务执行结束
import time
from apscheduler.schedulers.background import BlockingScheduler  # 导入阻塞调度器
from apscheduler.triggers.interval import IntervalTrigger  # 导入间隔触发器


def my_job():
    print(f'my_job{time.ctime()}')


if __name__ == '__main__':
    # 初始化调度器
    schedulers = BlockingScheduler()

    # 触发器间隔时间为1秒,可以使用minutes、hours、days、weeks等
    intervalTrigger = IntervalTrigger(seconds=1)

    # # 将任务添加到调度器当中,给当前任务自定义一个ID,方便后续管理(非必选)
    schedulers.add_job(func=my_job, trigger=intervalTrigger, id='my_job_id')
    # 开始运行
    schedulers.start()
    
      backgroundScheduler backgroundScheduler可以在后台运行,不会阻塞主线程的执行
import time
from apscheduler.schedulers.background import BackgroundScheduler  # 导入后台运行调度器
from apscheduler.triggers.interval import IntervalTrigger  # 导入间隔触发器


def my_job():
    print(f'my_job{time.ctime()}')


if __name__ == '__main__':
    schedulers = BackgroundScheduler()
    intervalTrigger = IntervalTrigger(seconds=1)
    schedulers.add_job(func=my_job, trigger=intervalTrigger)
    schedulers.start()

    while True:
        time.sleep(1)
  指定日期执行
import time
from apscheduler.schedulers.background import BlockingScheduler  # 导入调度器
from apscheduler.triggers.date import DateTrigger # 导入日期触发器


def my_job():
    print(f'my_job{time.ctime()}')


if __name__ == '__main__':
    schedulers = BlockingScheduler()
    # 设置触发日期
    intervalTrigger = DateTrigger(run_date='2022-03-07 14:19:30')
    schedulers.add_job(func=my_job, trigger=intervalTrigger)
    schedulers.start()
    CronTrigger指定周期执行
import time
from apscheduler.schedulers.background import BlockingScheduler  # 导入调度器
from apscheduler.triggers.cron import CronTrigger  # 导入CronTrigger


def my_job():
    print(f'my_job{time.ctime()}')


if __name__ == '__main__':
    schedulers = BlockingScheduler()
    # 每天的19:30:01 执行
    # intervalTrigger = CronTrigger(hour=19, minute=30, second=1)
    # 每年的10月1日 12点执行
    # intervalTrigger = CronTrigger(month=10, day=1, hour=12)
    # 第一秒执行
    intervalTrigger = CronTrigger(second=1)
    schedulers.add_job(func=my_job, trigger=intervalTrigger)
    schedulers.start()
  executors执行器 默认情况下,apscheduler也是使用了ThreadPoolExecutor,线程池的大小是10 关于ThreadPoolExecutor和ProcessPoolExecutor的选择问题,这里有一个原则,如果是cpu密集型的作业,使用ProcessPoolExecutor,其它的使用ThreadPoolExecutor,当然ThreadPoolExecutor和ProcessPoolExecutor也是可以混用的
import time
from apscheduler.schedulers.background import BlockingScheduler  # 导入调度器
from apscheduler.triggers.interval import IntervalTrigger  # 导入CronTrigger
from apscheduler.executors.pool import ThreadPoolExecutor


def my_job():
    print(f'my_job{time.ctime()}')


if __name__ == '__main__':
    # 将线程池修改为20
    executors = {
        'default': ThreadPoolExecutor(20)
    }
    # 将执行器传递给调度器,启动线程池为20
    schedulers = BlockingScheduler(executors=executors)

    intervalTrigger = IntervalTrigger(seconds=1)
    schedulers.add_job(func=my_job, trigger=intervalTrigger)
    schedulers.start()
    将数据存储到数据库
import time
from apscheduler.schedulers.background import BlockingScheduler  # 导入调度器
from apscheduler.triggers.interval import IntervalTrigger  # 导入CronTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

def my_job():
    print(f'my_job{time.ctime()}')


jobstores = {
    'default':SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

if __name__ == '__main__':
    executors = {
        'default': ThreadPoolExecutor(20)
    }
    schedulers = BlockingScheduler(executors=executors,jobstores=jobstores)

    intervalTrigger = IntervalTrigger(seconds=1)
    schedulers.add_job(func=my_job, trigger=intervalTrigger)
    schedulers.start()
 

标签:__,schedulers,任务,apscheduler,job,import,定时,my
来源: https://www.cnblogs.com/Mickey-7/p/16315977.html