文章目录

Python 进程池:concurrent.futures.ProcessPoolExecutor

发布于 2026-04-02 11:27:15 · 浏览 10 次 · 评论 0 条

Python 进程池:concurrent.futures.ProcessPoolExecutor

Python 的 concurrent.futures.ProcessPoolExecutor 是一个用于并行执行 CPU 密集型任务的工具。它通过创建多个独立进程(而非线程)来绕过全局解释器锁(GIL),从而真正实现多核并行计算。当你有一组相互独立、计算量大的任务时,使用它能显著缩短总运行时间。


判断是否该用 ProcessPoolExecutor

检查你的任务是否同时满足以下两个条件:

  1. 任务是 CPU 密集型(如数值计算、图像处理、加密解密),而不是 I/O 密集型(如下载文件、读写数据库)。
  2. 各个子任务之间 没有共享状态依赖,可以完全独立运行。

如果满足,就继续;否则应考虑 ThreadPoolExecutor 或其他异步方案。


基础用法:三步启动并行任务

  1. 导入模块:

    from concurrent.futures import ProcessPoolExecutor
  2. 定义一个可被并行调用的函数。该函数必须位于模块顶层(不能是嵌套函数或 lambda),且所有参数和返回值必须能被 pickle 序列化。

    def square(n):
        return n * n
  3. 提交任务并获取结果:

    if __name__ == "__main__":
        numbers = [1, 2, 3, 4, 5]
        with ProcessPoolExecutor() as executor:
            results = list(executor.map(square, numbers))
        print(results)  # 输出: [1, 4, 9, 16, 25]

注意:必须将主程序入口包裹在 if __name__ == "__main__": 中,否则在 Windows 或 macOS 上会因进程重复启动而报错或死循环。


控制进程数量

默认情况下,ProcessPoolExecutor() 会创建与 CPU 核心数相等的进程。你可以通过 max_workers 参数手动指定:

with ProcessPoolExecutor(max_workers=4) as executor:
    ...

建议:不要设置超过 CPU 物理核心数太多,否则会因进程切换开销反而降低性能。可通过 os.cpu_count() 获取核心数:

import os
max_workers = os.cpu_count()

处理异常与超时

当子进程中发生异常,map() 会在你尝试获取结果时抛出。若需单独处理每个任务的结果或异常,改用 submit()as_completed()

  1. 提交多个任务:

    from concurrent.futures import as_completed
    import time
    
    def risky_task(x):
        if x == 3:
            raise ValueError("x cannot be 3")
        time.sleep(0.1)
        return x ** 2
  2. 遍历完成的任务并处理结果或异常:

    if __name__ == "__main__":
        with ProcessPoolExecutor() as executor:
            futures = [executor.submit(risky_task, i) for i in range(5)]
            for future in as_completed(futures):
                try:
                    result = future.result(timeout=1)  # 最多等待1秒
                    print(f"Got {result}")
                except ValueError as e:
                    print(f"Task failed: {e}")
                except TimeoutError:
                    print("Task timed out")

future.result(timeout=1) 中的 timeout 参数单位为秒,超时会抛出 TimeoutError


性能对比:串行 vs 并行

假设有一个耗时计算函数:

def cpu_bound_task(n):
    total = 0
    for i in range(n * 1000000):
        total += i
    return total

分别用串行和并行方式执行:

import time
from concurrent.futures import ProcessPoolExecutor

def run_serial():
    start = time.time()
    results = [cpu_bound_task(i) for i in range(1, 5)]
    print(f"Serial time: {time.time() - start:.2f}s")

def run_parallel():
    start = time.time()
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_bound_task, range(1, 5)))
    print(f"Parallel time: {time.time() - start:.2f}s")

if __name__ == "__main__":
    run_serial()
    run_parallel()

在 4 核机器上,run_parallel() 的耗时通常接近 run_serial() 的 1/4。实际加速比受任务粒度、进程启动开销等因素影响。


注意事项与常见陷阱

问题 原因 解决方案
程序卡住无输出 在非 __main__ 模块中直接创建 ProcessPoolExecutor 所有使用必须放在 if __name__ == "__main__": 块内
报错 Can't pickle... 函数、类或参数无法序列化(如 lambda、局部函数、未定义类) 使用顶层定义的函数,确保参数是基本类型或可 pickle 对象
内存占用过高 每个进程复制了主进程的内存空间 避免在主进程中加载大型数据后再启动进程池;尽量让子进程自己加载所需数据
性能无提升甚至变慢 任务太轻量,进程创建和通信开销大于计算收益 合并小任务,确保单个任务执行时间 > 10ms

实战示例:批量处理图像

假设你要将一批图片转为灰度图,使用 Pillow 库:

from PIL import Image
import os
from concurrent.futures import ProcessPoolExecutor

def convert_to_grayscale(input_path, output_path):
    with Image.open(input_path) as img:
        gray_img = img.convert("L")
        gray_img.save(output_path)

def batch_convert(input_dir, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    files = [f for f in os.listdir(input_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
    tasks = []
    for f in files:
        inp = os.path.join(input_dir, f)
        out = os.path.join(output_dir, f)
        tasks.append((inp, out))

    with ProcessPoolExecutor() as executor:
        executor.map(lambda args: convert_to_grayscale(*args), tasks)

调用方式:

if __name__ == "__main__":
    batch_convert("photos/", "photos_gray/")

此例中,每个图片处理任务独立,适合并行。注意使用 lambda 包装是为了适配 map() 的单参数要求,但 lambda 本身不可 pickle——幸运的是,此处 lambda 在主模块顶层定义,且仅引用全局函数,因此在 CPython 中通常可工作。为更稳妥,可改写为:

def _wrapper(args):
    return convert_to_grayscale(*args)

# 然后在 map 中使用 _wrapper
executor.map(_wrapper, tasks)

何时不该用 ProcessPoolExecutor

  • 任务间需要频繁通信:进程间通信(IPC)成本高,应改用 multiprocessing.Manager 或共享内存,但复杂度大增。
  • 任务非常轻量(如简单加法):进程启动和数据序列化开销远大于计算本身。
  • 运行环境限制:某些托管环境(如部分云函数)禁止创建子进程。

此时应优先考虑向量化计算(如 NumPy)、协程(asyncio)或线程池。

启动你的 CPU 密集型任务前,先用小数据测试并行效果,再决定是否投入生产。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文