文章目录

Python多进程之间如何共享数据:Queue、Pipe、Manager

发布于 2026-05-01 18:27:40 · 浏览 12 次 · 评论 0 条

Python 多进程编程中,由于每个进程拥有独立的内存空间,变量无法像多线程那样直接共享。要实现进程间通信(IPC),必须使用特定的数据结构或机制。以下是三种主流方案:Queue(队列)、Pipe(管道)和 Manager(管理器)的实操指南。


方法一:使用 Queue(队列)

Queue 是线程安全、进程安全的先进先出(FIFO)数据结构。它最适合用于“生产者-消费者”模型,即一个进程负责放入数据,另一个进程负责取出数据。

  1. 导入 multiprocessing 模块中的 Queue 类。
  2. 实例化 一个队列对象 q = Queue()
  3. 在子进程中,使用 q.put() 方法写入数据。
  4. 在主进程中,使用 q.get() 方法读取数据。注意 get() 方法会阻塞,直到队列中有数据。
import multiprocessing
import time

def writer(q):
    """子进程:向队列写入数据"""
    for i in range(5):
        print(f"子进程写入: {i}")
        q.put(i)
        time.sleep(0.1)

def reader(q):
    """主进程:从队列读取数据"""
    while True:
        # 尝试获取数据,如果队列为空则抛出异常退出
        try:
            # 设置超时时间,防止死循环无法退出
            val = q.get(timeout=1)
            print(f"主进程读取: {val}")
        except:
            break

if __name__ == "__main__":
    q = multiprocessing.Queue()
    # 创建写入进程
    p = multiprocessing.Process(target=writer, args=(q,))
    p.start()

    # 主进程充当读取者
    reader(q)

    p.join()

方法二:使用 Pipe(管道)

Pipe 返回一对连接对象(conn1, conn2),分别代表管道的两端。它比 Queue 更底层、更快,但只适用于两个进程之间的通信。

  1. 调用 multiprocessing.Pipe() 创建管道,它会返回两个连接端点。
  2. 其中一个端点 传递给 子进程。
  3. 发送端 使用 .send() 方法发送数据。
  4. 接收端 使用 .recv() 方法接收数据。
import multiprocessing

def sender(conn):
    """子进程:发送数据"""
    data = ["Hello", "from", "Pipe"]
    for item in data:
        conn.send(item)
        print(f"发送: {item}")
    conn.close() # 发送完毕关闭连接

if __name__ == "__main__":
    # 创建管道,duplex=True表示双向管道(默认),False则为单向
    parent_conn, child_conn = multiprocessing.Pipe()

    p = multiprocessing.Process(target=sender, args=(child_conn,))
    p.start()

    # 主进程接收数据
    while True:
        try:
            msg = parent_conn.recv()
            print(f"接收: {msg}")
        except EOFError:
            # 当连接关闭时抛出 EOFError
            break

    p.join()

方法三:使用 Manager(管理器)

Manager 比较特殊,它启动一个专门的服务进程来管理数据,并允许其他进程通过代理访问这些数据。它支持更复杂的数据结构,如 listdict 等,但性能开销最大。

  1. 创建 multiprocessing.Manager() 实例。
  2. 使用 管理器对象 创建共享数据结构(如 shared_list = manager.list())。
  3. 共享对象 传递给 子进程。
  4. 像操作普通对象一样 在子进程中 修改 共享数据,修改会自动同步。
import multiprocessing

def modify_list(shared_list):
    """子进程:修改共享列表"""
    shared_list.append("子进程添加的数据")
    print(f"子进程内部看到: {shared_list}")

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        # 创建共享列表
        shared_list = manager.list()
        shared_list.append("初始数据")

        p = multiprocessing.Process(target=modify_list, args=(shared_list,))
        p.start()
        p.join()

        # 主进程查看修改后的结果
        print(f"主进程最终看到: {shared_list}")

方案选择指南

为了在实际项目中快速决策,请参考下表对比这三种方式的特性:

特性 Queue Pipe Manager
适用场景 多个进程间通信,生产者-消费者模型 两个进程间的高速通信 需要共享复杂数据结构
数据结构 队列 (FIFO) 管道 (双向或单向) List, Dict, Namespace 等
性能 中等 最快 最慢 (IPC 开销大)
安全性 自动加锁,安全 需自行处理并发 (小心死锁) 自动加锁,安全

为了更直观地展示选择逻辑,可以参考以下流程:

graph TD A["开始选择通信方式"] --> B{需要共享复杂
数据结构吗?} B -- "是" --> C["使用 Manager"] B -- "否" --> D{通信进程数量} D -- "多于2个" --> E["使用 Queue"] D -- "仅2个" --> F{是否追求极致性能?} F -- "是" --> G["使用 Pipe"] F -- "否" --> E

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文