Python asyncio.TaskGroup管理并发任务的优雅方式
什么是TaskGroup
asyncio.TaskGroup 是 Python 3.11 引入的新特性,它提供了一种结构化的方式管理一组异步任务。传统的 asyncio.gather 或手动创建 Task 对象在错误处理、任务取消和生命周期管理方面存在不足,而 TaskGroup 通过上下文管理器自动处理这些细节。
核心优势:当组内任意一个任务抛出异常时,组内所有其他未完成的任务会被自动取消,并在退出上下文时统一抛出异常。这避免了资源泄漏和悬空任务。
基础用法:创建并执行一组任务
导入 asyncio 并定义几个简单的协程函数。
import asyncio
async def worker(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"Worker {name} done in {delay}s"
使用 async with asyncio.TaskGroup() 创建组,然后用 create_task() 方法提交协程。
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(worker("A", 2))
task2 = tg.create_task(worker("B", 1))
task3 = tg.create_task(worker("C", 3))
# 等待所有任务完成,结果可从task对象的result()获取
results = [task1.result(), task2.result(), task3.result()]
print(results) # ['Worker A done in 2s', 'Worker B done in 1s', 'Worker C done in 3s']
asyncio.run(main())
关键点:
create_task()返回一个Task对象,你可以在with块之外通过result()获取返回值。- 当
with块结束时,TaskGroup会等待所有已创建的任务完成(或抛出异常)。
错误处理:一个失败,全体取消
假设任务 worker("B", 1) 抛出异常,其他任务会立即被取消,并且异常在退出 with 块时重新抛出。
async def failing_worker(name: str, delay: float):
await asyncio.sleep(delay)
raise ValueError(f"{name} failed after {delay}s")
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(worker("A", 3))
tg.create_task(failing_worker("B", 1))
tg.create_task(worker("C", 2))
except* ValueError as eg:
# ExceptionGroup会在退出时抛出,用except*捕获
for exc in eg.exceptions:
print(f"Caught: {exc}")
asyncio.run(main())
行为:
- 任务B触发异常,TaskGroup立即取消未完成的A和C。
- 退出上下文时抛出
ExceptionGroup(Python 3.11+ 异常组)。 - 使用
except*语法按类型捕获异常组中的异常。
任务超时:限制单个任务的最大执行时间
TaskGroup 本身不提供超时功能,但可以结合 asyncio.timeout 或 asyncio.wait_for 实现。
推荐:将超时逻辑封装在协程内部,或使用 asyncio.timeout 上下文管理器包裹 create_task。
async def timed_worker(name: str, delay: float, timeout: float):
try:
async with asyncio.timeout(timeout):
await asyncio.sleep(delay)
return f"{name} completed"
except asyncio.TimeoutError:
return f"{name} timed out"
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(timed_worker("A", 5, 2)) # 超时
task2 = tg.create_task(timed_worker("B", 1, 3)) # 正常完成
task3 = tg.create_task(timed_worker("C", 3, 10)) # 正常完成
print(task1.result()) # "A timed out"
print(task2.result()) # "B completed"
print(task3.result()) # "C completed"
asyncio.run(main())
注意:asyncio.timeout 在 Python 3.11 中引入,支持在事件循环中有效取消任务。
与 asyncio.gather 的对比
| 特性 | asyncio.gather |
asyncio.TaskGroup |
|---|---|---|
| 异常处理 | 默认返回异常对象,不自动取消其他任务 | 自动取消所有未完成任务,抛出 ExceptionGroup |
| 任务生命周期 | 手动管理 Task 对象,可能泄漏 | 上下文管理器自动管理,退出即清理 |
| 嵌套使用 | 容易混乱 | 支持嵌套,结构清晰 |
| 返回结果 | 按输入顺序返回列表 | 通过 Task.result() 按需获取 |
何时用 TaskGroup:
- 需要“一个失败,全部取消”的原子性行为。
- 希望代码结构清晰,避免忘记
await或手动取消。 - 处理多个可能失败的并发任务,并统一处理异常组。
何时用 gather:
- 旧版 Python(3.11以下)无法使用。
- 需要
return_exceptions=True获取所有结果而不取消其他任务。 - 简单的批量执行,无需复杂取消逻辑。
嵌套 TaskGroup:分层管理
可以在一个 TaskGroup 内部再创建子 TaskGroup,实现任务组织的层次结构。
async def sub_worker(label: str):
async with asyncio.TaskGroup() as inner_tg:
inner_tg.create_task(worker(f"{label}-1", 0.5))
inner_tg.create_task(worker(f"{label}-2", 1.0))
async def main():
async with asyncio.TaskGroup() as outer_tg:
outer_tg.create_task(sub_worker("X"))
outer_tg.create_task(sub_worker("Y"))
asyncio.run(main())
效果:子组内的异常不会直接传播到外层,但子组作为外层的一个任务,若子组自身抛出异常(如内部异常未捕获),则外层会取消所有其他外层任务。
实际场景:并发 HTTP 请求,失败时快速放弃
结合 aiohttp 展示一个真实用例:
import aiohttp
import asyncio
async def fetch_url(session, url: str, timeout: float = 5):
async with asyncio.timeout(timeout):
async with session.get(url) as response:
return await response.text()
async def check_urls(urls: list):
async with aiohttp.ClientSession() as session:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(session, url)) for url in urls]
# 如果任何一个请求失败,所有其他请求都会被取消
results = {}
for url, task in zip(urls, tasks):
try:
results[url] = task.result()
except Exception as e:
results[url] = f"Failed: {e}"
return results
async def main():
urls = [
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/10", # 这个会被取消
"https://httpbin.org/status/500" # 触发异常
]
result = await check_urls(urls)
for url, content in result.items():
print(f"{url}: {content[:50]}...")
asyncio.run(main())
运行说明:第二个请求因第一个或第三个失败而被取消,第三个请求抛出 HTTP 500 异常,整个组被取消。
与 asyncio.CancelledError 的交互
当 TaskGroup 取消任务时,被取消的任务内部会收到 asyncio.CancelledError。你可以通过 try/finally 或 asyncio.shield 保护关键清理操作。
async def cleanup_worker():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("Worker cancelled, performing cleanup...")
await asyncio.sleep(0.5) # 模拟清理
raise # 必须重新抛出以通知TaskGroup取消完成
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(cleanup_worker())
tg.create_task(worker("fast", 0.1)) # 快速完成并可能触发取消
注意:在 except CancelledError 中执行清理后,必须 raise 重新抛出异常,否则 TaskGroup 会认为任务正常完成,导致状态不一致。
避免常见陷阱
- 不要在
with块外创建任务:TaskGroup只管理在其上下文内通过create_task创建的任务。 - 不要手动
cancel()组内任务:这会导致任务状态混乱。应通过TaskGroup的异常机制或外部取消整个上下文。 - *不要忘记 `except
**:捕获异常组需要使用except*,而不是普通的except Exception`。 - 任务结果获取顺序:
result()会阻塞直到任务完成,但注意调用顺序不会改变任务完成顺序,使用asyncio.as_completed获取完成顺序时请小心与TaskGroup的交互,建议在with块外使用as_completed处理已完成任务。
async def main():
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(worker(i, 3 - i)) for i in range(3)]
# 现在所有任务已完成,可以按任意顺序获取结果
for task in asyncio.as_completed(tasks):
result = await task
print(result)
- Python 版本要求:
TaskGroup和ExceptionGroup以及except*均需要 Python 3.11 或更高。如果使用旧版本,临时方案是使用trio的TaskGroup或回退到gather。

暂无评论,快来抢沙发吧!