Python 异步IO:asyncio 事件循环与任务管理
Python 的 asyncio 模块让你能用协程(coroutine)高效处理大量 I/O 密集型任务,比如网络请求、文件读写等。它的核心是事件循环(event loop),负责调度和运行协程。理解事件循环和任务(Task)的管理机制,是掌握异步编程的关键。
启动事件循环并运行一个协程
最简单的异步程序从一个协程开始:
- 定义一个异步函数(用
async def声明)。 - 获取当前线程的默认事件循环。
- 运行该协程直到完成。
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
# 获取事件循环并运行协程
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
注意:在 Python 3.7+ 中,推荐直接使用
asyncio.run(),它会自动创建并关闭事件循环:
asyncio.run(hello())
优先使用 asyncio.run() 启动主协程,避免手动管理事件循环生命周期。
创建和调度多个任务
单个协程只能顺序执行。要并发运行多个操作,需将协程封装为 Task。Task 是事件循环中可调度的“工作单元”。
- 使用
asyncio.create_task()将协程转为任务。这会立即把任务排入事件循环队列,但不会立刻执行。 - await 任务以等待其结果(或异常)。
import asyncio
async def fetch_data(delay, name):
print(f"Start {name}")
await asyncio.sleep(delay)
print(f"End {name}")
return f"Data from {name}"
async def main():
# 创建两个任务
task1 = asyncio.create_task(fetch_data(2, "A"))
task2 = asyncio.create_task(fetch_data(1, "B"))
# 等待两个任务完成
result1 = await task1
result2 = await task2
print(result1, result2)
asyncio.run(main())
输出顺序为:
Start A
Start B
End B
End A
Data from A Data from B
因为两个任务被同时调度,B 虽然后创建,但延迟更短,先完成。
并发等待多个任务完成
如果不想逐个 await 任务,可以使用 asyncio.gather() 一次性等待多个任务。
- 传入多个协程或任务给
gather()。 - await 返回的聚合对象,它会在所有任务完成后返回结果列表。
async def main():
results = await asyncio.gather(
fetch_data(2, "A"),
fetch_data(1, "B"),
fetch_data(1.5, "C")
)
print(results) # ['Data from A', 'Data from B', 'Data from C']
gather()会保持结果顺序与传入参数一致,无论实际完成顺序如何。
若任一任务抛出异常,gather() 会立即传播该异常,其他任务可能仍在后台运行(除非设置 return_exceptions=True)。
控制任务的超时与取消
长时间运行的任务可能需要超时控制或手动取消。
设置超时
使用 asyncio.wait_for() 为任务添加时间限制:
async def main():
try:
result = await asyncio.wait_for(fetch_data(5, "Slow"), timeout=2)
print(result)
except asyncio.TimeoutError:
print("Task took too long!")
2 秒后抛出 TimeoutError,原任务会被自动取消。
手动取消任务
任务对象有 .cancel() 方法:
async def main():
task = asyncio.create_task(fetch_data(10, "Long"))
await asyncio.sleep(1) # 等1秒
task.cancel() # 取消任务
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")
调用 task.cancel() 只是请求取消,实际取消发生在下一次 await 点。任务内部可通过捕获 CancelledError 做清理。
并发运行但不等待全部完成
有时你只想启动任务,不关心结果(如日志上报、心跳检测)。此时不要 await 任务,但需确保程序运行足够久让任务执行。
async def background_job():
await asyncio.sleep(1)
print("Background done")
async def main():
# 启动后台任务但不等待
asyncio.create_task(background_job())
await asyncio.sleep(2) # 主协程多睡一会,让后台任务有机会运行
asyncio.run(main())
如果主协程太快结束,事件循环会关闭,未完成的任务会被丢弃。
理解事件循环的工作方式
事件循环本质上是一个无限循环,不断检查是否有可执行的协程:
- 当遇到
await,当前协程暂停,控制权交还给事件循环。 - 事件循环选择下一个就绪的协程继续执行。
- I/O 操作(如
sleep、网络请求)由底层系统通知事件循环何时可继续。
这意味着:所有异步代码必须在同一个事件循环中运行。跨线程或进程需特殊处理(如 loop.call_soon_threadsafe)。
常见陷阱与最佳实践
| 问题 | 正确做法 |
|---|---|
| 在非异步函数中直接调用协程 | ❌ hello()<br>✅ asyncio.run(hello()) 或在 async 函数中 await hello() |
忘记 await 任务 |
❌ asyncio.create_task(...) 后无 await<br>✅ 至少保留引用并在适当时候 await 或使用 asyncio.shield() |
| 阻塞事件循环 | ❌ 在协程中使用 time.sleep()<br>✅ 使用 await asyncio.sleep() |
多次调用 asyncio.run() |
❌ 在同一个程序中多次调用<br>✅ 整个应用只调用一次 asyncio.run() |
永远不要在协程中执行阻塞操作(如 requests.get、time.sleep、大循环计算),否则整个事件循环会卡住。
对于 CPU 密集型任务,应使用 asyncio.to_thread()(Python 3.9+)或 concurrent.futures 将工作交给线程池:
import time
def blocking_io():
time.sleep(2)
return "Done"
async def main():
result = await asyncio.to_thread(blocking_io)
print(result)
监控与调试任务
开发时可列出当前所有任务:
async def debug_tasks():
all_tasks = asyncio.all_tasks()
for task in all_tasks:
print(f"Task: {task.get_name()}, done: {task.done()}")
也可为任务命名便于追踪:
task = asyncio.create_task(fetch_data(1, "X"), name="fetch_X")
安全关闭事件循环
使用 asyncio.run() 时,循环会在主协程结束后自动关闭并清理所有任务。
若手动管理循环(不推荐),需显式关闭:
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
但绝大多数情况只需 asyncio.run()。
组合使用:超时 + 并发 + 错误处理
实际项目常需组合多种机制:
async def robust_fetch(url, timeout=5):
try:
# 假设有 async_http_get 协程
data = await asyncio.wait_for(async_http_get(url), timeout)
return data
except asyncio.TimeoutError:
return f"Timeout for {url}"
except Exception as e:
return f"Error for {url}: {e}"
async def main():
urls = ["http://a.com", "http://b.com", "http://c.com"]
tasks = [robust_fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
for r in results:
print(r)
这种方式能并发请求多个 URL,每个都有独立超时,错误不会中断整体流程。
合理使用 asyncio.create_task() 启动并发任务,用 gather 或 wait_for 控制执行逻辑,避免阻塞调用,即可高效构建高吞吐的 I/O 应用。

暂无评论,快来抢沙发吧!