Go 并发编程的核心在于高效利用多核 CPU,同时避免资源耗尽。通过控制并发粒度,我们可以显著提升数据处理速度。以下是构建工作池与扇入扇出模式的实操指南。
第一部分:工作池
工作池模式的核心思想是限制并发运行的 Goroutine 数量。想象你有一个无限长的任务列表,但只有 3 个工人来处理,这样可以防止系统因创建成千上万个 Goroutine 而崩溃。
下面展示任务如何在通道、工作 Goroutine 和结果通道之间流动:
编写工作池模式的代码步骤如下:
-
定义 Worker 函数
该函数接收任务通道和结果通道,循环处理任务。 -
创建 通道
初始化一个用于发送任务的缓冲通道jobs和一个用于接收结果的缓冲通道results。 -
启动 固定数量的 Goroutine
使用for循环开启指定数量(如 3 个)的 Worker,作为“池子”。 -
分发 任务
将任务发送到jobs通道,并关闭 该通道以通知 Worker 没有新任务了。 -
收集 结果
从results通道读取所有处理完成的数据。
执行以下代码实现上述逻辑:
package main
import (
"fmt"
"time"
)
// worker 模拟处理任务的函数
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("Worker %d started job %d\n", id, j)
time.Sleep(time.Second) // 模拟耗时操作
fmt.Printf("Worker %d finished job %d\n", id, j)
results <- j * 2 // 将结果发送回结果通道
}
}
func main() {
// 1. 定义通道数量和任务数
numWorkers := 3
numJobs := 5
// 2. 创建通道
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 3. 启动 Worker
for w := 1; w <= numWorkers; w++ {
go worker(w, jobs, results)
}
// 4. 分发任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // 重要:关闭通道,让 range 能退出
// 5. 收集结果
for r := 1; r <= numJobs; r++ {
<-results
}
}
第二部分:扇出
扇出是指将一个数据源的数据分发到多个处理函数中,并行处理。这就像是把一个主水源的水管分接到多个水龙头,每个水龙头独立用水。
在代码中,通常表现为启动多个 Goroutine 从同一个输入通道读取数据。
实施扇出模式的步骤:
- 创建 一个输入数据通道。
- 生成 数据并发送到该通道。
- 启动 多个处理函数(Goroutine)。
- 让 所有处理函数都从同一个输入通道读取数据。
注意:扇出本身不会自动聚合结果,它只是并发消费的步骤。
第三部分:扇入
扇入是扇出的逆过程:将多个通道的结果汇聚到一个统一的通道中,以便主线程可以像读取一个源一样读取所有结果。
下图展示了多个输入通道如何通过一个函数汇聚到一个输出通道:
编写扇入函数的步骤:
- 创建 一个输出通道
output。 - 启动 一个辅助 Goroutine。
- 使用
select语句监听所有输入通道。 - 转发 无论从哪个输入通道读取到的数据,都发送 到
output通道。
执行以下代码查看扇入与扇出的综合运用:
package main
import (
"fmt"
"math/rand"
"time"
)
// 模拟一个耗时工作
func boring(msg string) <-chan string {
c := make(chan string)
go func() {
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
}()
return c
}
// fanIn 函数将多个输入通道汇聚到一个输出通道
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
select {
case s := <-input1:
c <- s
case s := <-input2:
c <- s
}
}
}()
return c
}
func main() {
// 扇出:启动两个生成器
c := boring("Joe")
d := boring("Ann")
// 扇入:将两个通道汇聚
mainChan := fanIn(c, d)
// 从统一通道读取
for i := 0; i < 10; i++ {
fmt.Println(<-mainChan)
}
fmt.Println("You're both boring; I'm leaving.")
}
在上述代码中,boring 函数产生数据流(扇出的源头),fanIn 函数将这些数据流合并。select 语句会阻塞,直到其中任意一个通道有数据可读,从而实现了多路复用。

暂无评论,快来抢沙发吧!