Python asyncio.gather的return_exceptions异常处理模式
asyncio.gather 是 Python 并发编程中用于批量运行协程的常用工具。默认情况下,只要其中有一个任务出错,整个流程就会立即抛出异常,导致未完成的任务被中断或无法获取已完成任务的结果。为了解决“部分失败不影响整体结果获取”的需求,return_exceptions 参数应运而生。
1. 理解默认行为的痛点
在默认模式下,asyncio.gather 采用“一票否决制”。查看 以下代码示例,了解默认行为如何阻碍我们获取结果。
创建 一个包含成功和失败任务的脚本:
import asyncio
async def task_success():
await asyncio.sleep(0.1)
return "成功结果"
async def task_fail():
await asyncio.sleep(0.05)
raise ValueError("发生错误")
async def main():
# 默认模式:return_exceptions=False
results = await asyncio.gather(
task_success(),
task_fail()
)
print(results)
# 运行这段代码会导致程序崩溃,且无法获取 "成功结果"
# asyncio.run(main())
运行 上述代码时,task_fail 抛出异常,asyncio.gather 立即捕获该异常并重新抛出。结果是程序崩溃,不仅没有打印出任何结果,连 task_success 的返回值也丢失了。
2. 使用 return_exceptions=True 收集异常
为了在部分任务失败时仍能获取其他任务的结果,设置 return_exceptions=True。此时,异常对象会被当作普通返回值收集到结果列表中,而不是直接抛出。
修改 主函数代码如下:
import asyncio
async def task_success():
await asyncio.sleep(0.1)
return "成功结果"
async def task_fail():
await asyncio.sleep(0.05)
raise ValueError("发生错误")
async def main():
# 开启安全模式:return_exceptions=True
results = await asyncio.gather(
task_success(),
task_fail(),
return_exceptions=True
)
print(f"所有返回结果: {results}")
asyncio.run(main())
执行 脚本后,控制台会输出:
所有返回结果: ['成功结果', ValueError('发生错误')]
观察 输出结果可以发现:
- 列表包含了所有任务的返回值。
- 成功的任务返回了字符串
'成功结果'。 - 失败的任务返回了
ValueError异常对象本身,程序并没有崩溃。
3. 区分正常结果与异常对象
开启 return_exceptions=True 后,结果列表中混合了正常数据和异常对象。为了正确处理这些数据,遍历 结果列表并检查 每一项的类型。
编写 处理逻辑如下:
import asyncio
async def main():
results = await asyncio.gather(
task_success(),
task_fail(),
return_exceptions=True
)
for index, result in enumerate(results):
# 判断结果是否是 Exception 的实例
if isinstance(result, Exception):
print(f"任务 {index} 失败: {result}")
else:
print(f"任务 {index} 成功: {result}")
asyncio.run(main())
分析 代码逻辑:
isinstance(result, Exception)用于精准判断当前项是否为异常。- 如果是异常,打印 错误信息。
- 如果不是异常,处理 正常的业务数据。
4. 模式对比与总结
为了更清晰地展示两种模式的区别,参考 下表。该表展示了在不同场景下 asyncio.gather 的行为差异。
| 参数设置 | 任务行为 | 程序是否崩溃 | 返回值内容 | 适用场景 |
|---|---|---|---|---|
return_exceptions=False (默认) |
任意一个任务失败 | 是 | 不返回(直接抛出异常) | 强依赖所有任务,任何一个失败都需要整体停止。 |
return_exceptions=True |
任意一个任务失败 | 否 | 包含成功结果和异常对象的列表 | 批量爬虫、多接口调用,允许部分失败,需处理所有有效数据。 |
5. 进阶实战:结合超时控制
在实际生产环境中,除了处理异常,还需要防止任务无限期阻塞。结合 asyncio.wait_for 和 return_exceptions=True 可以构建一个健壮的并发模型。
实现 带有超时和异常处理的代码:
import asyncio
async def long_running_task(duration, name):
try:
await asyncio.sleep(duration)
return f"{name} 完成"
except asyncio.CancelledError:
return f"{name} 被取消"
except Exception as e:
return f"{name} 出错: {e}"
async def main():
# 定义任务:第一个正常,第二个会超时,第三个会出错
tasks = [
asyncio.wait_for(long_running_task(0.5, "任务A"), timeout=1.0),
asyncio.wait_for(long_running_task(2.0, "任务B"), timeout=1.0), # 耗时超过超时限制
long_running_task(0.1, "任务C") # 内部手动触发错误需修改函数,此处暂不模拟错误
]
# 添加模拟任务C错误的逻辑
async def task_c_error():
raise RuntimeError("任务C运行时错误")
tasks[2] = task_c_error()
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, res in enumerate(results):
if isinstance(res, Exception):
# 区分超时异常和其他异常
if isinstance(res, asyncio.TimeoutError):
print(f"任务 {i} 超时")
else:
print(f"任务 {i} 异常: {res}")
else:
print(f"任务 {i} 结果: {res}")
asyncio.run(main())
解析 上述逻辑:
asyncio.wait_for会在任务运行时间超过timeout时抛出asyncio.TimeoutError。- 由于
gather开启了return_exceptions=True,这个TimeoutError被捕获并放入结果列表。 - 代码中进一步判断异常类型,区分 是“超时”还是“业务逻辑错误”,从而实现精细化的错误处理。
6. 执行流程可视化
为了直观理解 return_exceptions=True 对控制流的影响,参考 下面的流程图。该图描述了从任务分发到结果处理的完整路径。

暂无评论,快来抢沙发吧!