Celery 分布式任务队列框架,主要包括三个组成成分:

[1] - Celery 客户端(Client)

[2] - 消息中间件(Message Broker)

[3] - Celery Worker

他们之间的关系如图:

这里,采用 FastAPI 作为 Celery Client,RabbitMQ 作为 Message Broker.

其中,

[1] - Celery Client 运行 FastAPI app,并传递消息或后台任务(message/background jobs) 到 RabbitMQ;

[2] - RabbitMQ 作为 Message Broker,将会调度 clients 和 workers 之间的消息;

[3] - RabbitMQ 在接收到 client 发送的消息后,通过将消息发送到一个 Celery Worker 以初始化 client 任务;

[4] - 一个 Celery Worker 被看做为后台任务,其可以从任何网络服务请求,实现异步性;

[5] - 同时可以有很多 workers 进行或完成很多任务(每个任务作为一个独立线程thread);

[6] - Celery 确保每个 worker 在同一时间点只执行一个任务,且每个任务只能被分配到一个 worker.

基于以上,实现如下:

Github - azzan-amin-97/FastAPI_Async_Celery

1. 依赖项安装

pip install fastapi
pip install celery
pip install uvicorn #ASGI server to run FastAPI app. 
pip install flower  #任务队列监控

设置 Message Broker(基于 Docker):

docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:3

2. 创建 Celery Worker Task

celery_worker.py

#!/usr/bin/python3
#!--*-- coding: utf-8 --*--
import time

from celery import Celery
from celery.utils.log import get_task_logger

#实例化 Celery
celery = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672//')

# 创建 logger,以显示日志信息
celery_log = get_task_logger(__name__)

# 创建任务函数,以订单(Order) 为例,异步进行
@celery.task
def create_order(name, quantity):
    # 5 seconds per 1 order
    complete_time_per_item = 5
    
    # Keep increasing depending on item quantity being ordered
    time.sleep(complete_time_per_item * quantity)
    
    # 显示日志    
    celery_log.info(f"Order Complete!")
    return {"message": f"Hi {name}, Your order has completed!",
            "order_quantity": quantity}

3. 创建 Model 和 App

model.py:

#!/usr/bin/python3
#!--*-- coding: utf-8 --*--
from pydantic import BaseModel

# Pydantic BaseModel
# Order class model for request body
class Order(BaseModel):
    customer_name: str
    order_quantity: int

main.py

#!/usr/bin/python3
#!--*-- coding: utf-8 --*--
from fastapi import FastAPI
from celery_worker import create_order
from model import Order

# Create FastAPI app
app = FastAPI()

# Create order endpoint
@app.post('/order')
def add_order(order: Order):
    # use delay() method to call the celery task
    create_order.delay(order.customer_name, order.order_quantity)
    return {"message": "Order Received! Thank you for your patience."}

4. 运行 FastAPI 和 Celery Worker server

uvicorn main:app --reload

访问:http://localhost:8000/docs 即可查看 FastAPI Swagger Docs.

启动 Celery Worker:

celery -A celery_worker.celery worker --loglevel=info

启动 flower 服务监控 Celery 消息队列:

celery flower -A celery_worker.celery --broker:amqp://localhost//

访问 http://localhost:5555/

5. 测试和分析 Celery

访问 http://localhost:8000/docs,示例如,

点击 Execute,返回结果如:

Last modification:January 17th, 2022 at 02:14 pm