celery_异步任务队列
作者:互联网
一、简介
Celery是基于Python开发的分布式任务队列。
它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成如下图 :
组件
- task:定义的task函数。由生产者发送给broker。
- broker:broker是一个消息传输的中间件,它是用来存储生产者发出的各种任务。存储媒介可供选择:RabbitMQ,Redis,MongoDB等等。
- worker:worker是执行任务的单元,并发的运行在分布式的系统节点中。它实时监控消息队列,如果有任务就获取任务并执行它。
- backend:用于存储任务的执行结果。
二、安装使用
安装celery+redis模块
pip install celery pip install redis pip install eventlet # celery 4.x以上版本不安装该模块,添加任务时会报错
使用Celery
使用celery包含三个方面
- 定义任务函数
- 运行celery服务
- 将任务函数添加到broker
定义文件目录结构如下:
1. 定义任务函数
创建celery_task.py文件输入下列代码
import time from celery import Celery broker = "redis://:123456@192.168.3.66:6379/0" # 123456为redis密码 backend = "redis://:123456@192.168.3.66:6379/1" # 存储执行结果 app = Celery("celery_test", broker=broker, backend=backend) # 第1个参数为任务的名称 @app.task def task1(x, y): time.sleep(5) return x + y
2.启动worker服务
进入celery_task.py文件所在目录,执行下面命令
celery -A celery_task worker --loglevel=info -P eventlet
celery_task为 定义任务函数 的文件路径id
启动成功截图如下
3.将任务函数添加到broker
创建add_task.py文件输入下列代码
import time from celery_task import task1 # 将任务添加到broker中 result = task1.delay(1, 2) # 查看任务是否执行完成。True表示任务已执行,False表示任务还在执行中 print('任务执行完成状态: %s' % result.ready()) # time.sleep(6) print('任务执行完成状态: %s' % result.ready()) run_result = result.get(timeout=1) print('任务执行结果: %s' % run_result)
执行结果如下:
流程说明:执行dd_task.py文件会将task1任务添加到broker中,worker服务监控获取到broker中新增的task1任务,就执行该任务,任务执行完后将执行结果存储在backend中。
标签:异步,task,队列,broker,celery,任务,result,执行 来源: https://www.cnblogs.com/testlearn/p/16205083.html