其他分享
首页 > 其他分享> > FastAPI(41)- Background Task 后台任务

FastAPI(41)- Background Task 后台任务

作者:互联网

FastAPI(41)- Background Task 后台任务 

 

后台任务

 

常见的栗子

 

实际栗子

创建后台任务要用到的函数

import time

def write_notification(email: str, message: str = ""):
    # 1、模拟和邮件服务器建立连接
    time.sleep(3)
    with open("text.txt", mode="w") as f:
        # 2、模拟发送邮件
        content = f"message is {message}"
        f.write(content)
        print(content)

  

添加后台任务

#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
# author: 小菠萝测试笔记
# blog:  https://www.cnblogs.com/poloyy/
# time: 2021/9/29 7:11 下午
# file: 35_background_task.py
"""
import time
import uvicorn
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()


def write_notification(email: str, message: str = ""):
    # 1、模拟和邮件服务器建立连接
    time.sleep(3)
    with open("text.txt", mode="w") as f:
        # 2、模拟发送邮件
        content = f"message is {message}"
        f.write(content)
        print(content)


@app.post("/email/{email}")
async def send_email(
        email: str,
        # 指定参数类型为 BackgroundTasks
        background_task: BackgroundTasks
):
    # 添加后台任务
    # 第一个参数:可调用对象,一般就是函数
    # 后面的参数:write_notification 函数所需要的参数
    background_task.add_task(write_notification, email, message="test_message")
    return {"message": "Notification sent in the background"}


if __name__ == '__main__':
    uvicorn.run(app="35_background_task:app", reload=True, host="127.0.0.1", port=8080)

  

后台任务结合依赖项

#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
# author: 小菠萝测试笔记
# blog:  https://www.cnblogs.com/poloyy/
# time: 2021/9/29 7:11 下午
# file: 35_background_task.py
"""
import time
from typing import Optional

import uvicorn
from fastapi import FastAPI, BackgroundTasks, Depends

app = FastAPI()


# 后台任务函数
def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


# 依赖项函数
async def get_query(
        background_task: BackgroundTasks,
        q: Optional[str] = None,
):
    # 如果 q 有值才执行后台任务
    if q:
        message = f"found query: {q}\n"
        background_task.add_task(write_log, message)


@app.post("/email_depends/{email}")
async def send_email(
        email: str,
        background_task: BackgroundTasks,
        q: str = Depends(get_query)
):
    # 执行一次后台任务
    message = f"message to {email}\n"
    background_task.add_task(write_log, message)
    return {"message": "Message sent"}


if __name__ == '__main__':
    uvicorn.run(app="35_background_task:app", reload=True, host="127.0.0.1", port=8080)

 

查看 BackgroundTasks 源码

 

BackgroundTasks 注意事项

标签:__,Task,FastAPI,email,task,background,后台任务,message
来源: https://www.cnblogs.com/xiao-xue-di/p/15770160.html