Java CompletableFuture.allOf与anyOf的组合策略
Java并发编程中,CompletableFuture 提供了强大的异步编排能力。掌握 allOf 和 anyOf 的组合使用,能够高效解决多任务并发执行、竞速调用及超时控制等复杂场景。
一、 核心机制与差异分析
在深入组合策略之前,必须明确两者的基本行为模式。下图展示了两种方法在处理三个并发任务时的逻辑差异。
graph TD
subgraph "AllOf 策略 (等待所有)"
A1["任务 A"] --> Barrier{"所有任务完成?"}
A2["任务 B"] --> Barrier
A3["任务 C"] --> Barrier
Barrier -- Yes --> End1["统一结束点"]
end
subgraph "AnyOf 策略 (等待任意)"
B1["任务 A"] --> Racer{"谁先完成?"}
B2["任务 B"] --> Racer
B3["任务 C"] --> Racer
Racer -- 任意一个完成 --> End2["最快结果"]
end
理解核心区别:
allOf:阻塞直到所有提供的CompletableFuture完成。适用于“聚合”场景。返回值为CompletableFuture<Void>,需要手动获取各个任务的结果。anyOf:阻塞直到任意一个提供的CompletableFuture完成。适用于“竞速”或“备份”场景。返回值为CompletableFuture<Object>,类型不统一,需要强转。
二、 策略一:并行批处理与结果汇总
这是 allOf 最经典的应用场景。假设需要同时调用三个远程服务获取数据,并在全部完成后合并返回。
执行步骤:
- 定义 异步任务列表。使用
CompletableFuture.supplyAsync启动任务。 - 组合 任务。将所有 Future 转换为数组,传入
CompletableFuture.allOf。 - 阻塞 等待。调用
.join()或.get()等待所有任务结束。 - 提取 结果。遍历原始 Future 列表,调用各自的
.get()获取结果(此时因为主流程已等待,这里会立即返回)。
代码实现:
// 1. 定义异步任务
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
sleepMillis(200); // 模拟耗时
return "Result-1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
sleepMillis(300);
return "Result-2";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
sleepMillis(100);
return "Result-3";
});
// 2. 组合任务
CompletableFuture<Void> allFutures = CompletableFuture.allOf(task1, task2, task3);
// 3. 阻塞等待所有完成
allFutures.join(); // 这里会阻塞直到最慢的任务结束
// 4. 提取结果
String r1 = task1.join();
String r2 = task2.join();
String r3 = task3.join();
三、 策略二:多源竞速与容错降级
anyOf 适用于“谁快用谁”的场景。例如,同时从本地缓存和远程数据库读取数据,优先返回缓存结果。
执行步骤:
- 创建 主数据源 Future(如数据库查询)。
- 创建 备用数据源 Future(如本地缓存)。
- 执行 竞速。使用
CompletableFuture.anyOf传入两个 Future。 - 处理 结果。对返回的
CompletableFuture<Object>进行类型转换。
代码实现:
// 1. 定义两个数据源
CompletableFuture<String> dbQuery = CompletableFuture.supplyAsync(() -> {
sleepMillis(500); // 慢速源
return "Data from DB";
});
CompletableFuture<String> cacheQuery = CompletableFuture.supplyAsync(() -> {
sleepMillis(100); // 快速源
return "Data from Cache";
});
// 2. 竞速执行
CompletableFuture<Object> fastestResult = CompletableFuture.anyOf(dbQuery, cacheQuery);
// 3. 获取最快结果
String result = (String) fastestResult.join();
// 输出: "Data from Cache"
四、 策略三:整体超时控制(组合拳)
这是一个高级组合策略。假设使用 allOf 等待一批任务,但如果整个批处理超过 2 秒还没完成,我们希望立即取消或抛出异常,而不是无限等待。这可以通过 anyOf 结合一个“定时器 Future”来实现。
逻辑流程:
graph LR
A[开始批处理] --> B[创建任务集 AllOf]
A --> C[创建延时定时器 Timer]
B --> D{AnyOf: 任务完成 or 超时?}
C --> D
D -- 任务集完成 --> E[正常返回结果]
D -- 定时器触发 --> F[抛出超时异常]
执行步骤:
- 生成 业务任务集合并创建
allOfFuture。 - 构建 超时控制 Future。使用
CompletableFuture.supplyAsync并在内部Thread.sleep指定毫秒数。 - 组合 竞速。将业务
allOf与超时timer传入anyOf。 - 判断 触发源。
anyOf返回后,通过引用判断是业务先完成还是定时器先触发。
代码实现:
List<CompletableFuture<String>> tasks = List.of(
CompletableFuture.supplyAsync(() -> { sleepMillis(3000); return "A"; }),
CompletableFuture.supplyAsync(() -> { sleepMillis(1000); return "B"; })
);
// 1. 业务总任务
CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
// 2. 超时定时器 (2秒超时)
CompletableFuture<String> timeoutFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
return "Timeout Interrupted";
}
return "Timeout Triggered";
});
// 3. 竞速:看是任务先做完,还是超时先来
CompletableFuture<Object> raceResult = CompletableFuture.anyOf(allTasks, timeoutFuture);
// 4. 处理结果
Object winner = raceResult.join();
// 注意:因为 timeoutFuture 返回的是 String,而 allTasks 是 Void,这里直接 instanceof 判断即可
if (winner instanceof String && ((String) winner).startsWith("Timeout")) {
System.out.println("批处理超时,执行降级逻辑");
// 可以在这里取消 tasks: tasks.forEach(t -> t.cancel(true));
} else {
System.out.println("批处理在超时前完成");
// 收集结果
}
五、 策略对比与选择指南
为了在实际开发中快速决策,参考下表选择合适的策略。
| 策略名称 | 使用场景 | 核心方法组合 | 优点 | 缺点 |
|---|---|---|---|---|
| 全量聚合 | 需要所有任务结果才能继续(如组装报表) | 单独使用 allOf |
结果完整,逻辑简单 | 耗时取决于最慢的任务,无法处理部分失败 |
| 快速通道 | 多源冗余,只取最快结果(如多级缓存) | 单独使用 anyOf |
响应速度极快 | 无法获取慢速任务的数据,需处理类型转换 |
| 超时熔断 | 批处理任务必须有时间上限(如秒杀接口) | allOf + anyOf (加定时器) |
防止线程池阻塞,可控性强 | 代码复杂度较高,需额外处理清理逻辑 |
| 部分容错 | 只要有一个成功即可,允许部分失败 | 多个 exceptionally + anyOf |
高可用性,鲁棒性强 | 资源消耗可能较大(所有任务都会跑完) |
六、 实战建议与避坑指南
在使用上述组合策略时,请务必遵守以下建议以确保代码健壮性。
-
指定 线程池
- 不要 使用默认的
ForkJoinPool.commonPool(),它通常被 JVM 的其他并行流占用。 - 务必 在
supplyAsync或runAsync的第二个参数传入自定义的ExecutorService,以避免业务阻塞系统任务。
ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture.supplyAsync(() -> "Task", executor); - 不要 使用默认的
-
处理 异常传播
allOf不会 因为单个任务失败而立即抛出异常,它会等待所有任务结束。如果某个任务失败,在调用该任务的.join()时才会抛出异常。- 建议 对单个 Future 使用
.exceptionally()进行异常捕获,避免主流程被一个任务的错误打断。
-
注意 返回值类型
allOf返回CompletableFuture<Void>,不要 试图从中直接获取具体结果。anyOf返回CompletableFuture<Object>,不要 假设它一定是某种类型,务必进行类型安全检查(如instanceof)。

暂无评论,快来抢沙发吧!