文章目录

Java线程池ThreadPoolExecutor的饱和策略源码解读

发布于 2026-05-05 07:14:09 · 浏览 15 次 · 评论 0 条

Java线程池ThreadPoolExecutor的饱和策略源码解读

在使用 ThreadPoolExecutor 时,当线程池的核心线程数已满、任务队列也已满,并且线程数达到最大线程数时,线程池会处于“饱和”状态。此时,新提交的任务将由“饱和策略”进行处理。这四种策略决定了线程池在无法接收新任务时的具体行为。


1. 饱和策略的触发机制

要理解饱和策略,首先必须知道它在什么情况下被调用。这需要查看 ThreadPoolExecutorexecute 方法源码逻辑。

分析 以下线程池处理任务的核心流程,该流程清晰地展示了策略触发的时机。

graph TD A["Start: Execute Task"] --> B{"Worker Count < CorePoolSize?"} B -- Yes --> C["Create Core Worker & Start Task"] B -- No --> D{"Work Queue Full?"} D -- No --> E["Add Task to Queue"] D -- Yes --> F{"Worker Count < MaximumPoolSize?"} F -- Yes --> G["Create Non-Core Worker & Start Task"] F -- No --> H["Trigger: RejectedExecutionHandler.rejectedExecution"] C --> I["End"] E --> I G --> I H --> J["Handle Rejection (Strategy Logic)"]

从上图可以看出,只有当所有资源(核心线程、队列、最大线程)都已耗尽,线程池才会执行 拒绝逻辑。具体使用哪种策略,取决于创建线程池时传入的 RejectedExecutionHandler 接口实现类。


2. 策略一:AbortPolicy(默认抛出异常)

这是 JDK 默认的饱和策略。它的逻辑非常简单粗暴:直接抛出异常,阻止系统继续运行,以此强制开发者发现并处理任务过载的问题。

查看 AbortPolicy 的源码:

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

分析 源码行为:

  1. 抛出 RejectedExecutionException 运行时异常。
  2. 导致 当前提交任务的线程因异常而中断(如果没有捕获该异常)。
  3. 结果:该任务被丢弃,且生产者会受到明确的报错通知。

适用场景

  • 对数据一致性要求极高,不允许任务静默丢失的业务。
  • 需要通过异常立即感知系统资源不足的场景。

3. 策略二:CallerRunsPolicy(调用者运行)

这种策略提供了一种简单的“自我调节”机制(背压机制)。它不会丢弃任务,而是将任务回退给提交任务的线程(即调用者线程)自己去执行。

查看 CallerRunsPolicy 的源码:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

分析 源码行为:

  1. 检查 线程池是否已关闭。如果未关闭,执行 r.run()
  2. 注意:这里调用的是 run() 方法,而不是 execute() 方法。这意味着任务不会进入线程池队列,而是直接在当前调用 execute 的线程中运行。
  3. 结果
    • 拖慢了主线程或调用线程的速度,从而减慢了新任务的提交速度。
    • 为线程池争取了处理现有任务的时间。

适用场景

  • 任务不能丢失,但可以接受处理延迟。
  • 希望通过牺牲提交速度来换取系统稳定性的场景。

4. 策略三:DiscardPolicy(静默丢弃)

这是一种“极简”且“最危险”的策略。当任务无法提交时,它什么都不做,直接丢弃任务,且不会给任何通知。

查看 DiscardPolicy 的源码:

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Do nothing, drop the task silently
    }
}

分析 源码行为:

  1. rejectedExecution 方法体完全是空的。
  2. 执行 该方法后,任务直接消失,仿佛从未提交过。
  3. 结果:由于没有日志或异常,线上排查问题时极难发现数据丢失。

适用场景

  • 允许数据丢失的非核心业务(例如:某些不计费的统计日志上报)。
  • 任务之间完全独立,且丢失部分数据不影响整体逻辑的流式处理。

5. 策略四:DiscardOldestPolicy(丢弃最老任务)

这种策略在丢弃新任务和丢弃旧任务之间做了一个折衷。它会丢弃队列中等待时间最长的任务,然后尝试再次提交被拒绝的新任务。

查看 DiscardOldestPolicy 的源码:

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

分析 源码行为:

  1. 检查 线程池是否关闭。
  2. 调用 e.getQueue().poll()弹出 并移除任务队列的队首元素(等待时间最久的任务)。
  3. 调用 e.execute(r)重新尝试 将当前被拒绝的任务放入线程池。

适用场景

  • 这种策略通常适用于允许部分旧数据过期,但希望尽可能保留最新数据的场景。
  • 风险提示:如果队列被快速填满,这种策略会导致队列头部的任务反复被丢弃,永远得不到执行。

6. 四种策略对比总结

为了方便在实际开发中选择 合适的策略,下表总结了它们的核心区别:

策略名称 核心动作 是否抛出异常 是否丢失任务 典型应用场景
AbortPolicy 抛出异常 默认策略,需高可靠性感知的场景
CallerRunsPolicy 调用线程执行 需要背压机制,允许降低提交速度的场景
DiscardPolicy 静默丢弃 允许大量数据丢失的非核心业务
DiscardOldestPolicy 丢弃队首重试 优先保留新数据,允许旧数据过期的场景

7. 如何自定义饱和策略

如果以上四种策略均不能满足业务需求,你可以通过实现 RejectedExecutionHandler 接口来自定义逻辑。

定义 一个自定义策略类:

public class CustomRejectionPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 1. 记录详细的日志,包含任务信息和线程池状态
        System.err.println("Task rejected: " + r.toString());
        System.err.println("Pool size: " + executor.getPoolSize());

        // 2. 自定义兜底逻辑,例如:
        // - 将任务存入本地磁盘或数据库稍后重试
        // - 发送告警邮件或短信
        // - 将任务转发到备用线程池
        saveToBackupStorage(r);
    }

    private void saveToBackupStorage(Runnable r) {
        // 模拟持久化操作
    }
}

应用 自定义策略到线程池:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    1, 
    1, 
    0L, 
    TimeUnit.MILLISECONDS, 
    new LinkedBlockingQueue<>(1),
    new CustomRejectionPolicy() // 传入自定义策略
);

实施 自定义策略时,请务必注意逻辑的性能开销,避免在拒绝处理中引入耗时的阻塞操作,否则会导致调用线程(Caller)长时间阻塞。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文