文章目录

Python asyncio.TaskGroup取代gather的结构化并发

发布于 2026-05-05 11:19:29 · 浏览 15 次 · 评论 0 条

Python asyncio.TaskGroup取代gather的结构化并发

在 Python 异步编程中,管理多个并发任务长期以来依赖于 asyncio.gather。然而,gather 在处理异常和任务取消时存在局限性,往往需要编写大量样板代码来确保“要么全做,要么全不做”。Python 3.11 引入了 asyncio.TaskGroup,正式带来了原生支持的结构化并发。它利用上下文管理器自动管理任务生命周期,当内部任务失败时自动取消其余任务,极大地简化了异步代码的编写。


准备工作

确认 Python 版本。TaskGroup 是 Python 3.11 引入的标准库功能,旧版本无法使用。

运行 以下命令检查版本:

python --version

如果输出低于 3.11,升级 Python 环境。

基础用法:从 Gather 迁移

TaskGroup 最核心的改变是使用 async with 语句块来定义任务的作用域。在作用域结束时,Python 会等待所有任务完成。

定义 三个模拟耗时的异步函数:

import asyncio

async def task_a():
    print("任务 A 开始")
    await asyncio.sleep(1)
    print("任务 A 结束")
    return "结果 A"

async def task_b():
    print("任务 B 开始")
    await asyncio.sleep(2)
    print("任务 B 结束")
    return "结果 B"

async def task_c():
    print("任务 C 开始")
    await asyncio.sleep(1)
    print("任务 C 结束")
    return "结果 C"

使用 gather 的旧写法

在旧代码中,你需要手动将协程对象列表传给 gather,并按顺序获取结果列表:

async def main_gather():
    print("--- 使用 gather ---")
    results = await asyncio.gather(task_a(), task_b(), task_c())
    print(f"获取结果: {results}")

asyncio.run(main_gather())

使用 TaskGroup 的新写法

在新写法中,使用 tg.create_task() 创建任务,并在 with 块结束后通过任务对象获取结果:

async def main_taskgroup():
    print("--- 使用 TaskGroup ---")
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(task_a())
        t2 = tg.create_task(task_b())
        t3 = tg.create_task(task_c())

    print(f"任务 1 结果: {t1.result()}")
    print(f"任务 2 结果: {t2.result()}")
    print(f"任务 3 结果: {t3.result()}")

asyncio.run(main_taskgroup())

核心机制:异常处理与自动取消

结构化并发最大的优势在于异常处理。当 TaskGroup 中的任意一个任务抛出异常时,组内其他正在运行或挂起的任务会被立即自动取消,无需手动编写取消逻辑。

修改 task_b 使其抛出异常:

async def task_b_fail():
    print("任务 B (失败) 开始")
    await asyncio.sleep(0.5)
    raise RuntimeError("任务 B 遇到了错误")

编写 测试代码观察行为:

async def main_exception_test():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(task_a())
            t2 = tg.create_task(task_b_fail())
            t3 = tg.create_task(task_c())
    except ExceptionGroup as eg:
        print(f"捕获到异常组: {eg}")
        # 遍历异常组中的具体异常
        for exc in eg.exceptions:
            print(f"具体错误: {exc}")

    # 检查任务状态
    print(f"t1 状态 (已取消): {t1.cancelled()}")
    print(f"t2 状态 (已完成): {t2.done()}")
    print(f"t3 状态 (已取消): {t3.cancelled()}")

asyncio.run(main_exception_test())

在上述代码中,task_b_fail 抛出异常后,TaskGroup 会触发以下连锁反应:

  1. 立即取消 task_atask_c
  2. async with 块抛出 ExceptionGroup(Python 3.11 新增的异常容器)。
  3. t1t3cancelled() 状态为 True,防止了资源泄漏或僵尸任务。

功能对比:Gather vs TaskGroup

为了更清晰地理解两者差异,请参考下表:

特性 asyncio.gather asyncio.TaskGroup
Python 版本要求 3.5+ (早期版本) 3.11+
任务创建方式 直接传入协程列表 调用 tg.create_task()
异常行为 默认传播第一个异常(除非 return_exceptions=True),其他任务可能继续在后台运行 自动取消组内所有剩余任务,抛出 ExceptionGroup
结果获取 返回一个结果列表 通过 Task 对象的 .result() 方法获取
适用场景 简单的并行任务,任务间无依赖且允许部分失败 复杂的业务逻辑,要求原子性操作(全成或全败)

迁移实战:重构现有代码

假设有一段使用 gather 且带有手动取消逻辑的复杂代码,目标是将其简化为 TaskGroup

原始代码 (使用 gather 手动处理异常):

async def old_way():
    tasks = [task_a(), task_b(), task_c()]
    try:
        results = await asyncio.gather(*tasks)
        print(results)
    except Exception as e:
        print(f"发生错误: {e}")
        # 手动取消尚未完成的任务
        for t in asyncio.all_tasks():
            if t is not asyncio.current_task() and not t.done():
                t.cancel()

重构后代码 (使用 TaskGroup):

async def new_way():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(task_a())
            t2 = tg.create_task(task_b())
            t3 = tg.create_task(task_c())
    except* (RuntimeError) as e:  # 注意:Python 3.11 引入了 except* 语法
        print(f"捕获到特定错误: {e}")

    # 代码运行至此,所有任务要么全完成,要么全取消,无需手动清理

注意 代码中的 except* 语法。这是 Python 3.11 专门配合 ExceptionGroup 引入的新语法,用于匹配异常组中的部分异常。如果只想捕获特定类型的异常而不中断程序,使用 except* 是最佳实践。


获取结果的策略

gather 直接返回列表不同,TaskGroup 更加灵活。如果你需要像列表一样按顺序收集结果,可以配合列表推导式使用。

创建 任务列表并收集结果:

async def main_ordered_results():
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(func()) for func in [task_a, task_b, task_c]]

    # 此时所有任务已完成
    results = [task.result() for task in tasks]
    print(results)

asyncio.run(main_ordered_results())

这种方式既保留了 TaskGroup 的异常安全特性,又能获得类似 gather 的结果列表格式。

总结关键操作

  1. 引入 import asyncio
  2. 替换 await asyncio.gather(...)async with asyncio.TaskGroup() as tg:
  3. 调用 tg.create_task(coroutine) 启动任务。
  4. 访问 task_object.result() 获取返回值。
  5. 使用 except* ExceptionGroup 处理批量异常。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文