Python 异步生成器:async for 循环的使用
Python 中的 async for 循环用于遍历异步可迭代对象(asynchronous iterable),最常见的是异步生成器(asynchronous generator)。它允许你在等待 I/O 操作(如网络请求、文件读取)的同时,逐个获取数据,而不会阻塞整个程序。这在处理大量流式数据或高并发任务时非常高效。
理解异步生成器的基本结构
异步生成器是通过 async def 定义的函数,并在函数体内使用 yield 语句。与普通生成器不同,它返回一个异步生成器对象,只能在 async for 循环中使用。
定义一个异步生成器:
async def countdown(n):
while n > 0:
await asyncio.sleep(1) # 模拟异步等待
yield n
n -= 1
这段代码定义了一个从 n 倒数到 1 的异步生成器。每次 yield 前会暂停 1 秒,但不会阻塞其他协程。
使用 async for 遍历异步生成器
要消费异步生成器,必须在 async 函数中使用 async for 循环:
import asyncio
async def main():
async for value in countdown(3):
print(f"倒计时: {value}")
asyncio.run(main())
执行结果会每秒打印一行:
倒计时: 3
倒计时: 2
倒计时: 1
关键点:
- 不能在普通
for循环中直接使用异步生成器,会报错TypeError: 'async_generator' object is not iterable。 - 必须在
async def函数内部使用async for。 - 必须通过
asyncio.run()或事件循环启动主协程。
异步生成器 vs 普通生成器
| 特性 | 普通生成器 | 异步生成器 |
|---|---|---|
| 定义方式 | def func(): yield x |
async def func(): yield x |
| 遍历方式 | for x in gen(): |
async for x in gen(): |
是否支持 await |
❌ 不支持 | ✅ 支持 |
| 是否阻塞事件循环 | 否(但无法等待异步操作) | 否(可与其他协程并发) |
实际应用场景:异步读取文件行
假设你需要逐行读取一个大文件,但又不想一次性加载到内存,同时希望在读取过程中能处理其他任务(比如响应 Web 请求)。可以这样实现:
import asyncio
async def read_lines(filename):
with open(filename, 'r') as f:
for line in f:
await asyncio.sleep(0) # 让出控制权,避免长时间占用 CPU
yield line.strip()
async def process_file():
async for line in read_lines('data.txt'):
print(f"处理行: {line}")
# 可在此处进行异步处理,如发送到数据库
虽然文件 I/O 本身是同步的,但通过 await asyncio.sleep(0) 主动让出控制权,可以让其他协程有机会运行,实现“伪异步”协作。
注意:真正的异步文件 I/O 在标准库中支持有限(需用
aiofiles第三方库),但此模式适用于任何需要分批处理且希望不阻塞事件循环的场景。
错误处理:在 async for 中捕获异常
你可以在 async for 循环外部使用 try...except 捕获异步生成器抛出的异常:
async def risky_generator():
for i in range(3):
if i == 2:
raise ValueError("模拟错误")
yield i
async def main():
try:
async for x in risky_generator():
print(x)
except ValueError as e:
print(f"捕获异常: {e}")
输出:
0
1
捕获异常: 模拟错误
异步生成器中的异常会像普通生成器一样,在 yield 执行时抛出,并由调用方的 except 块捕获。
提前终止异步生成器
和普通生成器一样,如果 async for 循环提前退出(如 break 或异常),异步生成器会收到一个 GeneratorExit 信号,你可以用 try...finally 或 async with 来清理资源:
async def resource_generator():
print("打开资源")
try:
for i in range(5):
yield i
finally:
print("关闭资源")
async def main():
async for x in resource_generator():
print(x)
if x == 2:
break # 提前退出
输出:
打开资源
0
1
2
关闭资源
即使循环被 break 中断,finally 块仍会执行,确保资源正确释放。
常见错误与排查
-
错误:在非 async 函数中使用 async for
# 错误示例 for x in countdown(3): # TypeError pass解决:确保外层函数是
async def,并用asyncio.run()启动。 -
错误:忘记 await 异步操作
async def bad_gen(): time.sleep(1) # 阻塞整个事件循环! yield 1解决:使用
await asyncio.sleep(1)代替同步阻塞调用。 -
错误:混淆 async for 和普通 for
如果你有一个普通生成器,不要强行加async。只有当生成器内部需要await时,才使用异步生成器。
组合多个异步生成器
你可以将多个异步生成器组合成一个新的异步生成器:
async def combine(*gens):
for gen in gens:
async for item in gen:
yield item
async def main():
gen1 = countdown(2)
gen2 = countdown(2)
async for x in combine(gen1, gen2):
print(x)
输出:
2
1
2
1
这种模式适用于合并多个数据流(如多个 API 响应流)。
创建异步生成器时,始终问自己:是否需要在生成每个元素之间执行 await 操作?如果是,就用 async def + yield;否则,普通生成器更简单高效。使用 async for 时,确保它处于 async 函数上下文中,并通过事件循环驱动。

暂无评论,快来抢沙发吧!