Python 3.11 引入了 asyncio.timeout 作为处理异步操作超时的新标准方式。相比于旧版的 asyncio.wait_for,它提供了更灵活的上下文管理器接口,支持动态调整超时时间和设置绝对截止时间。以下是在实际代码中使用 asyncio.timeout 的具体步骤。
1. 基础用法:设置固定超时
使用 asyncio.timeout 最简单的方式是将其作为异步上下文管理器,包裹可能耗时较长的代码块。
- 导入
asyncio库。 - 定义 一个模拟耗时操作的异步函数。
- 使用
async with asyncio.timeout(seconds)语句包裹await调用。 - 捕获
TimeoutError异常以处理超时逻辑。
import asyncio
async def slow_operation():
# 模拟一个耗时 10 秒的操作
await asyncio.sleep(10)
async def main():
try:
# 设置 2 秒超时
async with asyncio.timeout(2):
await slow_operation()
except TimeoutError:
print("错误:操作在 2 秒内未完成")
asyncio.run(main())
2. 动态调整超时时间
asyncio.timeout 允许在代码块运行时动态延长或修改超时时间,这在处理不可预测的耗时任务(如下载大文件)时非常有用。
- 创建 一个
timeout实例对象t = asyncio.timeout(delay)。 - 进入
async with t:上下文。 - 调用
t.shift(delay)方法,将当前截止时间向后推迟指定的秒数。 - 执行 任务逻辑,在关键时刻调用
shift避免超时。
import asyncio
async def flexible_task():
# 创建初始超时为 2 秒的对象
t = asyncio.timeout(2)
try:
async with t:
print("第一阶段开始...")
await asyncio.sleep(1)
print("第一阶段完成,耗时 1 秒")
# 检查是否快要超时,如果需要,延长 3 秒
# 此时总允许时间变为 初始2秒 + 延长3秒 = 5秒
t.shift(3)
print("第二阶段开始,已延长超时时间...")
await asyncio.sleep(3)
print("第二阶段完成")
except TimeoutError:
print("任务超时")
asyncio.run(flexible_task())
3. 设置绝对截止时间
除了相对的“几秒后超时”,asyncio.timeout 还支持基于特定时间戳的绝对截止时间。
- 获取 当前事件循环的时间
loop.time()。 - 计算 目标截止时间戳(例如:当前时间 + 5 秒)。
- 使用
asyncio.timeout_at(deadline)替代asyncio.timeout()。 - 运行 代码,系统会在到达指定时间戳时触发超时。
import asyncio
async def deadline_example():
loop = asyncio.get_running_loop()
# 计算截止时间:当前时刻 + 5 秒
deadline = loop.time() + 5.0
print(f"截止时间点设置为: {deadline}")
try:
# 使用绝对时间
async with asyncio.timeout_at(deadline):
print("开始任务...")
await asyncio.sleep(10) # 这个任务肯定会超时
except TimeoutError:
print("已达到绝对截止时间")
asyncio.run(deadline_example())
4. 处理外部任务取消
asyncio.timeout 的工作原理是当超时发生时,取消上下文内正在运行的任务。你可以通过检查返回的 Timeout 对象来查看状态。
- 实例化
asyncio.timeout对象。 - 进入 上下文并执行可能被取消的任务。
- 引用 外部的
asyncio.create_task创建的任务。 - 观察 超时后任务的状态变化。
import asyncio
async def worker():
print("工作线程启动")
try:
await asyncio.sleep(5)
print("工作线程完成")
except asyncio.CancelledError:
print("工作线程被外部取消")
raise # 必须重新抛出 CancelledError
async def main():
task = asyncio.create_task(worker())
try:
async with asyncio.timeout(2):
await task
except TimeoutError:
print("主逻辑捕获到超时")
# 检查任务是否被取消
print(f"任务是否被取消: {task.cancelled()}")
asyncio.run(main())
5. 对比:旧版 wait_for 与新版 timeout
为了平滑迁移,理解两者的区别至关重要。下表总结了核心差异。
| 特性 | asyncio.wait_for (旧版) |
asyncio.timeout (新版) |
|---|---|---|
| 作用对象 | 只能包裹单个 awaitable 对象 |
可以包裹任意代码块(代码段) |
| 灵活性 | 仅支持固定时长或固定截止时间 | 支持动态调整超时 (shift) |
| 取消机制 | 自动取消传入的 awaitable | 自动取消上下文内的所有任务 |
| 返回值 | 返回 awaitable 的结果 | 本身不返回结果,仅管理超时上下文 |
| 推荐场景 | 简单的单次等待超时 | 复杂的业务逻辑流、需要重置超时的场景 |
6. 实战建议:在现有代码中替换
如果你正在维护使用 wait_for 的旧代码,可以按照以下模式进行替换。
旧代码模式:
try:
result = await asyncio.wait_for(some_coroutine(), timeout=5.0)
except TimeoutError:
handle_timeout()
新代码替换模式:
- 移除
wait_for调用。 - 添加
async with asyncio.timeout(5.0):包裹await。 - 保持
except结构不变。
try:
async with asyncio.timeout(5.0):
result = await some_coroutine()
except TimeoutError:
handle_timeout()
这种替换不仅代码更整洁,而且如果将来需要在 some_coroutine() 前后添加其他日志或准备工作,超时控制依然有效。

暂无评论,快来抢沙发吧!