如何在python项目中实现异步构建?FastAPI 、Faust、Celery 异步处理库使用介绍
作者:互联网
FastAPI — 用于需要 HTTP (REST) 的服务
对于需要 HTTP 和 REST 能力的服务,icode9技术分享使用了一个名为 FastAPI 的软件。FastAPI 是一个构建在 Starlette 之上的包(由构建 django-rest-framework 和 Uvicorn 的同一个人维护的包)。
FastAPI 是一种现代、快速(高性能)的 Web 框架,用于基于标准 Python 类型提示使用 Python 3.6+ 构建 API。它的主要优点是:
- 编码简单——与icode9技术分享拥有的其他框架(DRF、apistar、aiohttp)相比,项目中的新成员很容易为基于 FastAPI 构建的服务做出贡献
- 它的性能非常好——因为 Starlette 是一款非常快的软件,而且 pydantic(数据验证和解析库)也很棒——icode9技术分享成功地为每个加载请求处理了大约 20 毫秒的延迟。
代码的一个小例子是:
从 fastapi 导入 FastAPI应用程序 = FastAPI()@app.get(“/”)异步定义 read_root():返回{“你好”:“世界”}@app.get(“/items/{item_id}”)async def read_item(item_id: int, q: str = None):返回{“item_id”:item_id,“q”:q}
Faust——让流处理变得简单
Faust 是 Robinhood 公司的人贡献的一个很棒的包。Faust 基于 Apache Kafka,基本上为您提供了 Python 中的 Kafka Stream 功能。
使用Faust,您可以:
- 在 Python 中轻松定义流处理(无 DSL),快速扩展(感谢 Kafka 消费者群体)。
- 将其用作集群,具有内置领导者选举、集群范围的表和窗口。
- 通过序列化进行备份,以提高在 kafka 主题中传递数据的标准。
icode9技术分享在许多用例中大量使用了 faust:
- Data stream worker——icode9技术分享有 faust worker 来监听icode9技术分享所有的 Kafka 主题,将数据解析为 JSON 并将它发送到 elasticsearch。Faust 给了icode9技术分享“批处理”消息的能力,所以icode9技术分享可以保证 100% 的消息发送到 elasticsearch。
- 规则验证工作人员——一个监听主题的流,通过用户定义的规则验证数据——并在违反规则时发出警报。
- 价格流——icode9技术分享有一个流,它正在收听充满股票价格的 kafka 主题,并进行窗口化以给出给定时间内的平均价格。
- Websocket 流——icode9技术分享想让icode9技术分享的用户能够订阅流并从 kafka 向他发送实时数据——所以浮士德给了icode9技术分享这种能力,几乎没有样板。
所有这些任务可能需要几天时间才能完成,因为您需要设置 kafka 消费者(使用 aiokafka)、http/websocket 服务器并设置指标。对于领导者选举和窗口化等高级功能,您可能会编写大量代码。Faust 为您提供了所有这些精巧的功能,而无需编写一行代码,而且效果很好!icode9技术分享在生产中使用了一年多,没有出现任何严重问题,Robinhood 也是如此。
Faust 当前状态的缺点是它不适合低延迟使用(因为它在给定秒内从 kafka 提取的数据量非常低)。
代码的一个小例子是:
app = faust.App('myapp', broker='kafka://localhost')# 模型描述了消息是如何序列化的:# {“account_id”: “3fae-…”, 金额”: 3}类订单(faust.Record):account_id: 海峡金额:整数@app.agent(value_type=Order)异步定义订单(订单):订单中的订单异步:# 处理无限的订单流。print(f'Order for {order.account_id}: {order.amount}')
Celery
Celery 是一个任务队列管理器,它允许使用消息队列和调度在 Python 中运行长时间运行的任务。
Celery 已成为社区的广泛继承者,过去六年icode9技术分享一直在使用它。但是,随着时间的推移,icode9技术分享遇到了很多问题。本节将讨论可能的问题和备选方案。
icode9技术分享在生产中遇到的一些问题是:
- 调度程序 (celery beat) 不能很好地处理时区并导致数百万个错误的调度,而且它也很难调试。
- 如果您在特定任务中使用多处理,则撤销机制无法正常工作。
- icode9技术分享认为 celery 结果的存储后端解决方案并不完整。您需要努力工作才能获得任务名称、任务参数和参数、总运行时间和结果。
- 该软件在过去的版本中不稳定,并导致了很多严重问题(特别是在使用工作流等奇特功能时)
为了继续使用它icode9技术分享做了什么:
- icode9技术分享使用 APScheduler 为 celery 实现了自己的调度程序,效果非常好。
- icode9技术分享停止使用工作流或任何其他花哨的功能,只使用芹菜的“工作者”功能。
- icode9技术分享使用 postgres 构建了自己的存储后端,它使用 celery 信号准确存储了icode9技术分享想要的内容。
icode9技术分享在使用Celery时推荐什么:
- 确保调度程序运行良好!由于这些问题,icode9技术分享有许多关键任务失败了——这可能会让你在生产中付出代价。
- 如果调度程序不稳定,请自己实施!
- 使用芹菜信号来保存你想要的数据,芹菜存储后端可能会丢失一些数据。
最后,icode9技术分享讨论了构建新后端时icode9技术分享最常遇到的问题的解决方案——使用 Faust,您可以轻松处理流,并将 FastAPI 用于 API 层,将 Celery/Huey 用于长时间运行的任务。
标签:python异步处理库,FastAPI,Faust,Celery 来源: