Python asyncio gather和as_completed在并发控制上的行为差异
理解 gather 和 as_completed 在控制并发任务时的核心区别,能帮你精确选择最合适的方法,避免潜在的性能或逻辑陷阱。
核心差异一览
gather 像一个“齐步走”指挥官。 它将所有任务收集起来,要求它们一起开始执行,并阻塞等待直到所有任务都完成。任务的结果会按照你提交的顺序原样返回。
as_completed 像一个“抢单”调度器。 它接收一组任务,让它们并发执行,但哪个任务先完成就先返回哪个的结果。它返回的是一个按完成顺序产生结果的迭代器。
简单说,gather 保证顺序,强制等待全体;as_completed 关注效率,抢先处理结果。
行为详解与示例
1. asyncio.gather
调用 asyncio.gather(*coros) 会立即开始所有传入的协程,并返回一个代表所有结果的可等待对象。当你 await 这个对象时,当前协程会暂停,直到所有任务完成。
关键特性:
- 阻塞等待全体:必须等待最慢的那个任务完成,整个
gather调用才会返回。 - 结果顺序固定:返回的结果是一个元组,其顺序与传入协程的顺序严格一致。第 i 个结果对应第 i 个协程。
- 错误处理统一:如果任何一个任务抛出异常,
gather默认会立即重新引发该异常(取决于return_exceptions参数设置),导致整个等待过程中断。
import asyncio
import time
async def fetch_data(url, delay):
print(f‘开始获取 {url}...‘)
await asyncio.sleep(delay) # 模拟网络延迟
print(f‘完成获取 {url}‘)
return f‘{url} 的数据‘
async def main_gather():
# 任务按顺序提交:A(2秒), B(1秒), C(3秒)
tasks = [
fetch_data(‘A‘, 2),
fetch_data(‘B‘, 1),
fetch_data(‘C‘, 3)
]
print(‘使用 gather 开始并发...‘)
start = time.time()
# 阻塞等待所有任务完成,结果顺序固定:A, B, C
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f‘gather 所有结果: {results}‘)
print(f‘总耗时: {elapsed:.1f} 秒‘) # 接近最慢任务的耗时:3秒
asyncio.run(main_gather())
执行流程与输出:
gather瞬间启动 A、B、C 三个任务。- B (1秒) 最先完成,但
gather不会立刻返回 B 的结果,而是继续等待。 - A (2秒) 完成,仍在等待。
- C (3秒) 完成,此时所有任务结束,
gather才返回。 - 返回的结果列表严格按照提交顺序:
[‘A 的数据‘, ‘B 的数据‘, ‘C 的数据‘]。 - 总耗时约 3 秒(取决于最慢的任务)。
2. asyncio.as_completed
调用 asyncio.as_completed(coros) 返回一个迭代器。当你用 for 循环遍历这个迭代器时,循环会暂停在每次迭代上,直到下一个完成的任务产出结果。
关键特性:
- 抢先返回结果:迭代器会按照任务实际完成的先后顺序
yield出可等待对象。你可以立即await这个对象并处理结果,无需等待其他任务。 - 提升整体吞吐量:对于需要处理大量任务,且任务耗时差异大的场景,可以尽早开始处理已返回的结果,提高系统响应速度。
- 丢失原始顺序:结果返回的顺序是随机的,你无法直接知道哪个结果对应哪个初始任务(需要自行编码,例如将任务ID与结果一起返回)。
import asyncio
import time
async def main_as_completed():
tasks = [
fetch_data(‘A‘, 2),
fetch_data(‘B‘, 1),
fetch_data(‘C‘, 3)
]
print(‘使用 as_completed 开始并发...‘)
start = time.time()
# 获取一个按完成顺序产出结果的迭代器
for coro in asyncio.as_completed(tasks):
# 暂停,直到下一个任务完成
result = await coro
elapsed_part = time.time() - start
print(f‘及时处理到结果: {result},当前耗时: {elapsed_part:.1f} 秒‘)
total_elapsed = time.time() - start
print(f‘as_completed 处理完毕,总耗时: {total_elapsed:.1f} 秒‘) # 仍是3秒,但处理过程不同
asyncio.run(main_as_completed())
执行流程与输出:
as_completed启动所有任务,并返回一个迭代器。- 首先完成的是 B (1秒),迭代器立即
yieldB 的协程,循环await得到 B 的结果并处理。 - 接着完成的是 A (2秒),迭代器
yieldA 的协程,循环处理 A 的结果。 - 最后完成的是 C (3秒),迭代器
yieldC 的协程,循环处理 C 的结果。 - 结果的处理顺序是 B, A, C,与提交顺序不同。
- 总耗时依然是 3 秒,但用户(或系统)在 1 秒后就已经得到了第一个结果并开始处理。
对比总结
| 特性 | asyncio.gather |
asyncio.as_completed |
|---|---|---|
| 核心语义 | 等待全体,按序返回 | 抢先完成,按需处理 |
| 返回值 | 一个可等待对象,await 后得到结果列表/元组。 |
一个迭代器,遍历可得到按完成顺序的可等待对象。 |
| 结果顺序 | 严格保证输入协程的顺序。 | 不保证顺序,按实际完成时间排序。 |
| 阻塞行为 | 阻塞直到所有任务完成。 | 在每次迭代时阻塞,直到下一个任务完成。 |
| 适用场景 | 1. 任务结果需要按顺序使用或打包。<br>2. 必须获得所有结果后才能进行下一步逻辑。<br>3. 需要统一处理所有任务的异常(通过 return_exceptions)。 |
1. 任务耗时差异大,希望尽早处理已就绪的结果以提高响应速度。<br>2. 结果的处理相互独立,且不关心原始顺序。<br>3. 需要实现进度提示或实时反馈。 |
| 错误处理 | 默认任何一个异常会取消其他任务(除非 return_exceptions=True)。 |
遍历时 await 具体的协程才会抛出异常,需要自行处理以避免中断循环。 |
如何选择?
遵循这个决策树:
-
你需要所有任务的结果,并且结果的顺序至关重要吗?
- 是 → 使用
gather。例如,计算多个独立指标后按固定格式生成一份报告。 - 否 → 进入问题 2。
- 是 → 使用
-
你希望尽快处理任何一个已完成任务的结果,而不是傻等最慢的任务吗?
- 是 → 使用
as_completed。例如,爬取100个网页,只要有一个返回就可以开始解析;或监控多个服务状态,率先返回健康的就更新界面。 - 否 → 进入问题 3。
- 是 → 使用
-
你的主要目标是最大化并发性能,并且任务之间没有逻辑先后依赖吗?
- 是 → 两者在总完成时间上通常等价,选择更契合你结果处理逻辑的那个。
- 否 → 可能其他模式(如带信号量的
TaskGroup)更合适。
最后注意: 如果使用 as_completed 时又需要知道结果对应的是哪个初始任务(例如返回的 ‘数据‘ 需要知道是来自 URL A 还是 B),你需要在提交任务前就将任务ID、参数等信息“编码”到协程的返回值中。

暂无评论,快来抢沙发吧!