文章目录

Python 多进程:multiprocessing 模块与进程池

发布于 2026-04-07 04:43:07 · 浏览 11 次 · 评论 0 条

Python 多进程:multiprocessing 模块与进程池

Python 的 multiprocessing 模块用于绕过全局解释器锁(GIL)的限制,充分利用 计算机的多核 CPU 性能。本文提供从零到一的实操指南,指导你完成进程创建、数据通信、任务池管理及安全执行。


第一阶段:创建与启动独立进程

使用 multiprocessing.Process 类可手动控制 单个进程的完整生命周期。

  1. 导入 multiprocessing 模块中的 Process 类与 current_process 辅助函数。
  2. 定义 需要并行执行的目标函数。确保该函数只包含独立计算逻辑,避免 依赖全局变量修改。
  3. 实例化 Process 对象。通过 target 参数指定目标函数,通过 args 参数以元组形式传递位置参数。
  4. 调用 .start() 方法触发 进程执行。此时操作系统分配独立内存空间。
  5. 调用 .join() 方法阻塞 主程序,等待 子进程执行完毕后再继续运行后续代码。
import multiprocessing
import time

def worker(task_id):
    current_name = multiprocessing.current_process().name
    print(f"进程 {current_name} 正在执行任务 {task_id}")
    time.sleep(1)
    print(f"任务 {task_id} 执行完成")

if __name__ == '__main__':
    p1 = multiprocessing.Process(target=worker, args=(1,))
    p2 = multiprocessing.Process(target=worker, args=(2,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()
    print("所有任务已执行完毕")

第二阶段:实现进程间数据通信

独立进程拥有隔离的内存空间,禁止 直接读写同一变量。使用 multiprocessing.Queue 安全传递 数据。

  1. 创建 multiprocessing.Queue 对象。该对象底层使用管道与锁机制保证数据传输安全。
  2. Queue 对象作为参数传入 目标函数。
  3. 在子进程中调用 .put() 方法将计算结果写入 队列。
  4. 在主进程中调用 .get() 方法读取 队列中的数据。若队列为空,程序将默认阻塞等待
import multiprocessing

def compute_and_return(q, x, y):
    result = x * y
    q.put(result)  # 写入队列

if __name__ == '__main__':
    result_queue = multiprocessing.Queue()

    proc = multiprocessing.Process(
        target=compute_and_return, 
        args=(result_queue, 5, 6)
    )
    proc.start()

    final_value = result_queue.get()
    print(f"计算结果为: {final_value}")

    proc.join()

第三阶段:使用进程池批量管理任务

频繁创建与销毁进程会消耗大量系统资源。multiprocessing.Pool 进程池通过复用 预先创建的进程,显著提升 高并发任务的处理效率。

  1. 导入 Pool 类。
  2. 实例化 进程池对象。通过 processes 参数指定核心数量(通常设为 os.cpu_count())。若不传参,系统默认使用 当前机器的所有 CPU 核心数。
  3. 选择 合适的分发方法执行任务。参考下表对比核心方法特性:
方法名 执行模式 参数传递 返回值 适用场景
pool.map() 阻塞执行 仅支持单参数函数 按输入顺序返回列表 数据规整、只需按顺序收集结果的任务
pool.starmap() 阻塞执行 支持多参数(元组解包) 按输入顺序返回列表 目标函数需要接收多个独立参数
pool.apply_async() 异步非阻塞 支持多参数与回调函数 返回 AsyncResult 对象 需提交后立即执行其他逻辑,或需处理单个复杂任务
  1. 调用 .close() 方法关闭 进程池入口,禁止 再提交新任务。
  2. 调用 .join() 方法等待 池内所有任务执行结束,回收 所有工作进程资源。
import multiprocessing
import time

def heavy_calculation(n):
    total = sum(i * i for i in range(n))
    return total

if __name__ == '__main__':
    tasks = [100000, 200000, 300000, 400000]

    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(heavy_calculation, tasks)

        for t, res in zip(tasks, results):
            print(f"任务 {t} 结果: {res}")

第四阶段:异步任务处理与回调函数

当任务执行时间差异极大,或需要在主进程中实时响应任务状态时,使用异步提交模式。

  1. 定义 回调函数(Callback)。该函数必须接收一个参数(即异步任务的返回值)。
  2. 调用 .apply_async(target, args=..., callback=callback_func) 异步提交 任务。主程序不阻塞,继续向下执行。
  3. 调用 AsyncResult 对象的 .get() 方法主动获取 结果,或使用 .ready() 方法检查 任务是否完成。
  4. 验证 结果完整性。若任务内部抛出异常,.get()重新抛出 该异常,必须 使用 try...except 捕获。
import multiprocessing

def slow_task(val):
    import time
    time.sleep(val)
    return f"耗时 {val} 秒完成"

def on_finish(res):
    print(f"收到回调通知: {res}")

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)

    res_obj1 = pool.apply_async(slow_task, args=(2,), callback=on_finish)
    res_obj2 = pool.apply_async(slow_task, args=(1,), callback=on_finish)

    print("任务已提交,主程序继续运行其他逻辑...")

    print(f"获取结果1: {res_obj1.get()}")
    print(f"获取结果2: {res_obj2.get()}")

    pool.close()
    pool.join()

第五阶段:核心避坑指南与资源清理

多进程编程极易因环境差异或资源泄漏引发崩溃。严格遵循以下操作规范可彻底消除 常见故障。

  1. 添加 入口保护语句。在 Windows 与 macOS 的 spawn 启动模式下,子进程会重新导入主脚本。必须 将所有启动代码置于 if __name__ == '__main__': 代码块内,否则将触发无限递归创建进程的致命错误。
  2. 隔离 全局状态。多进程不共享 普通全局变量。使用 multiprocessing.Valuemultiprocessing.Array 替代基础类型,或完全依赖 QueuePipe 传递状态。
  3. 捕获 子进程异常。Pool.map.get() 默认不直接打印 子进程的具体错误堆栈。包裹 调用代码至 try...except Exception as e: 块中,或配置 独立日志记录器,以便快速定位 崩溃源头。
  4. 限制 进程数量。根据实际 CPU 核心数设置池大小。避免 创建远超物理核心数的进程,否则上下文切换开销将导致性能断崖式下降。
  5. 清理 僵尸进程。确保 每次 close()join() 均被正确调用。推荐优先使用 with multiprocessing.Pool() as pool: 上下文管理器语法,自动触发 资源释放逻辑,防止 脚本被强制中断时残留后台进程。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文