Python 协程任务:asyncio.create_task() 与 gather()
Python 的 asyncio 库通过协程实现了并发编程,但在实际开发中,如何正确调度和等待这些协程是核心难点。asyncio.create_task() 和 asyncio.gather() 是两个最常用的调度工具,前者负责将协程“发射”出去,后者负责批量管理和收集结果。
核心概念:从协程到任务
协程在 Python 中调用时不会立即执行,它只是返回一个协程对象。必须通过事件循环来调度运行。
- 定义 一个简单的异步函数。
import asyncio
import time
async def say_hello(delay, name):
await asyncio.sleep(delay)
print(f"Hello, {name}! (after {delay}s)")
return f"Result: {name}"
- 理解 任务的本质。
任务是对协程的封装,它将协程对象包装成一个可被事件循环调度的实体。只有将协程转化为任务,它才具备“并发运行”的能力。
使用 asyncio.create_task() 并发运行
当你需要让一个协程在后台立即开始运行,同时主程序继续执行其他逻辑时,使用 asyncio.create_task()。这相当于“点火即忘”,除非你显式等待它。
-
导入
asyncio模块。 -
调用
asyncio.create_task()传入协程对象。这会立即将协程加入事件循环调度队列,并返回一个
Task对象。 -
执行 其他耗时操作。
在此期间,刚才创建的任务正在后台并发运行。 -
使用
await task获取结果。
这一步会阻塞当前函数,直到该任务完成并返回结果。
代码示例:单任务调度
async def main_single():
print("1. 准备创建任务")
# 将协程封装为任务并立即调度
task = asyncio.create_task(say_hello(1, "Task A"))
print("2. 任务已创建,主程序继续执行其他操作...")
await asyncio.sleep(0.5) # 模拟其他操作
print("3. 主程序其他操作完成")
# 等待任务完成并获取返回值
result = await task
print(f"4. 任务结束,获取结果: {result}")
# 运行入口
# asyncio.run(main_single())
执行流程解析:
使用 asyncio.gather() 批量管理
当需要同时启动多个协程,并且必须等待它们全部完成后再统一处理结果时,使用 asyncio.gather()。这是最常用的并发模式。
-
构建 多个协程对象列表。
-
调用
asyncio.gather()并传入协程对象。
该函数会自动将传入的协程封装为任务,并并发运行它们。 -
等待 所有任务结束。
gather()会阻塞直到所有任务完成,并按传入顺序返回结果列表。
代码示例:批量并发
async def main_batch():
print("开始批量任务...")
start_time = time.time()
# 传入多个协程对象,gather 会自动调度
results = await asyncio.gather(
say_hello(1, "Batch A"),
say_hello(1, "Batch B"),
say_hello(1, "Batch C")
)
end_time = time.time()
print(f"所有任务完成,总耗时: {end_time - start_time:.2f}s")
print(f"结果列表: {results}")
# asyncio.run(main_batch())
关键特性说明:
- 并发执行:三个任务同时开始(假设耗时均为 1 秒),总耗时约 1 秒,而非 3 秒。
- 顺序保留:返回的
results列表顺序与传入gather()的参数顺序严格一致,无论哪个任务先执行完。
核心差异对比
create_task() 提供的是对单个任务的精细控制,而 gather() 提供的是对任务组的批量聚合。
以下是两者的详细对比:
| 特性 | asyncio.create_task() | asyncio.gather() |
|---|---|---|
| 主要用途 | 将单个协程包装为任务并立即调度 | 批量并发运行多个协程并收集结果 |
| 返回值 | 返回 Task 对象 |
返回包含所有结果的列表 |
| 执行时机 | 调用后立即加入事件循环调度 | 调用后立即调度所有传入的协程 |
| 控制粒度 | 细粒度,可对单个任务取消或添加回调 | 粗粒度,视为一个整体任务组 |
| 异常处理 | 需手动 await 或使用 try-except 捕获 |
默认第一个异常会向上抛出,终止其他任务 |
进阶操作:异常处理与任务取消
实际开发中,必须处理任务失败的情况。两者的处理方式有所不同。
1. 处理 create_task 的异常
如果直接 await task,任务内部的异常会抛出到主流程。
- 使用
try-except块包裹await语句。
async def task_with_error():
await asyncio.sleep(1)
raise ValueError("Something went wrong!")
async def handle_single_error():
task = asyncio.create_task(task_with_error())
try:
await task
except ValueError as e:
print(f"捕获到异常: {e}")
2. 处理 gather 的异常
默认情况下,gather() 中任一任务抛出异常,该异常会立即向上传播,且 gather() 本身会抛出异常,导致无法获取其他成功任务的结果。
- 设置
return_exceptions=True参数。
这会让异常不向外抛出,而是作为结果列表中的一个元素返回。
async def handle_batch_error():
results = await asyncio.gather(
say_hello(1, "Normal"),
task_with_error(), # 这个任务会失败
say_hello(1, "Normal 2"),
return_exceptions=True # 关键参数
)
for result in results:
if isinstance(result, Exception):
print(f"任务失败: {result}")
else:
print(f"任务成功: {result}")
组合应用:精细控制与批量等待
可以将 create_task() 创建的任务对象直接传递给 gather()。这种方式结合了二者的优势:既可以提前创建任务并赋予个性化设置(如设置名字或回调),又可以批量等待结果。
- 使用
create_task()创建任务并设置名称。 - 传递 任务对象给
gather()。
async def combined_usage():
# 1. 手动创建任务,可进行额外配置
task1 = asyncio.create_task(say_hello(1, "Combined A"), name="Task-Alpha")
task2 = asyncio.create_task(say_hello(1, "Combined B"), name="Task-Beta")
# 2. 将 Task 对象传入 gather
# gather 检测到已经是 Task 对象,就不会再次封装
results = await asyncio.gather(task1, task2)
print(f"组合模式结果: {results}")
最佳实践总结
-
区分 调度与等待。
如果需要在“等待”之前执行其他逻辑,必须使用create_task();如果只是单纯并发执行一批任务并等结果,优先使用gather()。 -
避免 忘记
await。
使用create_task()创建的任务如果没有被await,当主程序结束时,该任务可能会被强制取消而未执行完毕。 -
使用
return_exceptions=True容错。
在批量处理网络请求等易失败场景,开启此参数可防止单个请求失败导致整批任务结果丢失。 -
注意 任务上下文。
在 Python 3.7+ 中,确保在asyncio.run()或异步函数内部使用这两个方法,不要在同步代码中直接调用,否则会因为没有运行中的事件循环而报错。

暂无评论,快来抢沙发吧!