文章目录

Python asynciogather和as_completed在并发控制上的行为差异

发布于 2026-06-03 21:48:51 · 浏览 21 次 · 评论 0 条

Python asyncio gather和as_completed在并发控制上的行为差异

理解 gatheras_completed 在控制并发任务时的核心区别,能帮你精确选择最合适的方法,避免潜在的性能或逻辑陷阱。


核心差异一览

gather 像一个“齐步走”指挥官。 它将所有任务收集起来,要求它们一起开始执行,并阻塞等待直到所有任务都完成。任务的结果会按照你提交的顺序原样返回。
as_completed 像一个“抢单”调度器。 它接收一组任务,让它们并发执行,但哪个任务先完成就先返回哪个的结果。它返回的是一个按完成顺序产生结果的迭代器。

简单说,gather 保证顺序,强制等待全体;as_completed 关注效率,抢先处理结果。


行为详解与示例

1. asyncio.gather

调用 asyncio.gather(*coros) 会立即开始所有传入的协程,并返回一个代表所有结果的可等待对象。当你 await 这个对象时,当前协程会暂停,直到所有任务完成。

关键特性:

  • 阻塞等待全体:必须等待最慢的那个任务完成,整个 gather 调用才会返回。
  • 结果顺序固定:返回的结果是一个元组,其顺序与传入协程的顺序严格一致。第 i 个结果对应第 i 个协程。
  • 错误处理统一:如果任何一个任务抛出异常,gather 默认会立即重新引发该异常(取决于 return_exceptions 参数设置),导致整个等待过程中断。
import asyncio
import time

async def fetch_data(url, delay):
    print(f‘开始获取 {url}...‘)
    await asyncio.sleep(delay)  # 模拟网络延迟
    print(f‘完成获取 {url}‘)
    return f‘{url} 的数据‘

async def main_gather():
    # 任务按顺序提交:A(2秒), B(1秒), C(3秒)
    tasks = [
        fetch_data(‘A‘, 2),
        fetch_data(‘B‘, 1),
        fetch_data(‘C‘, 3)
    ]

    print(‘使用 gather 开始并发...‘)
    start = time.time()
    # 阻塞等待所有任务完成,结果顺序固定:A, B, C
    results = await asyncio.gather(*tasks)
    elapsed = time.time() - start

    print(f‘gather 所有结果: {results}‘)
    print(f‘总耗时: {elapsed:.1f} 秒‘) # 接近最慢任务的耗时:3秒

asyncio.run(main_gather())

执行流程与输出:

  1. gather 瞬间启动 A、B、C 三个任务。
  2. B (1秒) 最先完成,但 gather 不会立刻返回 B 的结果,而是继续等待。
  3. A (2秒) 完成,仍在等待。
  4. C (3秒) 完成,此时所有任务结束gather 才返回。
  5. 返回的结果列表严格按照提交顺序:[‘A 的数据‘, ‘B 的数据‘, ‘C 的数据‘]
  6. 总耗时约 3 秒(取决于最慢的任务)。

2. asyncio.as_completed

调用 asyncio.as_completed(coros) 返回一个迭代器。当你用 for 循环遍历这个迭代器时,循环会暂停在每次迭代上,直到下一个完成的任务产出结果

关键特性:

  • 抢先返回结果:迭代器会按照任务实际完成的先后顺序 yield 出可等待对象。你可以立即 await 这个对象并处理结果,无需等待其他任务。
  • 提升整体吞吐量:对于需要处理大量任务,且任务耗时差异大的场景,可以尽早开始处理已返回的结果,提高系统响应速度。
  • 丢失原始顺序:结果返回的顺序是随机的,你无法直接知道哪个结果对应哪个初始任务(需要自行编码,例如将任务ID与结果一起返回)。
import asyncio
import time

async def main_as_completed():
    tasks = [
        fetch_data(‘A‘, 2),
        fetch_data(‘B‘, 1),
        fetch_data(‘C‘, 3)
    ]

    print(‘使用 as_completed 开始并发...‘)
    start = time.time()

    # 获取一个按完成顺序产出结果的迭代器
    for coro in asyncio.as_completed(tasks):
        # 暂停,直到下一个任务完成
        result = await coro
        elapsed_part = time.time() - start
        print(f‘及时处理到结果: {result},当前耗时: {elapsed_part:.1f} 秒‘)

    total_elapsed = time.time() - start
    print(f‘as_completed 处理完毕,总耗时: {total_elapsed:.1f} 秒‘) # 仍是3秒,但处理过程不同

asyncio.run(main_as_completed())

执行流程与输出:

  1. as_completed 启动所有任务,并返回一个迭代器。
  2. 首先完成的是 B (1秒),迭代器立即 yield B 的协程,循环 await 得到 B 的结果并处理。
  3. 接着完成的是 A (2秒),迭代器 yield A 的协程,循环处理 A 的结果。
  4. 最后完成的是 C (3秒),迭代器 yield C 的协程,循环处理 C 的结果。
  5. 结果的处理顺序是 B, A, C,与提交顺序不同。
  6. 总耗时依然是 3 秒,但用户(或系统)在 1 秒后就已经得到了第一个结果并开始处理。

对比总结

特性 asyncio.gather asyncio.as_completed
核心语义 等待全体,按序返回 抢先完成,按需处理
返回值 一个可等待对象,await 后得到结果列表/元组。 一个迭代器,遍历可得到按完成顺序的可等待对象。
结果顺序 严格保证输入协程的顺序。 不保证顺序,按实际完成时间排序。
阻塞行为 阻塞直到所有任务完成。 在每次迭代时阻塞,直到下一个任务完成。
适用场景 1. 任务结果需要按顺序使用或打包。<br>2. 必须获得所有结果后才能进行下一步逻辑。<br>3. 需要统一处理所有任务的异常(通过 return_exceptions)。 1. 任务耗时差异大,希望尽早处理已就绪的结果以提高响应速度。<br>2. 结果的处理相互独立,且不关心原始顺序。<br>3. 需要实现进度提示或实时反馈。
错误处理 默认任何一个异常会取消其他任务(除非 return_exceptions=True)。 遍历时 await 具体的协程才会抛出异常,需要自行处理以避免中断循环。

如何选择?

遵循这个决策树:

  1. 你需要所有任务的结果,并且结果的顺序至关重要吗?

    • 使用 gather。例如,计算多个独立指标后按固定格式生成一份报告。
    • → 进入问题 2。
  2. 你希望尽快处理任何一个已完成任务的结果,而不是傻等最慢的任务吗?

    • 使用 as_completed。例如,爬取100个网页,只要有一个返回就可以开始解析;或监控多个服务状态,率先返回健康的就更新界面。
    • → 进入问题 3。
  3. 你的主要目标是最大化并发性能,并且任务之间没有逻辑先后依赖吗?

    • → 两者在总完成时间上通常等价,选择更契合你结果处理逻辑的那个。
    • → 可能其他模式(如带信号量的 TaskGroup)更合适。

最后注意: 如果使用 as_completed 时又需要知道结果对应的是哪个初始任务(例如返回的 ‘数据‘ 需要知道是来自 URL A 还是 B),你需要在提交任务前就将任务ID、参数等信息“编码”到协程的返回值中。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文