Python 进程池:concurrent.futures.ProcessPoolExecutor
Python 的 concurrent.futures.ProcessPoolExecutor 是一个用于并行执行 CPU 密集型任务的工具。它通过创建多个独立进程(而非线程)来绕过全局解释器锁(GIL),从而真正实现多核并行计算。当你有一组相互独立、计算量大的任务时,使用它能显著缩短总运行时间。
判断是否该用 ProcessPoolExecutor
检查你的任务是否同时满足以下两个条件:
- 任务是 CPU 密集型(如数值计算、图像处理、加密解密),而不是 I/O 密集型(如下载文件、读写数据库)。
- 各个子任务之间 没有共享状态依赖,可以完全独立运行。
如果满足,就继续;否则应考虑 ThreadPoolExecutor 或其他异步方案。
基础用法:三步启动并行任务
-
导入模块:
from concurrent.futures import ProcessPoolExecutor -
定义一个可被并行调用的函数。该函数必须位于模块顶层(不能是嵌套函数或 lambda),且所有参数和返回值必须能被
pickle序列化。def square(n): return n * n -
提交任务并获取结果:
if __name__ == "__main__": numbers = [1, 2, 3, 4, 5] with ProcessPoolExecutor() as executor: results = list(executor.map(square, numbers)) print(results) # 输出: [1, 4, 9, 16, 25]
注意:必须将主程序入口包裹在
if __name__ == "__main__":中,否则在 Windows 或 macOS 上会因进程重复启动而报错或死循环。
控制进程数量
默认情况下,ProcessPoolExecutor() 会创建与 CPU 核心数相等的进程。你可以通过 max_workers 参数手动指定:
with ProcessPoolExecutor(max_workers=4) as executor:
...
建议:不要设置超过 CPU 物理核心数太多,否则会因进程切换开销反而降低性能。可通过 os.cpu_count() 获取核心数:
import os
max_workers = os.cpu_count()
处理异常与超时
当子进程中发生异常,map() 会在你尝试获取结果时抛出。若需单独处理每个任务的结果或异常,改用 submit() 和 as_completed():
-
提交多个任务:
from concurrent.futures import as_completed import time def risky_task(x): if x == 3: raise ValueError("x cannot be 3") time.sleep(0.1) return x ** 2 -
遍历完成的任务并处理结果或异常:
if __name__ == "__main__": with ProcessPoolExecutor() as executor: futures = [executor.submit(risky_task, i) for i in range(5)] for future in as_completed(futures): try: result = future.result(timeout=1) # 最多等待1秒 print(f"Got {result}") except ValueError as e: print(f"Task failed: {e}") except TimeoutError: print("Task timed out")
future.result(timeout=1) 中的 timeout 参数单位为秒,超时会抛出 TimeoutError。
性能对比:串行 vs 并行
假设有一个耗时计算函数:
def cpu_bound_task(n):
total = 0
for i in range(n * 1000000):
total += i
return total
分别用串行和并行方式执行:
import time
from concurrent.futures import ProcessPoolExecutor
def run_serial():
start = time.time()
results = [cpu_bound_task(i) for i in range(1, 5)]
print(f"Serial time: {time.time() - start:.2f}s")
def run_parallel():
start = time.time()
with ProcessPoolExecutor() as executor:
results = list(executor.map(cpu_bound_task, range(1, 5)))
print(f"Parallel time: {time.time() - start:.2f}s")
if __name__ == "__main__":
run_serial()
run_parallel()
在 4 核机器上,run_parallel() 的耗时通常接近 run_serial() 的 1/4。实际加速比受任务粒度、进程启动开销等因素影响。
注意事项与常见陷阱
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 程序卡住无输出 | 在非 __main__ 模块中直接创建 ProcessPoolExecutor |
所有使用必须放在 if __name__ == "__main__": 块内 |
报错 Can't pickle... |
函数、类或参数无法序列化(如 lambda、局部函数、未定义类) | 使用顶层定义的函数,确保参数是基本类型或可 pickle 对象 |
| 内存占用过高 | 每个进程复制了主进程的内存空间 | 避免在主进程中加载大型数据后再启动进程池;尽量让子进程自己加载所需数据 |
| 性能无提升甚至变慢 | 任务太轻量,进程创建和通信开销大于计算收益 | 合并小任务,确保单个任务执行时间 > 10ms |
实战示例:批量处理图像
假设你要将一批图片转为灰度图,使用 Pillow 库:
from PIL import Image
import os
from concurrent.futures import ProcessPoolExecutor
def convert_to_grayscale(input_path, output_path):
with Image.open(input_path) as img:
gray_img = img.convert("L")
gray_img.save(output_path)
def batch_convert(input_dir, output_dir):
os.makedirs(output_dir, exist_ok=True)
files = [f for f in os.listdir(input_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
tasks = []
for f in files:
inp = os.path.join(input_dir, f)
out = os.path.join(output_dir, f)
tasks.append((inp, out))
with ProcessPoolExecutor() as executor:
executor.map(lambda args: convert_to_grayscale(*args), tasks)
调用方式:
if __name__ == "__main__":
batch_convert("photos/", "photos_gray/")
此例中,每个图片处理任务独立,适合并行。注意使用 lambda 包装是为了适配 map() 的单参数要求,但 lambda 本身不可 pickle——幸运的是,此处 lambda 在主模块顶层定义,且仅引用全局函数,因此在 CPython 中通常可工作。为更稳妥,可改写为:
def _wrapper(args):
return convert_to_grayscale(*args)
# 然后在 map 中使用 _wrapper
executor.map(_wrapper, tasks)
何时不该用 ProcessPoolExecutor
- 任务间需要频繁通信:进程间通信(IPC)成本高,应改用
multiprocessing.Manager或共享内存,但复杂度大增。 - 任务非常轻量(如简单加法):进程启动和数据序列化开销远大于计算本身。
- 运行环境限制:某些托管环境(如部分云函数)禁止创建子进程。
此时应优先考虑向量化计算(如 NumPy)、协程(asyncio)或线程池。
启动你的 CPU 密集型任务前,先用小数据测试并行效果,再决定是否投入生产。

暂无评论,快来抢沙发吧!