文章目录

Python asyncio.TaskGroup管理并发任务的优雅方式

发布于 2026-05-29 00:21:09 · 浏览 28 次 · 评论 0 条

Python asyncio.TaskGroup管理并发任务的优雅方式

什么是TaskGroup

asyncio.TaskGroup 是 Python 3.11 引入的新特性,它提供了一种结构化的方式管理一组异步任务。传统的 asyncio.gather 或手动创建 Task 对象在错误处理、任务取消和生命周期管理方面存在不足,而 TaskGroup 通过上下文管理器自动处理这些细节。

核心优势:当组内任意一个任务抛出异常时,组内所有其他未完成的任务会被自动取消,并在退出上下文时统一抛出异常。这避免了资源泄漏和悬空任务。

基础用法:创建并执行一组任务

导入 asyncio 并定义几个简单的协程函数。

import asyncio

async def worker(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Worker {name} done in {delay}s"

使用 async with asyncio.TaskGroup() 创建组,然后用 create_task() 方法提交协程。

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(worker("A", 2))
        task2 = tg.create_task(worker("B", 1))
        task3 = tg.create_task(worker("C", 3))

    # 等待所有任务完成,结果可从task对象的result()获取
    results = [task1.result(), task2.result(), task3.result()]
    print(results)  # ['Worker A done in 2s', 'Worker B done in 1s', 'Worker C done in 3s']

asyncio.run(main())

关键点

  • create_task() 返回一个 Task 对象,你可以在 with 块之外通过 result() 获取返回值。
  • with 块结束时,TaskGroup 会等待所有已创建的任务完成(或抛出异常)。

错误处理:一个失败,全体取消

假设任务 worker("B", 1) 抛出异常,其他任务会立即被取消,并且异常在退出 with 块时重新抛出。

async def failing_worker(name: str, delay: float):
    await asyncio.sleep(delay)
    raise ValueError(f"{name} failed after {delay}s")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(worker("A", 3))
            tg.create_task(failing_worker("B", 1))
            tg.create_task(worker("C", 2))
    except* ValueError as eg:
        # ExceptionGroup会在退出时抛出,用except*捕获
        for exc in eg.exceptions:
            print(f"Caught: {exc}")

asyncio.run(main())

行为

  • 任务B触发异常,TaskGroup立即取消未完成的A和C。
  • 退出上下文时抛出 ExceptionGroup(Python 3.11+ 异常组)。
  • 使用 except* 语法按类型捕获异常组中的异常。

任务超时:限制单个任务的最大执行时间

TaskGroup 本身不提供超时功能,但可以结合 asyncio.timeoutasyncio.wait_for 实现。

推荐:将超时逻辑封装在协程内部,或使用 asyncio.timeout 上下文管理器包裹 create_task

async def timed_worker(name: str, delay: float, timeout: float):
    try:
        async with asyncio.timeout(timeout):
            await asyncio.sleep(delay)
        return f"{name} completed"
    except asyncio.TimeoutError:
        return f"{name} timed out"

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(timed_worker("A", 5, 2))  # 超时
        task2 = tg.create_task(timed_worker("B", 1, 3))  # 正常完成
        task3 = tg.create_task(timed_worker("C", 3, 10)) # 正常完成

    print(task1.result())  # "A timed out"
    print(task2.result())  # "B completed"
    print(task3.result())  # "C completed"

asyncio.run(main())

注意asyncio.timeout 在 Python 3.11 中引入,支持在事件循环中有效取消任务。

asyncio.gather 的对比

特性 asyncio.gather asyncio.TaskGroup
异常处理 默认返回异常对象,不自动取消其他任务 自动取消所有未完成任务,抛出 ExceptionGroup
任务生命周期 手动管理 Task 对象,可能泄漏 上下文管理器自动管理,退出即清理
嵌套使用 容易混乱 支持嵌套,结构清晰
返回结果 按输入顺序返回列表 通过 Task.result() 按需获取

何时用 TaskGroup

  • 需要“一个失败,全部取消”的原子性行为。
  • 希望代码结构清晰,避免忘记 await 或手动取消。
  • 处理多个可能失败的并发任务,并统一处理异常组。

何时用 gather

  • 旧版 Python(3.11以下)无法使用。
  • 需要 return_exceptions=True 获取所有结果而不取消其他任务。
  • 简单的批量执行,无需复杂取消逻辑。

嵌套 TaskGroup:分层管理

可以在一个 TaskGroup 内部再创建子 TaskGroup,实现任务组织的层次结构。

async def sub_worker(label: str):
    async with asyncio.TaskGroup() as inner_tg:
        inner_tg.create_task(worker(f"{label}-1", 0.5))
        inner_tg.create_task(worker(f"{label}-2", 1.0))

async def main():
    async with asyncio.TaskGroup() as outer_tg:
        outer_tg.create_task(sub_worker("X"))
        outer_tg.create_task(sub_worker("Y"))

asyncio.run(main())

效果:子组内的异常不会直接传播到外层,但子组作为外层的一个任务,若子组自身抛出异常(如内部异常未捕获),则外层会取消所有其他外层任务。

实际场景:并发 HTTP 请求,失败时快速放弃

结合 aiohttp 展示一个真实用例:

import aiohttp
import asyncio

async def fetch_url(session, url: str, timeout: float = 5):
    async with asyncio.timeout(timeout):
        async with session.get(url) as response:
            return await response.text()

async def check_urls(urls: list):
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_url(session, url)) for url in urls]

        # 如果任何一个请求失败,所有其他请求都会被取消
        results = {}
        for url, task in zip(urls, tasks):
            try:
                results[url] = task.result()
            except Exception as e:
                results[url] = f"Failed: {e}"
        return results

async def main():
    urls = [
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/10",  # 这个会被取消
        "https://httpbin.org/status/500" # 触发异常
    ]
    result = await check_urls(urls)
    for url, content in result.items():
        print(f"{url}: {content[:50]}...")

asyncio.run(main())

运行说明:第二个请求因第一个或第三个失败而被取消,第三个请求抛出 HTTP 500 异常,整个组被取消。

asyncio.CancelledError 的交互

TaskGroup 取消任务时,被取消的任务内部会收到 asyncio.CancelledError。你可以通过 try/finallyasyncio.shield 保护关键清理操作。

async def cleanup_worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("Worker cancelled, performing cleanup...")
        await asyncio.sleep(0.5)  # 模拟清理
        raise  # 必须重新抛出以通知TaskGroup取消完成

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(cleanup_worker())
        tg.create_task(worker("fast", 0.1))  # 快速完成并可能触发取消

注意:在 except CancelledError 中执行清理后,必须 raise 重新抛出异常,否则 TaskGroup 会认为任务正常完成,导致状态不一致。

避免常见陷阱

  1. 不要在 with 块外创建任务TaskGroup 只管理在其上下文内通过 create_task 创建的任务。
  2. 不要手动 cancel() 组内任务:这会导致任务状态混乱。应通过 TaskGroup 的异常机制或外部取消整个上下文。
  3. *不要忘记 `except**:捕获异常组需要使用except*,而不是普通的except Exception`。
  4. 任务结果获取顺序result() 会阻塞直到任务完成,但注意调用顺序不会改变任务完成顺序,使用 asyncio.as_completed 获取完成顺序时请小心与 TaskGroup 的交互,建议在 with 块外使用 as_completed 处理已完成任务。
async def main():
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(worker(i, 3 - i)) for i in range(3)]

    # 现在所有任务已完成,可以按任意顺序获取结果
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)
  1. Python 版本要求TaskGroupExceptionGroup 以及 except* 均需要 Python 3.11 或更高。如果使用旧版本,临时方案是使用 trioTaskGroup 或回退到 gather

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文