其他分享
首页 > 其他分享> > Celery周期性定时任务-periodic_task

Celery周期性定时任务-periodic_task

作者:互联网

目录

周期性定时任务

说明:在Django中使用celery, 并结合django-celery模块(省略安装)。需要在配置文件中注册:

import djcelery
INSTALLED_APPS += ('djcelery',)
CELERY_ENABLE_UTC = False
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"  # 基于数据库模型调度

基本使用

crontab表达式

例子 意义
crontab() 每分钟执行一次。
crontab(minute=0, hour=0) 每天午夜执行。
crontab(minute=0, hour='*/3') 每三个小时执行一次:午夜、凌晨 3 点、早上 6 点、早上 9 点、中午、下午 3 点、下午 6 点、晚上 9 点。
crontab(minute=0,``hour='0,3,6,9,12,15,18,21') 和以前一样。
crontab(minute='*/15') 每 15 分钟执行一次。
crontab(day_of_week='sunday') 在星期日每分钟 (!) 执行一次。
crontab(minute='*',``hour='*', day_of_week='sun') 和以前一样。
crontab(minute='*/10',``hour='3,17,22', day_of_week='thu,fri') 每十分钟执行一次,但仅限于周四或周五的凌晨 3-4 点、下午 5-6 点和晚上 10-11 点。
crontab(minute=0, hour='*/2,*/3') 每隔偶数小时执行一次,并且每小时可被 3 整除。这意味着:除了:1am、5am、7am、11am、1pm、5pm、7pm、11pm之外的每个小时
crontab(minute=0, hour='*/5') 执行可被 5 整除的小时。这意味着它在下午 3 点而非下午 5 点触发(因为下午 3 点等于 24 小时时钟值“15”,可被 5 整除)。
crontab(minute=0, hour='*/3,8-17') 每小时执行一次,可被 3 整除,并在办公时间(上午 8 点至下午 5 点)每小时执行一次。
crontab(0, 0, day_of_month='2') 在每个月的第二天执行。
crontab(0, 0,``day_of_month='2-30/2') 在每个偶数日执行。
crontab(0, 0,``day_of_month='1-7,15-21') 在每月的第一周和第三周执行。
crontab(0, 0, day_of_month='11',``month_of_year='5') 每年5月11日执行。
crontab(0, 0,``month_of_year='*/3') 在每个季度的第一个月每天执行。
                       |

自定义crontab

celery_tasks.py文件:定义任务加task()

from celery.task import task
@task()
def send_msg(oa_list, content, id, username, msg):
    # 逻辑
    pass

views.py写自定义方法:

import datetime
import json
from djcelery.models import PeriodicTask, CrontabSchedule

def create_task(request):
    task_name = "test"	# 自定义,不能重复,唯一
	task = 'app01.tasks.send_msg'
	task_args = [oa_list, content]
	task_kwargs = {'id': id, 'username': username, 'msg': msg}
	# 定时规则
	crontab_time = {
    	'minute': '*/1',
    	'hour': '*',
        'day_of_week': '*'
    	'day_of_month': '*',
    	'month_of_year': '*'
	}
    
	schedule = CrontabSchedule.objects.create(**crontab_time)

	task, created = PeriodicTask.objects.get_or_create(
    	name=task_name,		# 名称保持唯一
    	task=task,	
    	crontab=schedule,
    	enabled=True,	# 是否开启
    	args=json.dumps(task_args)
    	kwargs=json.dumps(task_kwargs),
    	expires=datetime.datetime.now()+datetime.timedelta(days=1)	# 过期时间
	)
   	if created:
        return HttpResponse("创建成功")
    else:
        return HttpResponse("创建失败")

使用程序停止和删除周期定时任务

周期定时任务停止,直接通过task_name删除

from djcelery.schedulers import ModelEntry, DatabaseScheduler

def delete_celery_task(task_name):
	DatabaseScheduler.delete_task(task_name)

停止运行中的周期定时任务

from djcelery.schedulers import DatabaseScheduler
from djcelery.models import PeriodicTask, CrontabSchedule

def celery_enabled(task_name):
    """
    停止周期任务
    :param task_name: 周期任务名
    """
    
    PeriodicTask.objects.filter(name=task_name).update(enabled=False)	# False停止, True开启
	DatabaseScheduler.create_or_update_task(task_name)		# 刷新计划,防止数据库更改,而任务不发生变化

标签:task,name,hour,crontab,Celery,periodic,import,minute
来源: https://www.cnblogs.com/yzm1017/p/15357165.html