文章目录

Python 协程任务:asyncio.create_task() 与 gather()

发布于 2026-04-06 12:16:56 · 浏览 14 次 · 评论 0 条

Python 协程任务:asyncio.create_task() 与 gather()

Python 的 asyncio 库通过协程实现了并发编程,但在实际开发中,如何正确调度和等待这些协程是核心难点。asyncio.create_task()asyncio.gather() 是两个最常用的调度工具,前者负责将协程“发射”出去,后者负责批量管理和收集结果。


核心概念:从协程到任务

协程在 Python 中调用时不会立即执行,它只是返回一个协程对象。必须通过事件循环来调度运行。

  1. 定义 一个简单的异步函数。
import asyncio
import time

async def say_hello(delay, name):
    await asyncio.sleep(delay)
    print(f"Hello, {name}! (after {delay}s)")
    return f"Result: {name}"
  1. 理解 任务的本质。
    任务是对协程的封装,它将协程对象包装成一个可被事件循环调度的实体。只有将协程转化为任务,它才具备“并发运行”的能力。

使用 asyncio.create_task() 并发运行

当你需要让一个协程在后台立即开始运行,同时主程序继续执行其他逻辑时,使用 asyncio.create_task()。这相当于“点火即忘”,除非你显式等待它。

  1. 导入 asyncio 模块。

  2. 调用 asyncio.create_task() 传入协程对象。

    这会立即将协程加入事件循环调度队列,并返回一个 Task 对象。

  3. 执行 其他耗时操作。
    在此期间,刚才创建的任务正在后台并发运行。

  4. 使用 await task 获取结果。
    这一步会阻塞当前函数,直到该任务完成并返回结果。

代码示例:单任务调度

async def main_single():
    print("1. 准备创建任务")
    # 将协程封装为任务并立即调度
    task = asyncio.create_task(say_hello(1, "Task A"))

    print("2. 任务已创建,主程序继续执行其他操作...")
    await asyncio.sleep(0.5) # 模拟其他操作
    print("3. 主程序其他操作完成")

    # 等待任务完成并获取返回值
    result = await task
    print(f"4. 任务结束,获取结果: {result}")

# 运行入口
# asyncio.run(main_single())

执行流程解析

graph LR A["Start main_single()"] --> B["Create Task A"] B --> C["Print: 任务已创建"] C --> D["Sleep 0.5s (Main)"] B --> E["Sleep 1s (Task A)"] D --> F["Print: 主程序操作完成"] E --> G["Print: Hello Task A"] F --> H["Await Task"] G --> H H --> I["Get Result & End"]

使用 asyncio.gather() 批量管理

当需要同时启动多个协程,并且必须等待它们全部完成后再统一处理结果时,使用 asyncio.gather()。这是最常用的并发模式。

  1. 构建 多个协程对象列表。

  2. 调用 asyncio.gather() 并传入协程对象。
    该函数会自动将传入的协程封装为任务,并并发运行它们。

  3. 等待 所有任务结束。
    gather() 会阻塞直到所有任务完成,并按传入顺序返回结果列表。

代码示例:批量并发

async def main_batch():
    print("开始批量任务...")
    start_time = time.time()

    # 传入多个协程对象,gather 会自动调度
    results = await asyncio.gather(
        say_hello(1, "Batch A"),
        say_hello(1, "Batch B"),
        say_hello(1, "Batch C")
    )

    end_time = time.time()
    print(f"所有任务完成,总耗时: {end_time - start_time:.2f}s")
    print(f"结果列表: {results}")

# asyncio.run(main_batch())

关键特性说明

  • 并发执行:三个任务同时开始(假设耗时均为 1 秒),总耗时约 1 秒,而非 3 秒。
  • 顺序保留:返回的 results 列表顺序与传入 gather() 的参数顺序严格一致,无论哪个任务先执行完。

核心差异对比

create_task() 提供的是对单个任务的精细控制,而 gather() 提供的是对任务组的批量聚合。

以下是两者的详细对比:

特性 asyncio.create_task() asyncio.gather()
主要用途 将单个协程包装为任务并立即调度 批量并发运行多个协程并收集结果
返回值 返回 Task 对象 返回包含所有结果的列表
执行时机 调用后立即加入事件循环调度 调用后立即调度所有传入的协程
控制粒度 细粒度,可对单个任务取消或添加回调 粗粒度,视为一个整体任务组
异常处理 需手动 await 或使用 try-except 捕获 默认第一个异常会向上抛出,终止其他任务

进阶操作:异常处理与任务取消

实际开发中,必须处理任务失败的情况。两者的处理方式有所不同。

1. 处理 create_task 的异常

如果直接 await task,任务内部的异常会抛出到主流程。

  1. 使用 try-except 块包裹 await 语句。
async def task_with_error():
    await asyncio.sleep(1)
    raise ValueError("Something went wrong!")

async def handle_single_error():
    task = asyncio.create_task(task_with_error())

    try:
        await task
    except ValueError as e:
        print(f"捕获到异常: {e}")

2. 处理 gather 的异常

默认情况下,gather() 中任一任务抛出异常,该异常会立即向上传播,且 gather() 本身会抛出异常,导致无法获取其他成功任务的结果。

  1. 设置 return_exceptions=True 参数。
    这会让异常不向外抛出,而是作为结果列表中的一个元素返回。
async def handle_batch_error():
    results = await asyncio.gather(
        say_hello(1, "Normal"),
        task_with_error(), # 这个任务会失败
        say_hello(1, "Normal 2"),
        return_exceptions=True # 关键参数
    )

    for result in results:
        if isinstance(result, Exception):
            print(f"任务失败: {result}")
        else:
            print(f"任务成功: {result}")

组合应用:精细控制与批量等待

可以将 create_task() 创建的任务对象直接传递给 gather()。这种方式结合了二者的优势:既可以提前创建任务并赋予个性化设置(如设置名字或回调),又可以批量等待结果。

  1. 使用 create_task() 创建任务并设置名称。
  2. 传递 任务对象给 gather()
async def combined_usage():
    # 1. 手动创建任务,可进行额外配置
    task1 = asyncio.create_task(say_hello(1, "Combined A"), name="Task-Alpha")
    task2 = asyncio.create_task(say_hello(1, "Combined B"), name="Task-Beta")

    # 2. 将 Task 对象传入 gather
    # gather 检测到已经是 Task 对象,就不会再次封装
    results = await asyncio.gather(task1, task2)

    print(f"组合模式结果: {results}")

最佳实践总结

  1. 区分 调度与等待。
    如果需要在“等待”之前执行其他逻辑,必须使用 create_task();如果只是单纯并发执行一批任务并等结果,优先使用 gather()

  2. 避免 忘记 await
    使用 create_task() 创建的任务如果没有被 await,当主程序结束时,该任务可能会被强制取消而未执行完毕。

  3. 使用 return_exceptions=True 容错。
    在批量处理网络请求等易失败场景,开启此参数可防止单个请求失败导致整批任务结果丢失。

  4. 注意 任务上下文。
    在 Python 3.7+ 中,确保在 asyncio.run() 或异步函数内部使用这两个方法,不要在同步代码中直接调用,否则会因为没有运行中的事件循环而报错。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文