文章目录

Go 并发模式:worker pool 实现

发布于 2026-04-19 03:27:07 · 浏览 5 次 · 评论 0 条

Go 并发模式:worker pool 实现

处理高并发任务时,无限制地创建 goroutine 会导致系统资源耗尽。Worker Pool(工作池)模式通过固定数量的 Worker 协同处理任务队列,既能利用并发优势,又能有效控制系统负载。以下是具体实现步骤。


理解 Worker Pool 的工作流

Worker Pool 的核心思想是将“任务生产”与“任务执行”分离。主程序作为分发者将任务放入通道,固定数量的 Worker 从通道取出任务并执行,最终将结果返回。

以下是数据流向的逻辑图:

graph LR A["主程序"] -->|"分发任务"| B["任务通道"] B -->|"消费任务"| C["Worker 1"] B -->|"消费任务"| D["Worker 2"] B -->|"消费任务"| E["Worker N"] C -->|"写入结果"| F["结果通道"] D -->|"写入结果"| F E -->|"写入结果"| F F -->|"汇总输出"| A

基础 Worker Pool 实现

1. 定义数据结构

创建 Job 结构体表示待处理的任务,创建 Result 结构体表示处理后的输出。

type Job struct {
    ID      int
    Payload int
}

type Result struct {
    Job    Job
    Output int
}

2. 编写 Worker 函数

Worker 是一个长期运行的函数,监听 jobs 通道。一旦收到任务,执行 计算逻辑,并将结果发送到 results 通道。

func worker(id int, jobs <-chan Job, results chan<- Result) {
    for j := range jobs {
        // 模拟耗时计算:计算 Payload 的各位数字之和
        sum := 0
        num := j.Payload
        for num > 0 {
            sum += num % 10
            num /= 10
        }

        // 将结果发送回通道
        results <- Result{
            Job:    j,
            Output: sum,
        }
        fmt.Printf("Worker %d finished job %d\n", id, j.ID)
    }
}

3. 构建主流程

main 函数中初始化通道,启动 指定数量的 Worker,分发 任务,并收集 结果。

package main

import (
    "fmt"
)

func main() {
    // 1. 定义通道
    jobs := make(chan Job, 100)    // 缓冲通道存放任务
    results := make(chan Result, 100) // 缓冲通道存放结果

    // 2. 启动 Worker 团队(例如 3 个 Worker)
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // 3. 分发任务
    for i := 1; i <= 9; i++ {
        jobs <- Job{ID: i, Payload: i * 123}
    }
    close(jobs) // 关闭通道,通知 Worker 没有新任务了

    // 4. 收集结果
    for i := 1; i <= 9; i++ {
        r := <-results
        fmt.Printf("Job %d result: %d\n", r.Job.ID, r.Output)
    }
}

进阶:使用 Context 实现优雅退出

在实际生产环境中,如果程序需要强制终止(例如收到中断信号或超时),必须让 Worker 停止等待并退出。使用 context.Context 可以实现这一功能。

1. 修改 Worker 函数签名

在 Worker 函数中增加 ctx context.Context 参数,并在 select 语句中同时监听 jobs 通道和 ctx.Done()

func workerWithCtx(ctx context.Context, id int, jobs <-chan Job, results chan<- Result) {
    for {
        select {
        case <-ctx.Done():
            // 收到取消信号,退出循环
            return
        case j, ok := <-jobs:
            if !ok {
                // 通道已关闭且无数据,正常退出
                return
            }
            // 执行任务逻辑
            sum := 0
            for _, d := range fmt.Sprint(j.Payload) {
                sum += int(d - '0')
            }
            results <- Result{Job: j, Output: sum}
        }
    }
}

2. 在主函数中使用 Context

创建 一个可取消的 Context,并将其传递给 Worker。

func mainAdvanced() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // 确保函数退出时触发取消

    jobs := make(chan Job, 100)
    results := make(chan Result, 100)

    // 启动带 Context 的 Worker
    for w := 1; w <= 3; w++ {
        go workerWithCtx(ctx, w, jobs, results)
    }

    // 模拟分发任务
    go func() {
        for i := 1; i <= 5; i++ {
            jobs <- Job{ID: i, Payload: i * 999}
        }
        close(jobs)
    }()

    // 收集结果
    for i := 1; i <= 5; i++ {
        fmt.Println(<-results)
    }

    // 如果此时调用 cancel(),所有阻塞在 jobs 读取上的 Worker 都会立即收到信号并退出
}

关键点总结

  1. 通道缓冲:务必根据任务量设置 make(chan, size) 的缓冲大小,防止阻塞主分发流程。
  2. 关闭通道:任务分发完毕后,执行 close(jobs),这样 Worker 中的 range 循环或 case j, ok := <-jobs 才能正确判断结束并退出,避免死锁。
  3. 数量控制:Worker 的数量通常设置为 CPU 核心数(runtime.NumCPU())或根据具体 I/O 密集型/计算型任务调整,避免过多的上下文切换。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文