asyncio.Event 是 Python asyncio 库中用于协程间简单通信的同步原语。它的作用类似于一个线程安全的标志位,允许一个或多个协程等待某个事件发生,直到另一个协程将该事件标志位设为“真”。这就好比比赛中的发令枪,裁判(主控协程)鸣枪(设置事件),所有运动员(等待协程)听到枪响后同时起跑(恢复执行)。
下面通过具体步骤和代码示例,演示如何利用 asyncio.Event 在协程间进行事件通知。
1. 理解核心方法
在使用之前,先掌握 asyncio.Event 对象的四个核心方法。
| 方法名 | 功能描述 | 典型使用场景 |
|---|---|---|
set() |
将内部标志设为 True。所有等待该事件的协程会被立即唤醒。 |
通知工作线程开始处理任务,或通知系统停止运行。 |
clear() |
将内部标志重置为 False。 |
在事件触发后,准备下一轮等待。 |
is_set() |
检查内部标志是否为 True。 |
非阻塞地检查状态,决定是否执行特定逻辑。 |
wait() |
阻塞当前协程,直到内部标志变为 True。 |
让后台任务挂起,等待主程序的“放行”信号。 |
2. 场景一:发令枪模式(同步启动)
这个场景模拟多个工作协程准备好后,需要等待主程序的一个信号才能同时开始工作。
- 导入
asyncio库。 - 定义 一个异步函数
worker,接收event对象和name参数。 - 编写 日志输出,显示某个工人已就绪。
- 调用
await event.wait()让当前协程在此处暂停,等待信号。 - 编写 信号触发后的执行逻辑。
import asyncio
async def worker(event, name):
print(f"{name}: 已就绪,等待信号...")
# 阻塞等待,直到 event.set() 被调用
await event.wait()
print(f"{name}: 收到信号,开始执行任务!")
async def main():
# 创建一个 Event 对象,初始状态为 False
start_event = asyncio.Event()
# 模拟创建 3 个工作协程,它们会并发运行并在 wait() 处挂起
task1 = asyncio.create_task(worker(start_event, "Worker-1"))
task2 = asyncio.create_task(worker(start_event, "Worker-2"))
task3 = asyncio.create_task(worker(start_event, "Worker-3"))
# 模拟主程序做一些准备工作
await asyncio.sleep(1)
print("主程序: 准备工作完成,发令枪响!")
# 触发事件,唤醒所有等待的协程
start_event.set()
# 等待所有任务完成
await asyncio.gather(task1, task2, task3)
if __name__ == "__main__":
asyncio.run(main())
执行上述代码,你会发现所有的 Worker 都会先打印“已就绪”,暂停约 1 秒后,在几乎同一瞬间打印“开始执行任务”。
3. 场景二:优雅停机模式(终止循环)
在长期运行的后台任务中,通常需要通过 Event 来控制任务的退出,而不是强制取消任务。
- 定义 一个无限循环的后台任务
background_task。 - 在循环内部,检查
event.is_set()的状态。 - 如果 为
True,跳出 循环结束任务。 - 如果 为
False,执行工作逻辑并短暂休眠。
import asyncio
async def background_task(stop_event):
print("后台任务: 启动中...")
while not stop_event.is_set():
print("后台任务: 正在处理数据...")
# 模拟工作耗时
await asyncio.sleep(0.5)
print("后台任务: 收到停止信号,清理资源并退出。")
async def main():
stop_event = asyncio.Event()
# 启动后台任务
bg = asyncio.create_task(background_task(stop_event))
# 主程序运行一段时间后决定停止
print("主程序: 运行 2 秒后停止...")
await asyncio.sleep(2)
print("主程序: 发送停止信号。")
stop_event.set()
# 等待后台任务彻底退出
await bg
if __name__ == "__main__":
asyncio.run(main())
此模式下,后台任务不会立即暴死,而是在当前循环周期检查标志位后,安全地退出循环,保证了资源的清理。
4. 进阶用法:结合 wait 超时机制
有时候我们不希望永久等待,而是希望“最多等 N 秒,如果没信号就去做别的事”。
- 使用
asyncio.wait_for包装event.wait()。 - 设置 超时时间参数。
import asyncio
async def patient_worker(event):
try:
print("尝试等待事件(最多2秒)...")
# 等待事件,但最多只等 2 秒
await asyncio.wait_for(event.wait(), timeout=2.0)
print("事件已触发!")
except asyncio.TimeoutError:
print("超时了,没有等到信号,执行备用方案。")
async def main():
event = asyncio.Event()
# 故意不设置 event.set(),触发超时
await patient_worker(event)
if __name__ == "__main__":
asyncio.run(main())
5. 协程交互流程可视化
为了更直观地理解协程与事件对象的交互时序,请参考以下流程图:
sequenceDiagram
participant Main as 主控协程
participant Event as Event对象
participant Worker as 工作协程
Note over Worker,Event: 初始状态: False
Worker->>Event: await wait()
Note over Worker: 挂起/阻塞
activate Worker
Main->>Main: 执行准备工作...
Main->>Event: set()
Note over Event: 状态变更为: True
Event-->>Worker: 唤醒
deactivate Worker
Note over Worker: 恢复执行
6. 实战中的注意事项
在实际编码中,请遵循以下规范以避免逻辑错误:
- 避免遗忘唤醒:确保在所有逻辑分支中(包括异常处理
try...finally),如果需要,都要调用event.set(),否则等待的协程可能会永久死锁。 - 不可重复使用:默认情况下,
Event是一种“一次性”触发器(类似枪响一次)。一旦set(),它将永远保持True,除非显式调用clear()重置它。如果需要周期性的“等待-通知-等待-通知”,请在每次循环开始前确认状态或重置。 - 广播特性:
Event.set()会唤醒所有正在等待该事件的协程。如果你只需要唤醒其中一个协程,请考虑使用asyncio.Queue或asyncio.Condition。

暂无评论,快来抢沙发吧!