Go 并发模式:worker pool 实现
处理高并发任务时,无限制地创建 goroutine 会导致系统资源耗尽。Worker Pool(工作池)模式通过固定数量的 Worker 协同处理任务队列,既能利用并发优势,又能有效控制系统负载。以下是具体实现步骤。
理解 Worker Pool 的工作流
Worker Pool 的核心思想是将“任务生产”与“任务执行”分离。主程序作为分发者将任务放入通道,固定数量的 Worker 从通道取出任务并执行,最终将结果返回。
以下是数据流向的逻辑图:
graph LR
A["主程序"] -->|"分发任务"| B["任务通道"]
B -->|"消费任务"| C["Worker 1"]
B -->|"消费任务"| D["Worker 2"]
B -->|"消费任务"| E["Worker N"]
C -->|"写入结果"| F["结果通道"]
D -->|"写入结果"| F
E -->|"写入结果"| F
F -->|"汇总输出"| A
基础 Worker Pool 实现
1. 定义数据结构
创建 Job 结构体表示待处理的任务,创建 Result 结构体表示处理后的输出。
type Job struct {
ID int
Payload int
}
type Result struct {
Job Job
Output int
}
2. 编写 Worker 函数
Worker 是一个长期运行的函数,监听 jobs 通道。一旦收到任务,执行 计算逻辑,并将结果发送到 results 通道。
func worker(id int, jobs <-chan Job, results chan<- Result) {
for j := range jobs {
// 模拟耗时计算:计算 Payload 的各位数字之和
sum := 0
num := j.Payload
for num > 0 {
sum += num % 10
num /= 10
}
// 将结果发送回通道
results <- Result{
Job: j,
Output: sum,
}
fmt.Printf("Worker %d finished job %d\n", id, j.ID)
}
}
3. 构建主流程
在 main 函数中初始化通道,启动 指定数量的 Worker,分发 任务,并收集 结果。
package main
import (
"fmt"
)
func main() {
// 1. 定义通道
jobs := make(chan Job, 100) // 缓冲通道存放任务
results := make(chan Result, 100) // 缓冲通道存放结果
// 2. 启动 Worker 团队(例如 3 个 Worker)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 3. 分发任务
for i := 1; i <= 9; i++ {
jobs <- Job{ID: i, Payload: i * 123}
}
close(jobs) // 关闭通道,通知 Worker 没有新任务了
// 4. 收集结果
for i := 1; i <= 9; i++ {
r := <-results
fmt.Printf("Job %d result: %d\n", r.Job.ID, r.Output)
}
}
进阶:使用 Context 实现优雅退出
在实际生产环境中,如果程序需要强制终止(例如收到中断信号或超时),必须让 Worker 停止等待并退出。使用 context.Context 可以实现这一功能。
1. 修改 Worker 函数签名
在 Worker 函数中增加 ctx context.Context 参数,并在 select 语句中同时监听 jobs 通道和 ctx.Done()。
func workerWithCtx(ctx context.Context, id int, jobs <-chan Job, results chan<- Result) {
for {
select {
case <-ctx.Done():
// 收到取消信号,退出循环
return
case j, ok := <-jobs:
if !ok {
// 通道已关闭且无数据,正常退出
return
}
// 执行任务逻辑
sum := 0
for _, d := range fmt.Sprint(j.Payload) {
sum += int(d - '0')
}
results <- Result{Job: j, Output: sum}
}
}
}
2. 在主函数中使用 Context
创建 一个可取消的 Context,并将其传递给 Worker。
func mainAdvanced() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 确保函数退出时触发取消
jobs := make(chan Job, 100)
results := make(chan Result, 100)
// 启动带 Context 的 Worker
for w := 1; w <= 3; w++ {
go workerWithCtx(ctx, w, jobs, results)
}
// 模拟分发任务
go func() {
for i := 1; i <= 5; i++ {
jobs <- Job{ID: i, Payload: i * 999}
}
close(jobs)
}()
// 收集结果
for i := 1; i <= 5; i++ {
fmt.Println(<-results)
}
// 如果此时调用 cancel(),所有阻塞在 jobs 读取上的 Worker 都会立即收到信号并退出
}
关键点总结
- 通道缓冲:务必根据任务量设置
make(chan, size)的缓冲大小,防止阻塞主分发流程。 - 关闭通道:任务分发完毕后,执行
close(jobs),这样 Worker 中的range循环或case j, ok := <-jobs才能正确判断结束并退出,避免死锁。 - 数量控制:Worker 的数量通常设置为 CPU 核心数(
runtime.NumCPU())或根据具体 I/O 密集型/计算型任务调整,避免过多的上下文切换。

暂无评论,快来抢沙发吧!