文章目录

Python 事件循环:asyncio.get_event_loop() 的使用

发布于 2026-04-13 07:24:12 · 浏览 28 次 · 评论 0 条

Python 事件循环:asyncio.get_event_loop() 的使用

在 Python 异步编程中,事件循环是核心调度机制。asyncio.get_event_loop() 曾是获取当前事件循环的标准方法,但在 Python 3.10 及更高版本中已被标记为废弃,并推荐使用 asyncio.run()。为了维护旧代码或理解底层机制,掌握其具体用法依然重要。


1. 理解事件循环与版本差异

事件循环负责调度和执行异步任务。在不同 Python 版本中,获取和管理循环的方式有所不同。

查看 下方表格,了解不同版本的推荐做法。

Python 版本 推荐获取方式 运行方式 状态
3.6 及以下 loop = asyncio.get_event_loop() loop.run_until_complete() 标准
3.7 - 3.9 asyncio.get_event_loop()asyncio.new_event_loop() asyncio.run(main()) 过渡期
3.10+ asyncio.run(main()) asyncio.run(main()) get_event_loop 已废弃

2. 使用 get_event_loop() 运行基础任务

在旧版本代码或特定环境下,你可以手动获取循环并运行任务。以下是具体操作步骤。

  1. 导入 asyncio 库。
  2. 定义 一个异步函数(协程)。
  3. 获取 当前线程的事件循环对象。
  4. 运行 协程直到它完成。
import asyncio

async def main_task():
    print("任务开始执行")
    await asyncio.sleep(1)
    print("任务执行完毕")

# 获取事件循环
loop = asyncio.get_event_loop()

# 运行任务
try:
    loop.run_until_complete(main_task())
finally:
    loop.close()

注意:在使用 run_until_complete 后,通常建议手动关闭循环以释放资源。


3. 在子线程或未设置循环时使用

如果在尚未设置事件循环的子线程中调用 get_event_loop(),它会自动创建一个新循环。在主线程中,如果没有设置,可能会抛出 RuntimeError

  1. 创建一个新的线程目标函数。
  2. 调用 asyncio.new_event_loop() 显式创建循环。
  3. 设置 该循环为当前线程的循环。
  4. 运行 异步代码。
import asyncio
import threading

def new_thread_worker():
    # 在子线程中,显式创建并设置新循环
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def task():
        print("子线程任务运行中")

    loop.run_until_complete(task())
    loop.close()

# 启动线程
thread = threading.Thread(target=new_thread_worker)
thread.start()
thread.join()

4. 现代替代方案:asyncio.run()

对于 Python 3.7+ 的应用,推荐直接使用高层封装 asyncio.run()。它自动处理循环的创建、运行、关闭以及调试器钩子。

  1. 定义 主入口协程。
  2. 调用 asyncio.run() 并传入协程对象。
import asyncio

async def modern_main():
    print("使用现代方式运行")

# 自动管理生命周期
asyncio.run(modern_main())

对比 手动管理:

  • 手动:需调用 get_event_looprun_until_completeclose
  • 自动:仅 asyncio.run 一行代码。

5. 事件循环执行流程

为了更直观地理解事件循环的执行过程,以下流程图描述了从获取循环到任务结束的生命周期。

graph TD Start(开始) --> Check{已有运行中的循环?} Check -- 否 --> Get[获取或创建新循环] Check -- 是 --> Running[获取当前运行循环] Get --> Run[运行直到任务完成] Running --> Run Run --> Task{任务是否抛出异常?} Task -- 是 --> Error[处理异常] Task -- 否 --> Success[正常结束] Error --> Close[关闭循环] Success --> Close Close --> End(结束)

6. 获取当前运行中的循环

当你在协程内部编写代码,且需要获取当前正在运行该协程的循环对象时,切勿使用 get_event_loop()(在某些情况下可能无法获取),而应使用 get_running_loop()

  1. 进入 一个异步函数内部。
  2. 调用 asyncio.get_running_loop()
import asyncio

async def inner_task():
    # 获取正在运行此代码的循环
    current_loop = asyncio.get_running_loop()
    print(f"当前正在使用的循环对象: {current_loop}")

asyncio.run(inner_task())

7. 处理常见错误

在使用 get_event_loop() 时,最常见的问题是 RuntimeError: There is no current event loop

原因:在 Python 3.10+ 中,如果主线程尚未设置事件循环,get_event_loop() 不再自动创建,而是直接报错。

解决方案

  • 方案 A:使用 asyncio.run() 启动程序。
  • 方案 B:在代码最开始显式创建并设置循环。
# 显式修复方案
import asyncio

# 手动创建并设置
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

async def my_task():
    print("修复后运行成功")

loop.run_until_complete(my_task())

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文