数据库
首页 > 数据库> > 使用django数据库执行之前,芹菜撤销任务

使用django数据库执行之前,芹菜撤销任务

作者:互联网

由于并发原因,我使用的是Django数据库而不是RabbitMQ.

但是我无法解决在执行任务之前将其撤消的问题.

我找到了有关此问题的一些答案,但它们似乎不完整,或者我无法获得足够的帮助.

> first answer
> second answer

当我不想执行任务时,如何使用模型扩展celery任务表,并添加一个布尔字段(撤消)以进行设置?

谢谢.

解决方法:

由于Celery通过ID跟踪任务,因此您真正需要的就是能够知道哪些ID已被取消.您可以创建自己的表(或memcached等)来跟踪已取消的ID,而不用修改kombu内部结构,然后检查当前可取消任务的ID是否在其中.

这是支持远程撤消命令的传输在内部执行的操作:

All worker nodes keeps a memory of revoked task ids, either in-memory
or persistent on disk (see Persistent revokes). (from Celery docs)

当您使用django传输时,您要自己负责.在这种情况下,由每个任务来检查是否已取消.

因此,任务的基本形式(添加日志代替实际操作)变为:

from celery import shared_task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from .models import task_canceled
logger = get_task_logger(__name__)

@shared_task
def my_task():
    if task_canceled(my_task.request.id):
        raise Ignore
    logger.info("Doing my stuff")

您可以扩展&以各种方式改进此操作,例如通过创建基本的CancelableTask类,就像您链接到的其他答案之一一样,但这是基本形式.您现在缺少的是模型和检查它的功能.

请注意,这种情况下的ID将是字符串ID,例如a5644f08-7d30-43ff-a61e-81c165ad9e19,而不是整数.您的模型可以像这样简单:

from django.db import models

class CanceledTask(models.Model):
    task_id = models.CharField(max_length=200)

def cancel_task(request_id):
    CanceledTask.objects.create(task_id=request_id)

def task_canceled(request_id):
    return CanceledTask.objects.filter(task_id=request_id).exists()

现在,您可以通过查看celery服务的调试日志来检查行为,例如:

my_task.delay()
models.cancel_task(my_task.delay())

标签:python,django,celery,django-celery
来源: https://codeday.me/bug/20191009/1877562.html