其他分享
首页 > 其他分享> > 轻量级消息队列 Django-Q 轻度体验

轻量级消息队列 Django-Q 轻度体验

作者:互联网

前言

最近做的这个项目(基于Django),需要做个功能,实现定时采集车辆定位。

这让我想起来几年前那个OneCat项目,当时我用的是Celery这个很重的组件

Celery实在是太重了,后来我做公众号采集平台的时候,又接触了Django-RQ和Django-Q这俩,前者是对RQ的封装,让RQ和Django更好的结合在一起;后者是一个全新的「多进程任务队列」组件,相比起celery很轻量,当时使用的时候就给我留下不错的印象。

于是这个项目我决定继续使用Django-Q来实现一些异步操作和定时任务。

关于Django-Q

官方介绍:

A multiprocessing task queue for Django

快速开始

安装

pip install django-q

添加到 INSTALLED_APPS

INSTALLED_APPS = (
    # other apps
    'django_q',
)

数据库迁移

由于Django-Q会把执行结果放到数据库里,所以要执行一下数据库迁移的操作

python manage.py migrate

这个操作会生成 django_q_ormqdjango_q_scheduledjango_q_task 三个表

配置

因为本身项目用的缓存就是Redis,所以我直接用Redis作为消息队列的后端(broker)

Django-Q支持很多种后端,除了Redis还有Disque、IronMQ、Amazon SQS、MongoDB或者是Django的ORM~

settings.py 中添加以下配置:

Q_CLUSTER = {
    'name': 'project_name',
    'workers': 4,
    'recycle': 500,
    'timeout': 60,
    'compress': True,
    'cpu_affinity': 1,
    'save_limit': 250,
    'queue_limit': 500,
    'label': 'Django Q',
    'redis': {
        'host': 127.0.0.1',
        'port': 6379,
        'db': 0,
    }
}

启动服务

python manage.py qcluster

搞定,现在消息队列服务已经跑起来了

我们可以添加异步任务或者定时任务

异步任务

最简单的方式是使用它提供的 async_task 方法,添加一个新的异步任务到队列中

来写个例子,输入一个数,求阶乘之后开平方

import math

def demo_task(number: int):
    return math.sqrt(math.factorial(number))

启动任务

然后来添加一个异步任务

from django_q.tasks import async_task, Task

def task_finish(task: Task):
    print(f'任务 {task.name}(ID:{task.id})完成!')

task_id = async_task(
    demo_task, 10,
    task_name='任务名称',
    hook=task_finish,
)

可以看到,直接调用 async_task 方法就行

这个方法的定义是

async_task(func: Any, *args: Any, **kwargs: Any)

传入要异步执行的方法之后,可以把该方法的参数跟在后面传进去,也可以用 kwargs 的方式传入

这两种方式都可以的:

我个人比较喜欢第一种,因为Django-Q本身有几个命名参数,比如 task_namehooktimeout之类的,用第一种方式传参不容易和Django-Q默认的命名参数冲突。

获取执行结果

有两种方式获取任务的执行结果:

第一种方式无需赘述,在安装Django-Q组件后执行了数据库迁移,就会生成 Failed tasksScheduled tasksSuccessful tasks 三个admin模块,顾名思义,在 Failed tasksSuccessful tasks 中可以看到任务的执行结果,也就是我们写在 demo_task 里的返回值。

第二种方式,代码如下:

from django_q.tasks import result

task_result = result(task_id)

task_id 传入就可以查询任务执行的结果,如果任务还没执行完,那结果就是 None

这个 result 方法还有个 wait 参数,可以设置等待时间,单位是毫秒

执行完成回调

上面代码中,我们还设置了 hook 参数

作用就是任务执行完成之后,执行 task_finish 这个函数

task_finish 里可以通过 task 参数获取任务信息

就是这样~

async_task 的其他参数

创建异步任务的这个方法还有很多参数,官网文档写得还算可以,很多参数都是 Q_CLUSTER 配置里面有的,在 async_task 里设置这些参数就会覆盖默认的配置。

我直接搬运一波,权当翻译文档了~

除了上面介绍到的 task_namehook 还有这些参数:

q_options 参数

根据前面启动任务的部分,我们启动异步任务的时候,可以通过命名参数向任务方法传递参数,比如:

async_task(demo_task, number=10)

async_task 这个方法本身又有很多参数,如果这个参数名称和我们要执行的任务 demo_task 参数重名的话,这些参数就被 async_task 拿走了,我们的任务 demo_task 就拿不到这些参数了。

怎么办?

q_options 参数就是为了解决这个问题

可以把要传给 async_task 的参数都包装在一个 dict 里面,然后通过 q_options 参数传入

假如我们的 demo_task 是这样的:

def demo_task(number: int, timeout: int):
  ...

除了 number 这个参数,还要接收一个跟 async_task 自有参数重名的 timeout 参数,使用 q_options 的解决方案如下

opts = {
    'hook': 'hooks.print_result',
    'group': 'math',
    'timeout': 30
}

async_task(demo_task, number=10, timeout=100, q_options=opts)

这样既能……又能……,完美啊~

当然我还是建议用 *args 的方式传参,这样就没有参数重名的问题了。

定时任务

有两种方式添加定时任务

在代码中添加

比较简单,直接上代码

from django_q.tasks import schedule

schedule(
  'demo_task',
  schedule_type=Schedule.MINUTES,
  minutes=1,
  task_name='任务名称',
)

有一点注意的是,因为添加后的定时任务是要保存在数据库中的

所以需要把要执行的方法(包含完整包名),以字符串的形式传入

假如在我们的Django项目中,要执行的是在 apps/test/tasks.py 文件中的 demo_task 方法

那么需要把 apps.test.tasks.demo_task 这个完整的名称传入

在admin中添加也是一样

时间间隔设置

Django-Q的定时任务有很多类型:

注意,即使是Cron表达式,定时任务执行的最短间隔也是1分钟

这点我一开始不知道,用Cron表达式写了个15秒的任务,但执行时间根本不对,然后我翻了一下github上的issues,看到作者的解答才知道~

那个Issues的地址:https://github.com/Koed00/django-q/issues/179

作者的回复:

The current design has a heartbeat of 30 seconds, which means the schedule table can't have schedules below that. Most of this is explained in the architecture docs. Because of the way the internal loop is set up, a resolution under a dozen seconds or so, quickly becomes unreliable.

I always imagined tasks that need accuracy measured in seconds, would use a delayed tasks strategy where a few seconds delay is either added through the broker or inside the task itself.

The problem with all this, is that a task is currently always added to the back of the queue.
So even with a 1 second resolution on the schedule, the task still has to wait it's execution time. Which can of course vary wildly depending on the broker type, worker capacity and current workload.

这点感觉有些鸡肋,如果要高频执行的任务,那只能选择Celery了

在admin后台添加

这个更简单,傻瓜式操作

所以这部分略过了~

docker部署

现在后端服务基本是用docker部署的

为了能在docker中使用Django-Q

我们需要在原有Django容器的基础上,再起一个同样的容器,然后入口改成qcluster的启动命令

这里有个issues也有讨论这个问题:https://github.com/Koed00/django-q/issues/513

来个 docker-compose.yml 的例子

version: "3.9"
services:  
  redis:
    image: redis:alpine
    ports:
      - 6379:6379
  web:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - django_q
  django_q:
    build: .
    command: python manage.py qcluster
    volumes:
      - .:/code
    depends_on:
      - redis

一个简单的例子

其他的类似环境变量这些,根据实际情况来

注意:

其他

命令行工具

Django-Q还提供了一些命令行工具

除了使用命令监控,还可以在代码里做监控,不过我暂时没用到,所以还没研究,有需要的同学可以直接看文档

admin自定义

安装完Django-Q后,会在admin出现三个菜单,跟普通的Django app一样,这些也是通过 admin 注册进去的,因此我们可以重新注册这些 ModelAdmin 来自定义admin上的操作界面

来一段官方关于失败任务界面的代码:

from django_q import models as q_models
from django_q import admin as q_admin

admin.site.unregister([q_models.Failure])
@admin.register(q_models.Failure)
class ChildClassAdmin(q_admin.FailAdmin):
    list_display = (
        'name',
        'func',
        'result',
        'started',
        # add attempt_count to list_display
        'attempt_count'
    )

跟普通的 ModelAdmin 是一样的

我们可以自行添加搜索框、过滤字段之类的。记得要先执行 admin.site.unregister([q_models.Failure]) 取消之前Django-Q自己注册的 ModelAdmin 对象。

信号

Django内置信号系统,我之前有写过一篇简单的文章介绍:3分钟看懂Python后端必须知道的Django的信号机制

Django-Q提供了两类信号:

例子代码如下:

from django.dispatch import receiver
from django_q.signals import pre_enqueue, pre_execute

@receiver(pre_enqueue)
def my_pre_enqueue_callback(sender, task, **kwargs):
    print("Task {} will be enqueued".format(task["name"]))

@receiver(pre_execute)
def my_pre_execute_callback(sender, func, task, **kwargs):
    print("Task {} will be executed by calling {}".format(
          task["name"], func))

有需要的话可以注册消息接收器,做一些处理。(不过我暂时是没用上)

小结

搞定~

Django-Q使用下来的体验还是不错的,足够轻量,部署足够方便,足以应付大部分场景了~

参考资料

标签:task,队列,django,admin,任务,参数,Django,轻量级
来源: https://www.cnblogs.com/deali/p/16644989.html