Python 多进程:multiprocessing 模块与进程池
Python 的 multiprocessing 模块用于绕过全局解释器锁(GIL)的限制,充分利用 计算机的多核 CPU 性能。本文提供从零到一的实操指南,指导你完成进程创建、数据通信、任务池管理及安全执行。
第一阶段:创建与启动独立进程
使用 multiprocessing.Process 类可手动控制 单个进程的完整生命周期。
- 导入
multiprocessing模块中的Process类与current_process辅助函数。 - 定义 需要并行执行的目标函数。确保该函数只包含独立计算逻辑,避免 依赖全局变量修改。
- 实例化
Process对象。通过target参数指定目标函数,通过args参数以元组形式传递位置参数。 - 调用
.start()方法触发 进程执行。此时操作系统分配独立内存空间。 - 调用
.join()方法阻塞 主程序,等待 子进程执行完毕后再继续运行后续代码。
import multiprocessing
import time
def worker(task_id):
current_name = multiprocessing.current_process().name
print(f"进程 {current_name} 正在执行任务 {task_id}")
time.sleep(1)
print(f"任务 {task_id} 执行完成")
if __name__ == '__main__':
p1 = multiprocessing.Process(target=worker, args=(1,))
p2 = multiprocessing.Process(target=worker, args=(2,))
p1.start()
p2.start()
p1.join()
p2.join()
print("所有任务已执行完毕")
第二阶段:实现进程间数据通信
独立进程拥有隔离的内存空间,禁止 直接读写同一变量。使用 multiprocessing.Queue 安全传递 数据。
- 创建
multiprocessing.Queue对象。该对象底层使用管道与锁机制保证数据传输安全。 - 将 Queue 对象作为参数传入 目标函数。
- 在子进程中调用
.put()方法将计算结果写入 队列。 - 在主进程中调用
.get()方法读取 队列中的数据。若队列为空,程序将默认阻塞等待。
import multiprocessing
def compute_and_return(q, x, y):
result = x * y
q.put(result) # 写入队列
if __name__ == '__main__':
result_queue = multiprocessing.Queue()
proc = multiprocessing.Process(
target=compute_and_return,
args=(result_queue, 5, 6)
)
proc.start()
final_value = result_queue.get()
print(f"计算结果为: {final_value}")
proc.join()
第三阶段:使用进程池批量管理任务
频繁创建与销毁进程会消耗大量系统资源。multiprocessing.Pool 进程池通过复用 预先创建的进程,显著提升 高并发任务的处理效率。
- 导入
Pool类。 - 实例化 进程池对象。通过
processes参数指定核心数量(通常设为os.cpu_count())。若不传参,系统默认使用 当前机器的所有 CPU 核心数。 - 选择 合适的分发方法执行任务。参考下表对比核心方法特性:
| 方法名 | 执行模式 | 参数传递 | 返回值 | 适用场景 |
|---|---|---|---|---|
pool.map() |
阻塞执行 | 仅支持单参数函数 | 按输入顺序返回列表 | 数据规整、只需按顺序收集结果的任务 |
pool.starmap() |
阻塞执行 | 支持多参数(元组解包) | 按输入顺序返回列表 | 目标函数需要接收多个独立参数 |
pool.apply_async() |
异步非阻塞 | 支持多参数与回调函数 | 返回 AsyncResult 对象 |
需提交后立即执行其他逻辑,或需处理单个复杂任务 |
- 调用
.close()方法关闭 进程池入口,禁止 再提交新任务。 - 调用
.join()方法等待 池内所有任务执行结束,回收 所有工作进程资源。
import multiprocessing
import time
def heavy_calculation(n):
total = sum(i * i for i in range(n))
return total
if __name__ == '__main__':
tasks = [100000, 200000, 300000, 400000]
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(heavy_calculation, tasks)
for t, res in zip(tasks, results):
print(f"任务 {t} 结果: {res}")
第四阶段:异步任务处理与回调函数
当任务执行时间差异极大,或需要在主进程中实时响应任务状态时,使用异步提交模式。
- 定义 回调函数(Callback)。该函数必须接收一个参数(即异步任务的返回值)。
- 调用
.apply_async(target, args=..., callback=callback_func)异步提交 任务。主程序不阻塞,继续向下执行。 - 调用
AsyncResult对象的.get()方法主动获取 结果,或使用.ready()方法检查 任务是否完成。 - 验证 结果完整性。若任务内部抛出异常,
.get()会重新抛出 该异常,必须 使用try...except捕获。
import multiprocessing
def slow_task(val):
import time
time.sleep(val)
return f"耗时 {val} 秒完成"
def on_finish(res):
print(f"收到回调通知: {res}")
if __name__ == '__main__':
pool = multiprocessing.Pool(2)
res_obj1 = pool.apply_async(slow_task, args=(2,), callback=on_finish)
res_obj2 = pool.apply_async(slow_task, args=(1,), callback=on_finish)
print("任务已提交,主程序继续运行其他逻辑...")
print(f"获取结果1: {res_obj1.get()}")
print(f"获取结果2: {res_obj2.get()}")
pool.close()
pool.join()
第五阶段:核心避坑指南与资源清理
多进程编程极易因环境差异或资源泄漏引发崩溃。严格遵循以下操作规范可彻底消除 常见故障。
- 添加 入口保护语句。在 Windows 与 macOS 的
spawn启动模式下,子进程会重新导入主脚本。必须 将所有启动代码置于if __name__ == '__main__':代码块内,否则将触发无限递归创建进程的致命错误。 - 隔离 全局状态。多进程不共享 普通全局变量。使用
multiprocessing.Value与multiprocessing.Array替代基础类型,或完全依赖Queue与Pipe传递状态。 - 捕获 子进程异常。
Pool.map与.get()默认不直接打印 子进程的具体错误堆栈。包裹 调用代码至try...except Exception as e:块中,或配置 独立日志记录器,以便快速定位 崩溃源头。 - 限制 进程数量。根据实际 CPU 核心数设置池大小。避免 创建远超物理核心数的进程,否则上下文切换开销将导致性能断崖式下降。
- 清理 僵尸进程。确保 每次
close()与join()均被正确调用。推荐优先使用with multiprocessing.Pool() as pool:上下文管理器语法,自动触发 资源释放逻辑,防止 脚本被强制中断时残留后台进程。

暂无评论,快来抢沙发吧!