Java线程池ThreadPoolExecutor的饱和策略源码解读
在使用 ThreadPoolExecutor 时,当线程池的核心线程数已满、任务队列也已满,并且线程数达到最大线程数时,线程池会处于“饱和”状态。此时,新提交的任务将由“饱和策略”进行处理。这四种策略决定了线程池在无法接收新任务时的具体行为。
1. 饱和策略的触发机制
要理解饱和策略,首先必须知道它在什么情况下被调用。这需要查看 ThreadPoolExecutor 的 execute 方法源码逻辑。
分析 以下线程池处理任务的核心流程,该流程清晰地展示了策略触发的时机。
从上图可以看出,只有当所有资源(核心线程、队列、最大线程)都已耗尽,线程池才会执行 拒绝逻辑。具体使用哪种策略,取决于创建线程池时传入的 RejectedExecutionHandler 接口实现类。
2. 策略一:AbortPolicy(默认抛出异常)
这是 JDK 默认的饱和策略。它的逻辑非常简单粗暴:直接抛出异常,阻止系统继续运行,以此强制开发者发现并处理任务过载的问题。
查看 AbortPolicy 的源码:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
分析 源码行为:
- 抛出
RejectedExecutionException运行时异常。 - 导致 当前提交任务的线程因异常而中断(如果没有捕获该异常)。
- 结果:该任务被丢弃,且生产者会受到明确的报错通知。
适用场景:
- 对数据一致性要求极高,不允许任务静默丢失的业务。
- 需要通过异常立即感知系统资源不足的场景。
3. 策略二:CallerRunsPolicy(调用者运行)
这种策略提供了一种简单的“自我调节”机制(背压机制)。它不会丢弃任务,而是将任务回退给提交任务的线程(即调用者线程)自己去执行。
查看 CallerRunsPolicy 的源码:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
分析 源码行为:
- 检查 线程池是否已关闭。如果未关闭,执行
r.run()。 - 注意:这里调用的是
run()方法,而不是execute()方法。这意味着任务不会进入线程池队列,而是直接在当前调用execute的线程中运行。 - 结果:
- 拖慢了主线程或调用线程的速度,从而减慢了新任务的提交速度。
- 为线程池争取了处理现有任务的时间。
适用场景:
- 任务不能丢失,但可以接受处理延迟。
- 希望通过牺牲提交速度来换取系统稳定性的场景。
4. 策略三:DiscardPolicy(静默丢弃)
这是一种“极简”且“最危险”的策略。当任务无法提交时,它什么都不做,直接丢弃任务,且不会给任何通知。
查看 DiscardPolicy 的源码:
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// Do nothing, drop the task silently
}
}
分析 源码行为:
rejectedExecution方法体完全是空的。- 执行 该方法后,任务直接消失,仿佛从未提交过。
- 结果:由于没有日志或异常,线上排查问题时极难发现数据丢失。
适用场景:
- 允许数据丢失的非核心业务(例如:某些不计费的统计日志上报)。
- 任务之间完全独立,且丢失部分数据不影响整体逻辑的流式处理。
5. 策略四:DiscardOldestPolicy(丢弃最老任务)
这种策略在丢弃新任务和丢弃旧任务之间做了一个折衷。它会丢弃队列中等待时间最长的任务,然后尝试再次提交被拒绝的新任务。
查看 DiscardOldestPolicy 的源码:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
分析 源码行为:
- 检查 线程池是否关闭。
- 调用
e.getQueue().poll():弹出 并移除任务队列的队首元素(等待时间最久的任务)。 - 调用
e.execute(r):重新尝试 将当前被拒绝的任务放入线程池。
适用场景:
- 这种策略通常适用于允许部分旧数据过期,但希望尽可能保留最新数据的场景。
- 风险提示:如果队列被快速填满,这种策略会导致队列头部的任务反复被丢弃,永远得不到执行。
6. 四种策略对比总结
为了方便在实际开发中选择 合适的策略,下表总结了它们的核心区别:
| 策略名称 | 核心动作 | 是否抛出异常 | 是否丢失任务 | 典型应用场景 |
|---|---|---|---|---|
AbortPolicy |
抛出异常 | 是 | 是 | 默认策略,需高可靠性感知的场景 |
CallerRunsPolicy |
调用线程执行 | 否 | 否 | 需要背压机制,允许降低提交速度的场景 |
DiscardPolicy |
静默丢弃 | 否 | 是 | 允许大量数据丢失的非核心业务 |
DiscardOldestPolicy |
丢弃队首重试 | 否 | 是 | 优先保留新数据,允许旧数据过期的场景 |
7. 如何自定义饱和策略
如果以上四种策略均不能满足业务需求,你可以通过实现 RejectedExecutionHandler 接口来自定义逻辑。
定义 一个自定义策略类:
public class CustomRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 1. 记录详细的日志,包含任务信息和线程池状态
System.err.println("Task rejected: " + r.toString());
System.err.println("Pool size: " + executor.getPoolSize());
// 2. 自定义兜底逻辑,例如:
// - 将任务存入本地磁盘或数据库稍后重试
// - 发送告警邮件或短信
// - 将任务转发到备用线程池
saveToBackupStorage(r);
}
private void saveToBackupStorage(Runnable r) {
// 模拟持久化操作
}
}
应用 自定义策略到线程池:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new CustomRejectionPolicy() // 传入自定义策略
);
实施 自定义策略时,请务必注意逻辑的性能开销,避免在拒绝处理中引入耗时的阻塞操作,否则会导致调用线程(Caller)长时间阻塞。

暂无评论,快来抢沙发吧!