FastAPI 异步后台任务阻塞其他请求如何处理?

对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》

写在前面


  • 工作中遇到,有大佬做了解答,简单整理
  • 阻塞的主要原因是 网络IO 密集型CPU 密集型是两个不同的概念, ASGI 更多的是面向 网络/IO 密集型的非阻塞处理,不适用 CPU 密集型
  • 理解不足小伙伴帮忙指正

对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》


在使用 FastAPIweb 服务的时候, 使用 BackgroundTasks 执行CPU密集型或者 IO 密集型任务,会阻塞当前 web 服务的所有接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@app.get('/face_recognition_activated')
async def face_recognition_activated(level : int,background_tasks: BackgroundTasks,token: bool = Depends(get_current_token)):
"""
@Time : 2023/10/20 03:28:11
@Author : liruilonger@gmail.com
@Version : 1.0
@Desc : 开始进行人脸识别
"""
# 提取人脸库数据

background_tasks.add_task(face_recognition, data = {
"dir_name":"A205_20237test4",
"class_code": "A0205",
"real_real_id": [2747,2745,345435]
})
return {"status": 200,"message": "人脸识别开始了 🚀🚀🚀🚀🚀" }

对应的后台任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
async def face_recognition(c_data):
"""
@Time : 2023/10/20 05:09:23
@Author : liruilonger@gmail.com
@Version : 1.0
@Desc : 人脸识别
"""
logging.info("开始人脸数据库特征提取!")
....................
pbar_i = tqdm(
range(0, len(face_list)),
desc="特征载入内存:👀👀👀",
mininterval=0.001,
smoothing=0.0001,
colour='green',
)
for idx in pbar_i:
face_id = face_list[idx]
face_id = face_id.decode('utf-8')
face_score_key = class_code_account_period +"_face_score"
faces = pkl.rc.zrange(face_score_key + ":" + face_id, 0, -1)
meta_dates = []
for hallmark_id in faces:
face_recognition_key = class_code_account_period + "_face_recognition"
face_r = pkl.rc.hget(
face_recognition_key + ":" + face_id, hallmark_id)
face_path_key = class_code_account_period + ':Path'
f_path = pkl.rc.hget(face_path_key, face_id)

meta_date = {"hallmark_id": hallmark_id.decode('utf-8'),
"face_r": pickle.loads(face_r),
"f_path": f_path.decode('utf-8'),
"mass": 500,
}
#logging.info(meta_date)
meta_dates.append(meta_date)
checks.append({"face_id": face_id, "meta_dates": meta_dates})

logging.info("构建内存人脸库完成")
# 开始人脸识别
logging.info("开始人脸识别..")
r_p = RedisClient(1)
logging.info("人脸识别后台任务启动......") #
consumer_task = asyncio.create_task(AdafaceFaceIdentification.consumer_facial_feature_clustering(global_object,checks,r_p,class_code_account_period,c_data))
await asyncio.gather(consumer_task)

对于这种情况,这是因为 对应的 后台任务被定义为 async , 意味着 fastapi 会在 asyncio 事件循环中运行它。并且因为 对应后台任务的某一环节是同步的(即不等待某些 IO或者是网络请求,而是进行计算)只要它正在运行,它就会阻塞事件循环。

这有在涉及异步IO网络操作的情况下,asyncio 才不会阻塞,能够以非阻塞的方式运行,从而充分利用系统资源并提高应用程序的并发性能。

解决这个问题的几种方法:

  • 使用更多的工人(例如 uvicorn main:app --workers 4 )。这将允许最多 4 个 后台任务 并行。
  • 将任务重写为不是 async (即将其定义为 def task(data): … 等)。然后 starlette 将在单独的线程中运行它。
  • 使用 fastapi.concurrency.run_in_threadpool ,这也将在单独的线程中运行它。像这样:
1
2
3
4
5
 from fastapi.concurrency import run_in_threadpool
async def task(data):
otherdata = await db.fetch("some sql")
newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata))
await db.execute("some sql", newdata)
  • 或者直接使用 asynciosrun_in_executor (其中 run_in_threadpool 在后台使用):
1
2
3
4
5
6
7
 import asyncio
async def task(data):
otherdata = await db.fetch("some sql")
loop = asyncio.get_running_loop()
newdata = await loop.run_in_executor(None, lambda: somelongcomputation(data, otherdata))
await db.execute("some sql", newdata)

  • 自己生成一个单独的线程/进程。例如使用 concurrent.futures
  • 使用更重的东西,如芹菜。 (也在 此处 的 fastapi 文档中提到)。

博文部分内容参考

© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知 :)


https://segmentfault.com/q/1010000043296883

https://stackoverflow.com/questions/63169865/how-to-do-multiprocessing-in-fastapi/63171013#63171013


© 2018-至今 liruilonger@gmail.com, All rights reserved. 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)

发布于

2023-11-03

更新于

2024-11-22

许可协议

评论
Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×