文章目录

Python asyncio.Semaphore在限制并发请求数中的应用

发布于 2026-04-19 22:16:21 · 浏览 6 次 · 评论 0 条

Python asyncio.Semaphore在限制并发请求数中的应用

当使用 asyncio 处理大量网络请求(如爬虫或API调用)时,为了追求极致速度,往往会创建成千上万个并发任务。如果不加限制,这种“瞬时爆发”的流量会瞬间耗尽本地带宽、导致目标服务器触发拒绝服务保护,甚至直接封禁IP。asyncio.Semaphore 是解决此类并发控制问题的核心工具,它相当于一个智能的“红绿灯”,能精确控制同时运行的任务数量。


核心概念:理解信号量

可以将 Semaphore 想象成一个只有固定数量车位的停车场。

  • 停车位:信号量允许的并发数( permits)。
  • 车辆:正在运行的异步任务。
  • 入场:任务获取信号量。如果车位已满,任务必须在门口排队等待(挂起),直到有车离开。
  • 出场:任务完成工作,释放信号量,腾出一个车位,允许排队中的下一个任务进入。

实施步骤:如何在代码中应用

以下步骤展示如何在 Python 异步程序中引入并发限制。

1. 定义并发上限

在代码开始部分,定义一个整型变量来设定最大并发数。这个数值通常取决于目标服务器的承受能力或本地网卡的吞吐量。

例如,将最大并发数设定为 5

MAX_CONCURRENT = 5

2. 初始化信号量对象

在主程序或任务生成函数中,创建一个 asyncio.Semaphore 实例,并将上一步定义的常量传入构造函数。

sem = asyncio.Semaphore(MAX_CONCURRENT)

3. 在任务函数中加锁

这是最关键的一步。你需要修改执行耗时操作(如网络请求)的异步函数。使用 async with 语句包裹具体的业务逻辑代码。

这种写法利用了上下文管理器,确保无论任务是成功完成还是抛出异常,信号量都能被正确释放,避免“死锁”。

async def fetch_data(task_id):
    # 尝试获取信号量:如果当前并发数已达上限,这里会挂起等待
    async with sem:
        print(f"任务 {task_id} 开始执行")
        # 模拟网络IO延迟,这里是实际并发受控的区域
        await asyncio.sleep(1) 
        print(f"任务 {task_id} 执行完毕")
    # 离开 with 代码块时,自动释放信号量

逻辑流程图解

为了更清晰地展示任务如何通过信号量控制,下图描述了多个任务同时到达时的处理流程。

graph TD A[新任务启动] --> B{尝试获取信号量} B -- "获取成功" --> C[进入执行区\n开始IO操作] B -- "获取失败\n已满员" --> D[进入等待队列\n挂起当前任务] C --> E[IO操作完成] E --> F[释放信号量] F --> G[通知队列] G --> D D --> B

完整代码示例

下面是一个完整的脚本,模拟了10个任务同时启动,但通过 Semaphore 限制同时只有3个任务在运行。

创建一个名为 semaphore_demo.py 的文件,并输入以下代码:

import asyncio
import time

# 1. 定义并发上限
MAX_CONCURRENT = 3

async def worker(worker_id):
    print(f"Worker {worker_id}: 等待执行...")
    start_time = time.time()

    # 2. 使用信号量限制并发
    async with sem:
        print(f"Worker {worker_id}: 开始工作 (当前活跃)")
        await asyncio.sleep(2)  # 模拟耗时操作
        print(f"Worker {worker_id}: 工作完成 (耗时: {time.time() - start_time:.2f}s)")

async def main():
    print("--- 任务开始 ---")
    start_time = time.time()

    # 创建10个任务,但只有3个能同时运行
    tasks = [worker(i) for i in range(1, 11)]
    await asyncio.gather(*tasks)

    print(f"--- 所有任务结束 (总耗时: {time.time() - start_time:.2f}s) ---")

if __name__ == "__main__":
    # 3. 初始化信号量
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    asyncio.run(main())

运行该脚本,观察输出。你会发现任务ID总是以3个为一组出现,而不是瞬间全部打印出来。


效果对比

为了更直观地理解 Semaphore 的作用,下表对比了使用与未使用信号量时的区别。

特性 未使用 Semaphore 使用 Semaphore
并发行为 所有任务几乎同时启动 任务分批次启动,受 MAX_CONCURRENT 限制
资源消耗 瞬间占用大量内存和文件句柄 资源占用平稳,维持在预设水平
目标压力 极高,极易触发服务器限流 可控,保护目标服务器稳定性
适用场景 任务量极少或任务内部有互斥锁 爬虫、批量API调用、高并发数据库操作

通过合理设置 asyncio.Semaphore,你既能享受异步编程带来的高效率,又能避免因过载而导致的程序崩溃或被封禁风险。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文