文章目录

Python 异步IO:asyncio 事件循环与任务管理

发布于 2026-04-02 16:40:17 · 浏览 11 次 · 评论 0 条

Python 异步IO:asyncio 事件循环与任务管理

Python 的 asyncio 模块让你能用协程(coroutine)高效处理大量 I/O 密集型任务,比如网络请求、文件读写等。它的核心是事件循环(event loop),负责调度和运行协程。理解事件循环和任务(Task)的管理机制,是掌握异步编程的关键。


启动事件循环并运行一个协程

最简单的异步程序从一个协程开始:

  1. 定义一个异步函数(用 async def 声明)。
  2. 获取当前线程的默认事件循环。
  3. 运行该协程直到完成。
import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 获取事件循环并运行协程
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())

注意:在 Python 3.7+ 中,推荐直接使用 asyncio.run(),它会自动创建并关闭事件循环:

asyncio.run(hello())

优先使用 asyncio.run() 启动主协程,避免手动管理事件循环生命周期。


创建和调度多个任务

单个协程只能顺序执行。要并发运行多个操作,需将协程封装为 Task。Task 是事件循环中可调度的“工作单元”。

  1. 使用 asyncio.create_task() 将协程转为任务。这会立即把任务排入事件循环队列,但不会立刻执行。
  2. await 任务以等待其结果(或异常)。
import asyncio

async def fetch_data(delay, name):
    print(f"Start {name}")
    await asyncio.sleep(delay)
    print(f"End {name}")
    return f"Data from {name}"

async def main():
    # 创建两个任务
    task1 = asyncio.create_task(fetch_data(2, "A"))
    task2 = asyncio.create_task(fetch_data(1, "B"))

    # 等待两个任务完成
    result1 = await task1
    result2 = await task2
    print(result1, result2)

asyncio.run(main())

输出顺序为:

Start A
Start B
End B
End A
Data from A Data from B

因为两个任务被同时调度,B 虽然后创建,但延迟更短,先完成。


并发等待多个任务完成

如果不想逐个 await 任务,可以使用 asyncio.gather() 一次性等待多个任务

  1. 传入多个协程或任务gather()
  2. await 返回的聚合对象,它会在所有任务完成后返回结果列表。
async def main():
    results = await asyncio.gather(
        fetch_data(2, "A"),
        fetch_data(1, "B"),
        fetch_data(1.5, "C")
    )
    print(results)  # ['Data from A', 'Data from B', 'Data from C']

gather()保持结果顺序与传入参数一致,无论实际完成顺序如何。

若任一任务抛出异常,gather() 会立即传播该异常,其他任务可能仍在后台运行(除非设置 return_exceptions=True)。


控制任务的超时与取消

长时间运行的任务可能需要超时控制或手动取消。

设置超时

使用 asyncio.wait_for() 为任务添加时间限制

async def main():
    try:
        result = await asyncio.wait_for(fetch_data(5, "Slow"), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Task took too long!")

2 秒后抛出 TimeoutError,原任务会被自动取消。

手动取消任务

任务对象有 .cancel() 方法:

async def main():
    task = asyncio.create_task(fetch_data(10, "Long"))

    await asyncio.sleep(1)  # 等1秒
    task.cancel()           # 取消任务

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

调用 task.cancel() 只是请求取消,实际取消发生在下一次 await 点。任务内部可通过捕获 CancelledError 做清理。


并发运行但不等待全部完成

有时你只想启动任务,不关心结果(如日志上报、心跳检测)。此时不要 await 任务,但需确保程序运行足够久让任务执行。

async def background_job():
    await asyncio.sleep(1)
    print("Background done")

async def main():
    # 启动后台任务但不等待
    asyncio.create_task(background_job())

    await asyncio.sleep(2)  # 主协程多睡一会,让后台任务有机会运行

asyncio.run(main())

如果主协程太快结束,事件循环会关闭,未完成的任务会被丢弃。


理解事件循环的工作方式

事件循环本质上是一个无限循环,不断检查是否有可执行的协程:

  • 当遇到 await,当前协程暂停,控制权交还给事件循环。
  • 事件循环选择下一个就绪的协程继续执行。
  • I/O 操作(如 sleep、网络请求)由底层系统通知事件循环何时可继续。

这意味着:所有异步代码必须在同一个事件循环中运行。跨线程或进程需特殊处理(如 loop.call_soon_threadsafe)。


常见陷阱与最佳实践

问题 正确做法
在非异步函数中直接调用协程 hello()<br>✅ asyncio.run(hello()) 或在 async 函数中 await hello()
忘记 await 任务 asyncio.create_task(...) 后无 await<br>✅ 至少保留引用并在适当时候 await 或使用 asyncio.shield()
阻塞事件循环 ❌ 在协程中使用 time.sleep()<br>✅ 使用 await asyncio.sleep()
多次调用 asyncio.run() ❌ 在同一个程序中多次调用<br>✅ 整个应用只调用一次 asyncio.run()

永远不要在协程中执行阻塞操作(如 requests.gettime.sleep、大循环计算),否则整个事件循环会卡住。

对于 CPU 密集型任务,应使用 asyncio.to_thread()(Python 3.9+)或 concurrent.futures 将工作交给线程池:

import time

def blocking_io():
    time.sleep(2)
    return "Done"

async def main():
    result = await asyncio.to_thread(blocking_io)
    print(result)

监控与调试任务

开发时可列出当前所有任务:

async def debug_tasks():
    all_tasks = asyncio.all_tasks()
    for task in all_tasks:
        print(f"Task: {task.get_name()}, done: {task.done()}")

也可为任务命名便于追踪:

task = asyncio.create_task(fetch_data(1, "X"), name="fetch_X")

安全关闭事件循环

使用 asyncio.run() 时,循环会在主协程结束后自动关闭并清理所有任务。

若手动管理循环(不推荐),需显式关闭:

loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

绝大多数情况只需 asyncio.run()


组合使用:超时 + 并发 + 错误处理

实际项目常需组合多种机制:

async def robust_fetch(url, timeout=5):
    try:
        # 假设有 async_http_get 协程
        data = await asyncio.wait_for(async_http_get(url), timeout)
        return data
    except asyncio.TimeoutError:
        return f"Timeout for {url}"
    except Exception as e:
        return f"Error for {url}: {e}"

async def main():
    urls = ["http://a.com", "http://b.com", "http://c.com"]
    tasks = [robust_fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)
    for r in results:
        print(r)

这种方式能并发请求多个 URL每个都有独立超时错误不会中断整体流程


合理使用 asyncio.create_task() 启动并发任务 gather wait_for 控制执行逻辑避免阻塞调用,即可高效构建高吞吐的 I/O 应用。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文