文章目录

Go 并发模式:工作池与扇入扇出

发布于 2026-04-17 07:15:11 · 浏览 17 次 · 评论 0 条

Go 并发编程的核心在于高效利用多核 CPU,同时避免资源耗尽。通过控制并发粒度,我们可以显著提升数据处理速度。以下是构建工作池与扇入扇出模式的实操指南。


第一部分:工作池

工作池模式的核心思想是限制并发运行的 Goroutine 数量。想象你有一个无限长的任务列表,但只有 3 个工人来处理,这样可以防止系统因创建成千上万个 Goroutine 而崩溃。

下面展示任务如何在通道、工作 Goroutine 和结果通道之间流动:

graph LR J["Jobs Channel"] -->|Task 1| W1["Worker 1"] J -->|Task 2| W2["Worker 2"] J -->|Task 3| W3["Worker 3"] J -->|Task 4| W1 W1 -->|Result 1| R["Results Channel"] W2 -->|Result 2| R W3 -->|Result 3| R

编写工作池模式的代码步骤如下:

  1. 定义 Worker 函数
    该函数接收任务通道和结果通道,循环处理任务。

  2. 创建 通道
    初始化一个用于发送任务的缓冲通道 jobs 和一个用于接收结果的缓冲通道 results

  3. 启动 固定数量的 Goroutine
    使用 for 循环开启指定数量(如 3 个)的 Worker,作为“池子”。

  4. 分发 任务
    将任务发送到 jobs 通道,并关闭 该通道以通知 Worker 没有新任务了。

  5. 收集 结果
    results 通道读取所有处理完成的数据。

执行以下代码实现上述逻辑:

package main

import (
    "fmt"
    "time"
)

// worker 模拟处理任务的函数
func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("Worker %d started job %d\n", id, j)
        time.Sleep(time.Second) // 模拟耗时操作
        fmt.Printf("Worker %d finished job %d\n", id, j)
        results <- j * 2 // 将结果发送回结果通道
    }
}

func main() {
    // 1. 定义通道数量和任务数
    numWorkers := 3
    numJobs := 5

    // 2. 创建通道
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // 3. 启动 Worker
    for w := 1; w <= numWorkers; w++ {
        go worker(w, jobs, results)
    }

    // 4. 分发任务
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // 重要:关闭通道,让 range 能退出

    // 5. 收集结果
    for r := 1; r <= numJobs; r++ {
        <-results
    }
}

第二部分:扇出

扇出是指将一个数据源的数据分发到多个处理函数中,并行处理。这就像是把一个主水源的水管分接到多个水龙头,每个水龙头独立用水。

在代码中,通常表现为启动多个 Goroutine 从同一个输入通道读取数据。

实施扇出模式的步骤:

  1. 创建 一个输入数据通道。
  2. 生成 数据并发送到该通道。
  3. 启动 多个处理函数(Goroutine)。
  4. 所有处理函数都从同一个输入通道读取数据。

注意:扇出本身不会自动聚合结果,它只是并发消费的步骤。


第三部分:扇入

扇入是扇出的逆过程:将多个通道的结果汇聚到一个统一的通道中,以便主线程可以像读取一个源一样读取所有结果。

下图展示了多个输入通道如何通过一个函数汇聚到一个输出通道:

graph LR I1["Input Channel 1"] -->|Data Stream A| F["Fan-in Function"] I2["Input Channel 2"] -->|Data Stream B| F I3["Input Channel 3"] -->|Data Stream C| F F -->|Combined Stream| O["Output Channel"]

编写扇入函数的步骤:

  1. 创建 一个输出通道 output
  2. 启动 一个辅助 Goroutine。
  3. 使用 select 语句监听所有输入通道。
  4. 转发 无论从哪个输入通道读取到的数据,都发送output 通道。

执行以下代码查看扇入与扇出的综合运用:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// 模拟一个耗时工作
func boring(msg string) <-chan string {
    c := make(chan string)
    go func() {
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
        }
    }()
    return c
}

// fanIn 函数将多个输入通道汇聚到一个输出通道
func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:
                c <- s
            case s := <-input2:
                c <- s
            }
        }
    }()
    return c
}

func main() {
    // 扇出:启动两个生成器
    c := boring("Joe")
    d := boring("Ann")

    // 扇入:将两个通道汇聚
    mainChan := fanIn(c, d)

    // 从统一通道读取
    for i := 0; i < 10; i++ {
        fmt.Println(<-mainChan)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

在上述代码中,boring 函数产生数据流(扇出的源头),fanIn 函数将这些数据流合并。select 语句会阻塞,直到其中任意一个通道有数据可读,从而实现了多路复用。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文