Python 多进程编程中,由于每个进程拥有独立的内存空间,变量无法像多线程那样直接共享。要实现进程间通信(IPC),必须使用特定的数据结构或机制。以下是三种主流方案:Queue(队列)、Pipe(管道)和 Manager(管理器)的实操指南。
方法一:使用 Queue(队列)
Queue 是线程安全、进程安全的先进先出(FIFO)数据结构。它最适合用于“生产者-消费者”模型,即一个进程负责放入数据,另一个进程负责取出数据。
- 导入
multiprocessing模块中的Queue类。 - 实例化 一个队列对象
q = Queue()。 - 在子进程中,使用
q.put()方法写入数据。 - 在主进程中,使用
q.get()方法读取数据。注意get()方法会阻塞,直到队列中有数据。
import multiprocessing
import time
def writer(q):
"""子进程:向队列写入数据"""
for i in range(5):
print(f"子进程写入: {i}")
q.put(i)
time.sleep(0.1)
def reader(q):
"""主进程:从队列读取数据"""
while True:
# 尝试获取数据,如果队列为空则抛出异常退出
try:
# 设置超时时间,防止死循环无法退出
val = q.get(timeout=1)
print(f"主进程读取: {val}")
except:
break
if __name__ == "__main__":
q = multiprocessing.Queue()
# 创建写入进程
p = multiprocessing.Process(target=writer, args=(q,))
p.start()
# 主进程充当读取者
reader(q)
p.join()
方法二:使用 Pipe(管道)
Pipe 返回一对连接对象(conn1, conn2),分别代表管道的两端。它比 Queue 更底层、更快,但只适用于两个进程之间的通信。
- 调用
multiprocessing.Pipe()创建管道,它会返回两个连接端点。 - 将 其中一个端点 传递给 子进程。
- 发送端 使用
.send()方法发送数据。 - 接收端 使用
.recv()方法接收数据。
import multiprocessing
def sender(conn):
"""子进程:发送数据"""
data = ["Hello", "from", "Pipe"]
for item in data:
conn.send(item)
print(f"发送: {item}")
conn.close() # 发送完毕关闭连接
if __name__ == "__main__":
# 创建管道,duplex=True表示双向管道(默认),False则为单向
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=sender, args=(child_conn,))
p.start()
# 主进程接收数据
while True:
try:
msg = parent_conn.recv()
print(f"接收: {msg}")
except EOFError:
# 当连接关闭时抛出 EOFError
break
p.join()
方法三:使用 Manager(管理器)
Manager 比较特殊,它启动一个专门的服务进程来管理数据,并允许其他进程通过代理访问这些数据。它支持更复杂的数据结构,如 list、dict 等,但性能开销最大。
- 创建
multiprocessing.Manager()实例。 - 使用 管理器对象 创建共享数据结构(如
shared_list = manager.list())。 - 将 共享对象 传递给 子进程。
- 像操作普通对象一样 在子进程中 修改 共享数据,修改会自动同步。
import multiprocessing
def modify_list(shared_list):
"""子进程:修改共享列表"""
shared_list.append("子进程添加的数据")
print(f"子进程内部看到: {shared_list}")
if __name__ == "__main__":
with multiprocessing.Manager() as manager:
# 创建共享列表
shared_list = manager.list()
shared_list.append("初始数据")
p = multiprocessing.Process(target=modify_list, args=(shared_list,))
p.start()
p.join()
# 主进程查看修改后的结果
print(f"主进程最终看到: {shared_list}")
方案选择指南
为了在实际项目中快速决策,请参考下表对比这三种方式的特性:
| 特性 | Queue | Pipe | Manager |
|---|---|---|---|
| 适用场景 | 多个进程间通信,生产者-消费者模型 | 两个进程间的高速通信 | 需要共享复杂数据结构 |
| 数据结构 | 队列 (FIFO) | 管道 (双向或单向) | List, Dict, Namespace 等 |
| 性能 | 中等 | 最快 | 最慢 (IPC 开销大) |
| 安全性 | 自动加锁,安全 | 需自行处理并发 (小心死锁) | 自动加锁,安全 |
为了更直观地展示选择逻辑,可以参考以下流程:
graph TD
A["开始选择通信方式"] --> B{需要共享复杂
数据结构吗?} B -- "是" --> C["使用 Manager"] B -- "否" --> D{通信进程数量} D -- "多于2个" --> E["使用 Queue"] D -- "仅2个" --> F{是否追求极致性能?} F -- "是" --> G["使用 Pipe"] F -- "否" --> E
数据结构吗?} B -- "是" --> C["使用 Manager"] B -- "否" --> D{通信进程数量} D -- "多于2个" --> E["使用 Queue"] D -- "仅2个" --> F{是否追求极致性能?} F -- "是" --> G["使用 Pipe"] F -- "否" --> E

暂无评论,快来抢沙发吧!