文章目录

Python asyncio.timeout在Python 3.11中的新用法

发布于 2026-05-04 00:27:57 · 浏览 18 次 · 评论 0 条

Python 3.11 引入了 asyncio.timeout 作为处理异步操作超时的新标准方式。相比于旧版的 asyncio.wait_for,它提供了更灵活的上下文管理器接口,支持动态调整超时时间和设置绝对截止时间。以下是在实际代码中使用 asyncio.timeout 的具体步骤。


1. 基础用法:设置固定超时

使用 asyncio.timeout 最简单的方式是将其作为异步上下文管理器,包裹可能耗时较长的代码块。

  1. 导入 asyncio 库。
  2. 定义 一个模拟耗时操作的异步函数。
  3. 使用 async with asyncio.timeout(seconds) 语句包裹 await 调用。
  4. 捕获 TimeoutError 异常以处理超时逻辑。
import asyncio

async def slow_operation():
    # 模拟一个耗时 10 秒的操作
    await asyncio.sleep(10)

async def main():
    try:
        # 设置 2 秒超时
        async with asyncio.timeout(2):
            await slow_operation()
    except TimeoutError:
        print("错误:操作在 2 秒内未完成")

asyncio.run(main())

2. 动态调整超时时间

asyncio.timeout 允许在代码块运行时动态延长或修改超时时间,这在处理不可预测的耗时任务(如下载大文件)时非常有用。

  1. 创建 一个 timeout 实例对象 t = asyncio.timeout(delay)
  2. 进入 async with t: 上下文。
  3. 调用 t.shift(delay) 方法,将当前截止时间向后推迟指定的秒数。
  4. 执行 任务逻辑,在关键时刻调用 shift 避免超时。
import asyncio

async def flexible_task():
    # 创建初始超时为 2 秒的对象
    t = asyncio.timeout(2)
    try:
        async with t:
            print("第一阶段开始...")
            await asyncio.sleep(1) 
            print("第一阶段完成,耗时 1 秒")

            # 检查是否快要超时,如果需要,延长 3 秒
            # 此时总允许时间变为 初始2秒 + 延长3秒 = 5秒
            t.shift(3)

            print("第二阶段开始,已延长超时时间...")
            await asyncio.sleep(3)
            print("第二阶段完成")

    except TimeoutError:
        print("任务超时")

asyncio.run(flexible_task())

3. 设置绝对截止时间

除了相对的“几秒后超时”,asyncio.timeout 还支持基于特定时间戳的绝对截止时间。

  1. 获取 当前事件循环的时间 loop.time()
  2. 计算 目标截止时间戳(例如:当前时间 + 5 秒)。
  3. 使用 asyncio.timeout_at(deadline) 替代 asyncio.timeout()
  4. 运行 代码,系统会在到达指定时间戳时触发超时。
import asyncio

async def deadline_example():
    loop = asyncio.get_running_loop()
    # 计算截止时间:当前时刻 + 5 秒
    deadline = loop.time() + 5.0

    print(f"截止时间点设置为: {deadline}")

    try:
        # 使用绝对时间
        async with asyncio.timeout_at(deadline):
            print("开始任务...")
            await asyncio.sleep(10) # 这个任务肯定会超时
    except TimeoutError:
        print("已达到绝对截止时间")

asyncio.run(deadline_example())

4. 处理外部任务取消

asyncio.timeout 的工作原理是当超时发生时,取消上下文内正在运行的任务。你可以通过检查返回的 Timeout 对象来查看状态。

  1. 实例化 asyncio.timeout 对象。
  2. 进入 上下文并执行可能被取消的任务。
  3. 引用 外部的 asyncio.create_task 创建的任务。
  4. 观察 超时后任务的状态变化。
import asyncio

async def worker():
    print("工作线程启动")
    try:
        await asyncio.sleep(5)
        print("工作线程完成")
    except asyncio.CancelledError:
        print("工作线程被外部取消")
        raise # 必须重新抛出 CancelledError

async def main():
    task = asyncio.create_task(worker())

    try:
        async with asyncio.timeout(2):
            await task
    except TimeoutError:
        print("主逻辑捕获到超时")

    # 检查任务是否被取消
    print(f"任务是否被取消: {task.cancelled()}")

asyncio.run(main())

5. 对比:旧版 wait_for 与新版 timeout

为了平滑迁移,理解两者的区别至关重要。下表总结了核心差异。

特性 asyncio.wait_for (旧版) asyncio.timeout (新版)
作用对象 只能包裹单个 awaitable 对象 可以包裹任意代码块(代码段)
灵活性 仅支持固定时长或固定截止时间 支持动态调整超时 (shift)
取消机制 自动取消传入的 awaitable 自动取消上下文内的所有任务
返回值 返回 awaitable 的结果 本身不返回结果,仅管理超时上下文
推荐场景 简单的单次等待超时 复杂的业务逻辑流、需要重置超时的场景

6. 实战建议:在现有代码中替换

如果你正在维护使用 wait_for 的旧代码,可以按照以下模式进行替换。

旧代码模式:

try:
    result = await asyncio.wait_for(some_coroutine(), timeout=5.0)
except TimeoutError:
    handle_timeout()

新代码替换模式:

  1. 移除 wait_for 调用。
  2. 添加 async with asyncio.timeout(5.0): 包裹 await
  3. 保持 except 结构不变。
try:
    async with asyncio.timeout(5.0):
        result = await some_coroutine()
except TimeoutError:
    handle_timeout()

这种替换不仅代码更整洁,而且如果将来需要在 some_coroutine() 前后添加其他日志或准备工作,超时控制依然有效。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文