Python asyncio.wait_for对协程设置超时并取消
在编写异步程序时,经常会遇到外部请求响应慢或 IO 操作卡住的情况。如果不做处理,这些挂起的协程会无限期占用资源。asyncio.wait_for 提供了一种机制,强制限制协程的运行时间。一旦超时,它会自动取消协程并抛出异常。
以下介绍如何使用 asyncio.wait_for 设置超时、捕获异常,以及理解其背后的取消机制。
1. 理解 wait_for 的基本参数
asyncio.wait_for 接收两个主要参数:一个 awaitable 对象(通常是一个协程)和一个超时时间。
查看 函数签名:
asyncio.wait_for(aw, timeout)
aw:需要执行的协程或任务。timeout:等待的最长时间(秒),可以是int或float。如果设置为None,则表示不限制时间。
2. 编写一个模拟耗时任务的协程
为了演示超时效果,首先定义一个模拟长耗时的协程。使用 asyncio.sleep 模拟 IO 阻塞。
输入以下代码:
import asyncio
async def long_running_task():
print("任务开始:模拟耗时操作...")
# 模拟耗时 5 秒
await asyncio.sleep(5)
print("任务完成:操作成功!")
return "结果数据"
3. 设置超时并捕获 TimeoutError
创建一个主函数,调用 long_running_task,但只给它 2 秒的时间。由于任务需要 5 秒,必然触发超时。
编写如下逻辑:
- 使用
try...except块包裹代码。 - 调用
asyncio.wait_for(long_running_task(), timeout=2.0)。 - 捕获
asyncio.TimeoutError异常。
async def main():
try:
# 设置 2 秒超时
result = await asyncio.wait_for(long_running_task(), timeout=2.0)
print(f"拿到结果: {result}")
except asyncio.TimeoutError:
print("错误:任务执行超时,已取消!")
# 运行主程序
asyncio.run(main())
运行上述代码,输出结果如下:
任务开始:模拟耗时操作...
错误:任务执行超时,已取消!
4. 理解超时的自动取消机制
当超时发生时,asyncio.wait_for 不仅抛出异常,还会在内部调用 cancel() 方法取消正在运行的协程。这意味着协程内的 finally 代码块仍然会被执行,用于清理资源。
修改 long_running_task,加入清理逻辑以验证这一点:
async def long_running_task_with_cleanup():
try:
print("任务开始:准备连接数据库...")
await asyncio.sleep(5)
print("任务完成:数据写入成功")
finally:
# 无论是否被取消,这里都会执行
print("清理:关闭数据库连接")
更新 main 函数调用:
async def main_with_cleanup():
try:
await asyncio.wait_for(long_running_task_with_cleanup(), timeout=2.0)
except asyncio.TimeoutError:
print("错误:主程序捕获到超时")
asyncio.run(main_with_cleanup())
运行代码,观察输出顺序:
任务开始:准备连接数据库...
清理:关闭数据库连接
错误:主程序捕获到超时
注意 finally 块在 TimeoutError 被捕获之前就已经执行了。这是编写健壮异步代码的关键:确保在 finally 块中释放锁、关闭文件或断开网络连接。
5. 处理任务内部的 CancelledError
当协程被 wait_for 取消时,协程内部会抛出 asyncio.CancelledError。如果需要在取消时执行特定逻辑(不同于 finally),可以显式捕获这个错误。
注意:如果捕获了 CancelledError 且没有重新抛出,外部的 wait_for 可能无法正确感知取消状态,因此在处理完自定义逻辑后,通常需要重新抛出。
观察以下代码模式:
async def task_with_custom_cancel():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("任务内部:检测到取消请求,正在回滚事务...")
# 执行回滚逻辑...
# 关键:重新抛出异常,让外层知道任务已被取消
raise
finally:
print("任务内部:最终清理资源")
async def main_handle_cancel():
try:
await asyncio.wait_for(task_with_custom_cancel(), timeout=1.0)
except asyncio.TimeoutError:
print("主程序:超时异常")
执行 asyncio.run(main_handle_cancel()),结果如下:
任务内部:检测到取消请求,正在回滚事务...
任务内部:最终清理资源
主程序:超时异常
6. 等待场景对比表
下表总结了在不同情况下 asyncio.wait_for 的行为表现。
| 场景描述 | 任务耗时 vs 超时时间 | 结果行为 | 抛出异常类型 |
|---|---|---|---|
| 正常完成 | 任务耗时 < 超时时间 | 返回任务的结果 | 无 |
| 发生超时 | 任务耗时 > 超时时间 | 任务被取消,抛出异常 | asyncio.TimeoutError |
| 任务内部报错 | 任意 | 任务停止,内部异常传播 | 任务内部的原始异常(如 ValueError) |
7. wait_for 执行流程图
以下流程图展示了 wait_for 在超时情况下的完整处理逻辑,包括取消、清理和异常抛出的顺序。
注意:图表中使用双引号包裹文本以符合规范,所有标点均为英文半角。
通过以上步骤,你可以熟练使用 asyncio.wait_for 控制协程的最大执行时间,并结合 try...finally 或 CancelledError 确保程序在任何异常情况下的资源安全。

暂无评论,快来抢沙发吧!