Java StructuredTaskScope结构化并发的子任务管理
Java 21 正式推出了结构化并发,它通过 StructuredTaskScope 将一组相关的子任务视为一个单元进行管理。这种方式极大地简化了多线程编程,特别是当主任务需要等待多个子任务完成或任意一个完成时。本文将详细介绍如何使用 StructuredTaskScope 管理子任务,包括创建、状态监控、结果获取以及异常处理。
1. 环境准备
在使用 StructuredTaskScope 之前,需要确保开发环境满足以下条件。
- 确认 Java 版本为 21 或更高。打开终端,输入以下命令检查版本:
java -version - 配置 IDE 项目语言级别。如果在 IntelliJ IDEA 中,打开
File->Project Structure->Project,将Language level设置 为21 - Pattern matching for switch。 - 添加 模块依赖。由于
StructuredTaskScope位于预览 API(虽然 Java 21 是正式版,但部分相关 API 可能仍在孵化或预览状态,视具体更新而定,通常在jdk.incubator.concurrent包下,但在 Java 21 Final 中已移入java.util.concurrent),通常不需要额外配置,但若使用早期版本,需在module-info.java中添加:requires java.util.concurrent;
2. 基础用法:创建与等待子任务
StructuredTaskScope 的核心思想是“作用域”。子任务在作用域内派生,并在作用域关闭时自动清理。
-
定义 具体的业务逻辑。假设我们需要模拟两个耗时操作:查询用户信息和查询订单信息。
import java.util.concurrent.*; import java.util.concurrent.StructuredTaskScope.Subtask; String fetchUser() { try { Thread.sleep(1000); } catch (InterruptedException e) {} return "User: Alice"; } String fetchOrder() { try { Thread.sleep(1500); } catch (InterruptedException e) {} return "Order: #12345"; } -
创建
StructuredTaskScope实例。使用 try-with-resources 语句确保作用域正确关闭。try (var scope = new StructuredTaskScope<String>()) { // 后续代码将在此处编写 } // scope.close() 自动调用,等待所有子任务结束 -
派生 子任务。在作用域内,调用
scope.fork()方法将任务提交到线程池。try (var scope = new StructuredTaskScope<String>()) { Subtask<String> userTask = scope.fork(() -> fetchUser()); Subtask<String> orderTask = scope.fork(() -> fetchOrder()); } -
等待 所有子任务完成。调用
scope.join()方法阻塞当前线程,直到所有子任务终止(无论成功或失败)。try (var scope = new StructuredTaskScope<String>()) { Subtask<String> userTask = scope.fork(() -> fetchUser()); Subtask<String> orderTask = scope.fork(() -> fetchOrder()); scope.join(); // 阻塞等待 } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
3. 子任务状态管理与结果获取
fork() 方法返回一个 Subtask 对象,它是管理子任务状态的关键句柄。在 join() 返回后,可以通过 Subtask 获取执行结果。
3.1 理解子任务状态
子任务的生命周期可以通过以下状态机描述。当任务执行完毕,它最终会处于 SUCCESS 或 FAILED 状态。
3.2 检查状态并获取结果
-
检查 子任务状态。使用
userTask.state()方法。if (userTask.state() == Subtask.State.SUCCESS) { // 处理成功逻辑 } -
获取 子任务结果。如果状态为
SUCCESS,调用get()方法返回值;如果状态为FAILED,调用get()会抛出异常。try (var scope = new StructuredTaskScope<String>()) { Subtask<String> userTask = scope.fork(() -> fetchUser()); Subtask<String> orderTask = scope.fork(() -> fetchOrder()); scope.join(); // 获取结果 if (userTask.state() == Subtask.State.SUCCESS) { System.out.println("Result: " + userTask.get()); } if (orderTask.state() == Subtask.State.SUCCESS) { System.out.println("Result: " + orderTask.get()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
4. 进阶管理:聚合结果与异常处理
StructuredTaskScope 提供了两个预定义的实现类来简化常见的并发模式:ShutdownOnSuccess 和 ShutdownOnFailure。
4.1 成功一个即可(ShutdownOnSuccess)
当需要“谁快用谁”的场景(例如查询多个镜像服务器),使用此模式。
- 创建
ShutdownOnSuccess实例。try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) { - 派生 多个竞争性任务。
scope.fork(() -> fetchFromServerA()); scope.fork(() -> fetchFromServerB()); scope.fork(() -> fetchFromServerC()); - 调用
scope.joinUntil(Instant)或scope.join()等待第一个成功结果。 - 获取 胜利者的结果。使用
scope.result()方法。scope.join(); String result = scope.result(); // 抛出 ExecutionException 如果全部失败 System.out.println("First result: " + result); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
4.2 全部成功或任一失败(ShutdownOnFailure)
当需要“只要有一个错就全停”的场景(例如银行转账多步骤校验),使用此模式。
-
创建
ShutdownOnFailure实例。try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { -
派生 所有必须成功的子任务。
Subtask<String> login = scope.fork(() -> login()); Subtask<String> checkBalance = scope.fork(() -> checkBalance()); -
等待 完成并检查异常。
scope.join(); // 等待所有任务结束或因异常被中断 // 如果有任务失败,抛出聚合的异常 scope.throwIfFailed(); // 全部成功,继续处理 System.out.println("Login: " + login.get()); System.out.println("Balance: " + checkBalance.get()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
5. 自定义子任务处理策略
如果预定义的 ShutdownOnSuccess 或 ShutdownOnFailure 无法满足需求,可以继承 StructuredTaskScope 并重写 handleComplete 方法来实现自定义逻辑。
- 定义 一个继承自
StructuredTaskScope的内部类。 - 重写
handleComplete(Subtask<? extends T> subtask)方法。每当一个子任务完成时,此方法会被调用。 - 实现 “收集所有成功结果”的逻辑。
class CollectorScope<T> extends StructuredTaskScope<T> {
private final List<T> results = new ArrayList<>();
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
switch (subtask.state()) {
case SUCCESS:
// 将成功的结果加入列表
results.add(subtask.get());
break;
case FAILED:
// 这里可以选择记录日志,或者调用 shutdown() 来取消其他任务
System.out.println("Task failed: " + subtask.exception());
break;
case UNAVAILABLE:
// 理论上不会进入此分支
break;
}
}
// 提供一个获取结果的方法
public List<T> results() {
return Collections.unmodifiableList(results);
}
}
- 使用 自定义 Scope。
try (var scope = new CollectorScope<String>()) {
scope.fork(() -> "Task A Result");
scope.fork(() -> { throw new RuntimeException("Task B Failed"); });
scope.fork(() -> "Task C Result");
scope.join();
// 获取所有成功的子任务结果
List<String> allResults = scope.results();
System.out.println("Successful tasks: " + allResults);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
6. 常见问题与注意事项
以下表格总结了在使用过程中可能遇到的关键点及处理方法。
| 场景 | 关键方法/行为 | 注意事项 |
|---|---|---|
| 取消所有子任务 | scope.shutdown() |
调用后,已派生但未开始的任务不会开始,正在运行的任务会收到中断信号(若支持中断)。 |
| 等待超时 | scope.joinUntil(Instant.now().plusSeconds(5)) |
必须处理 TimeoutException,超时后通常应调用 scope.shutdown() 清理资源。 |
| 线程安全 | handleComplete 内部操作 |
handleComplete 可能被并发调用,操作共享数据(如 List)时必须使用线程安全集合或加锁。 |
| 异常聚合 | scope.exception() |
在 ShutdownOnFailure 模式下,如果有多个子任务失败,异常可能被聚合。 |
通过遵循上述步骤和模式,你可以有效地利用 Java 结构化并发来管理复杂的子任务关系,同时保持代码的清晰性和可维护性。

暂无评论,快来抢沙发吧!