Python asyncio.TaskGroup取代gather的结构化并发
在 Python 异步编程中,管理多个并发任务长期以来依赖于 asyncio.gather。然而,gather 在处理异常和任务取消时存在局限性,往往需要编写大量样板代码来确保“要么全做,要么全不做”。Python 3.11 引入了 asyncio.TaskGroup,正式带来了原生支持的结构化并发。它利用上下文管理器自动管理任务生命周期,当内部任务失败时自动取消其余任务,极大地简化了异步代码的编写。
准备工作
确认 Python 版本。TaskGroup 是 Python 3.11 引入的标准库功能,旧版本无法使用。
运行 以下命令检查版本:
python --version
如果输出低于 3.11,升级 Python 环境。
基础用法:从 Gather 迁移
TaskGroup 最核心的改变是使用 async with 语句块来定义任务的作用域。在作用域结束时,Python 会等待所有任务完成。
定义 三个模拟耗时的异步函数:
import asyncio
async def task_a():
print("任务 A 开始")
await asyncio.sleep(1)
print("任务 A 结束")
return "结果 A"
async def task_b():
print("任务 B 开始")
await asyncio.sleep(2)
print("任务 B 结束")
return "结果 B"
async def task_c():
print("任务 C 开始")
await asyncio.sleep(1)
print("任务 C 结束")
return "结果 C"
使用 gather 的旧写法
在旧代码中,你需要手动将协程对象列表传给 gather,并按顺序获取结果列表:
async def main_gather():
print("--- 使用 gather ---")
results = await asyncio.gather(task_a(), task_b(), task_c())
print(f"获取结果: {results}")
asyncio.run(main_gather())
使用 TaskGroup 的新写法
在新写法中,使用 tg.create_task() 创建任务,并在 with 块结束后通过任务对象获取结果:
async def main_taskgroup():
print("--- 使用 TaskGroup ---")
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(task_a())
t2 = tg.create_task(task_b())
t3 = tg.create_task(task_c())
print(f"任务 1 结果: {t1.result()}")
print(f"任务 2 结果: {t2.result()}")
print(f"任务 3 结果: {t3.result()}")
asyncio.run(main_taskgroup())
核心机制:异常处理与自动取消
结构化并发最大的优势在于异常处理。当 TaskGroup 中的任意一个任务抛出异常时,组内其他正在运行或挂起的任务会被立即自动取消,无需手动编写取消逻辑。
修改 task_b 使其抛出异常:
async def task_b_fail():
print("任务 B (失败) 开始")
await asyncio.sleep(0.5)
raise RuntimeError("任务 B 遇到了错误")
编写 测试代码观察行为:
async def main_exception_test():
try:
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(task_a())
t2 = tg.create_task(task_b_fail())
t3 = tg.create_task(task_c())
except ExceptionGroup as eg:
print(f"捕获到异常组: {eg}")
# 遍历异常组中的具体异常
for exc in eg.exceptions:
print(f"具体错误: {exc}")
# 检查任务状态
print(f"t1 状态 (已取消): {t1.cancelled()}")
print(f"t2 状态 (已完成): {t2.done()}")
print(f"t3 状态 (已取消): {t3.cancelled()}")
asyncio.run(main_exception_test())
在上述代码中,task_b_fail 抛出异常后,TaskGroup 会触发以下连锁反应:
- 立即取消
task_a和task_c。 async with块抛出ExceptionGroup(Python 3.11 新增的异常容器)。t1和t3的cancelled()状态为True,防止了资源泄漏或僵尸任务。
功能对比:Gather vs TaskGroup
为了更清晰地理解两者差异,请参考下表:
| 特性 | asyncio.gather | asyncio.TaskGroup |
|---|---|---|
| Python 版本要求 | 3.5+ (早期版本) | 3.11+ |
| 任务创建方式 | 直接传入协程列表 | 调用 tg.create_task() |
| 异常行为 | 默认传播第一个异常(除非 return_exceptions=True),其他任务可能继续在后台运行 |
自动取消组内所有剩余任务,抛出 ExceptionGroup |
| 结果获取 | 返回一个结果列表 | 通过 Task 对象的 .result() 方法获取 |
| 适用场景 | 简单的并行任务,任务间无依赖且允许部分失败 | 复杂的业务逻辑,要求原子性操作(全成或全败) |
迁移实战:重构现有代码
假设有一段使用 gather 且带有手动取消逻辑的复杂代码,目标是将其简化为 TaskGroup。
原始代码 (使用 gather 手动处理异常):
async def old_way():
tasks = [task_a(), task_b(), task_c()]
try:
results = await asyncio.gather(*tasks)
print(results)
except Exception as e:
print(f"发生错误: {e}")
# 手动取消尚未完成的任务
for t in asyncio.all_tasks():
if t is not asyncio.current_task() and not t.done():
t.cancel()
重构后代码 (使用 TaskGroup):
async def new_way():
try:
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(task_a())
t2 = tg.create_task(task_b())
t3 = tg.create_task(task_c())
except* (RuntimeError) as e: # 注意:Python 3.11 引入了 except* 语法
print(f"捕获到特定错误: {e}")
# 代码运行至此,所有任务要么全完成,要么全取消,无需手动清理
注意 代码中的 except* 语法。这是 Python 3.11 专门配合 ExceptionGroup 引入的新语法,用于匹配异常组中的部分异常。如果只想捕获特定类型的异常而不中断程序,使用 except* 是最佳实践。
获取结果的策略
与 gather 直接返回列表不同,TaskGroup 更加灵活。如果你需要像列表一样按顺序收集结果,可以配合列表推导式使用。
创建 任务列表并收集结果:
async def main_ordered_results():
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(func()) for func in [task_a, task_b, task_c]]
# 此时所有任务已完成
results = [task.result() for task in tasks]
print(results)
asyncio.run(main_ordered_results())
这种方式既保留了 TaskGroup 的异常安全特性,又能获得类似 gather 的结果列表格式。
总结关键操作
- 引入
import asyncio。 - 替换
await asyncio.gather(...)为async with asyncio.TaskGroup() as tg:。 - 调用
tg.create_task(coroutine)启动任务。 - 访问
task_object.result()获取返回值。 - 使用
except* ExceptionGroup处理批量异常。

暂无评论,快来抢沙发吧!