文章目录

Python asyncio.wait_for对协程设置超时并取消

发布于 2026-05-04 21:25:36 · 浏览 16 次 · 评论 0 条

Python asyncio.wait_for对协程设置超时并取消

在编写异步程序时,经常会遇到外部请求响应慢或 IO 操作卡住的情况。如果不做处理,这些挂起的协程会无限期占用资源。asyncio.wait_for 提供了一种机制,强制限制协程的运行时间。一旦超时,它会自动取消协程并抛出异常。

以下介绍如何使用 asyncio.wait_for 设置超时、捕获异常,以及理解其背后的取消机制。


1. 理解 wait_for 的基本参数

asyncio.wait_for 接收两个主要参数:一个 awaitable 对象(通常是一个协程)和一个超时时间。

查看 函数签名:

asyncio.wait_for(aw, timeout)
  • aw:需要执行的协程或任务。
  • timeout:等待的最长时间(秒),可以是 intfloat。如果设置为 None,则表示不限制时间。

2. 编写一个模拟耗时任务的协程

为了演示超时效果,首先定义一个模拟长耗时的协程。使用 asyncio.sleep 模拟 IO 阻塞。

输入以下代码:

import asyncio

async def long_running_task():
    print("任务开始:模拟耗时操作...")
    # 模拟耗时 5 秒
    await asyncio.sleep(5)
    print("任务完成:操作成功!")
    return "结果数据"

3. 设置超时并捕获 TimeoutError

创建一个主函数,调用 long_running_task,但只给它 2 秒的时间。由于任务需要 5 秒,必然触发超时。

编写如下逻辑:

  1. 使用 try...except 块包裹代码。
  2. 调用 asyncio.wait_for(long_running_task(), timeout=2.0)
  3. 捕获 asyncio.TimeoutError 异常。
async def main():
    try:
        # 设置 2 秒超时
        result = await asyncio.wait_for(long_running_task(), timeout=2.0)
        print(f"拿到结果: {result}")
    except asyncio.TimeoutError:
        print("错误:任务执行超时,已取消!")

# 运行主程序
asyncio.run(main())

运行上述代码,输出结果如下:

任务开始:模拟耗时操作...
错误:任务执行超时,已取消!

4. 理解超时的自动取消机制

当超时发生时,asyncio.wait_for 不仅抛出异常,还会在内部调用 cancel() 方法取消正在运行的协程。这意味着协程内的 finally 代码块仍然会被执行,用于清理资源。

修改 long_running_task,加入清理逻辑以验证这一点:

async def long_running_task_with_cleanup():
    try:
        print("任务开始:准备连接数据库...")
        await asyncio.sleep(5)
        print("任务完成:数据写入成功")
    finally:
        # 无论是否被取消,这里都会执行
        print("清理:关闭数据库连接")

更新 main 函数调用:

async def main_with_cleanup():
    try:
        await asyncio.wait_for(long_running_task_with_cleanup(), timeout=2.0)
    except asyncio.TimeoutError:
        print("错误:主程序捕获到超时")

asyncio.run(main_with_cleanup())

运行代码,观察输出顺序:

任务开始:准备连接数据库...
清理:关闭数据库连接
错误:主程序捕获到超时

注意 finally 块在 TimeoutError 被捕获之前就已经执行了。这是编写健壮异步代码的关键:确保finally 块中释放锁、关闭文件或断开网络连接。


5. 处理任务内部的 CancelledError

当协程被 wait_for 取消时,协程内部会抛出 asyncio.CancelledError。如果需要在取消时执行特定逻辑(不同于 finally),可以显式捕获这个错误。

注意:如果捕获了 CancelledError 且没有重新抛出,外部的 wait_for 可能无法正确感知取消状态,因此在处理完自定义逻辑后,通常需要重新抛出。

观察以下代码模式:

async def task_with_custom_cancel():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("任务内部:检测到取消请求,正在回滚事务...")
        # 执行回滚逻辑...
        # 关键:重新抛出异常,让外层知道任务已被取消
        raise
    finally:
        print("任务内部:最终清理资源")

async def main_handle_cancel():
    try:
        await asyncio.wait_for(task_with_custom_cancel(), timeout=1.0)
    except asyncio.TimeoutError:
        print("主程序:超时异常")

执行 asyncio.run(main_handle_cancel()),结果如下:

任务内部:检测到取消请求,正在回滚事务...
任务内部:最终清理资源
主程序:超时异常

6. 等待场景对比表

下表总结了在不同情况下 asyncio.wait_for 的行为表现。

场景描述 任务耗时 vs 超时时间 结果行为 抛出异常类型
正常完成 任务耗时 < 超时时间 返回任务的结果
发生超时 任务耗时 > 超时时间 任务被取消,抛出异常 asyncio.TimeoutError
任务内部报错 任意 任务停止,内部异常传播 任务内部的原始异常(如 ValueError

7. wait_for 执行流程图

以下流程图展示了 wait_for 在超时情况下的完整处理逻辑,包括取消、清理和异常抛出的顺序。

graph TD A["开始: 调用 wait_for(协程, timeout)"] --> B{任务在 timeout 内完成?} B -- 是 --> C["返回协程结果"] B -- 否 --> D["触发超时逻辑"] D --> E["内部调用 task.cancel()"] E --> F["协程内部抛出 CancelledError"] F --> G{协程是否有 except CancelledError?} G -- 是 --> H["执行自定义取消处理逻辑"] G -- 否 --> I["跳过"] H --> J["执行 finally 块 (清理资源)"] I --> J J --> K["wait_for 捕获到取消状态"] K --> L["向外抛出 asyncio.TimeoutError"]

注意:图表中使用双引号包裹文本以符合规范,所有标点均为英文半角。


通过以上步骤,你可以熟练使用 asyncio.wait_for 控制协程的最大执行时间,并结合 try...finallyCancelledError 确保程序在任何异常情况下的资源安全。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文