文章目录

Java StructuredTaskScope结构化并发的子任务管理

发布于 2026-04-26 07:25:11 · 浏览 3 次 · 评论 0 条

Java StructuredTaskScope结构化并发的子任务管理

Java 21 正式推出了结构化并发,它通过 StructuredTaskScope 将一组相关的子任务视为一个单元进行管理。这种方式极大地简化了多线程编程,特别是当主任务需要等待多个子任务完成或任意一个完成时。本文将详细介绍如何使用 StructuredTaskScope 管理子任务,包括创建、状态监控、结果获取以及异常处理。


1. 环境准备

在使用 StructuredTaskScope 之前,需要确保开发环境满足以下条件。

  1. 确认 Java 版本为 21 或更高。打开终端,输入以下命令检查版本:
    java -version
  2. 配置 IDE 项目语言级别。如果在 IntelliJ IDEA 中,打开 File -> Project Structure -> Project,将 Language level 设置21 - Pattern matching for switch
  3. 添加 模块依赖。由于 StructuredTaskScope 位于预览 API(虽然 Java 21 是正式版,但部分相关 API 可能仍在孵化或预览状态,视具体更新而定,通常在 jdk.incubator.concurrent 包下,但在 Java 21 Final 中已移入 java.util.concurrent),通常不需要额外配置,但若使用早期版本,需在 module-info.java 中添加:
    requires java.util.concurrent;

2. 基础用法:创建与等待子任务

StructuredTaskScope 的核心思想是“作用域”。子任务在作用域内派生,并在作用域关闭时自动清理。

  1. 定义 具体的业务逻辑。假设我们需要模拟两个耗时操作:查询用户信息和查询订单信息。

    import java.util.concurrent.*;
    import java.util.concurrent.StructuredTaskScope.Subtask;
    
    String fetchUser() {
        try { Thread.sleep(1000); } catch (InterruptedException e) {}
        return "User: Alice";
    }
    
    String fetchOrder() {
        try { Thread.sleep(1500); } catch (InterruptedException e) {}
        return "Order: #12345";
    }
  2. 创建 StructuredTaskScope 实例。使用 try-with-resources 语句确保作用域正确关闭。

    try (var scope = new StructuredTaskScope<String>()) {
    // 后续代码将在此处编写
    } // scope.close() 自动调用,等待所有子任务结束
  3. 派生 子任务。在作用域内,调用 scope.fork() 方法将任务提交到线程池。

    try (var scope = new StructuredTaskScope<String>()) {
        Subtask<String> userTask = scope.fork(() -> fetchUser());
        Subtask<String> orderTask = scope.fork(() -> fetchOrder());
    }
  4. 等待 所有子任务完成。调用 scope.join() 方法阻塞当前线程,直到所有子任务终止(无论成功或失败)。

    try (var scope = new StructuredTaskScope<String>()) {
        Subtask<String> userTask = scope.fork(() -> fetchUser());
        Subtask<String> orderTask = scope.fork(() -> fetchOrder());
    
        scope.join(); // 阻塞等待
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }

3. 子任务状态管理与结果获取

fork() 方法返回一个 Subtask 对象,它是管理子任务状态的关键句柄。在 join() 返回后,可以通过 Subtask 获取执行结果。

3.1 理解子任务状态

子任务的生命周期可以通过以下状态机描述。当任务执行完毕,它最终会处于 SUCCESSFAILED 状态。

stateDiagram-v2 [*] --> UNAVAILABLE : fork() UNAVAILABLE --> SUCCESS : 任务正常结束 UNAVAILABLE --> FAILED : 抛出异常 SUCCESS --> [*] FAILED --> [*]

3.2 检查状态并获取结果

  1. 检查 子任务状态。使用 userTask.state() 方法。

    if (userTask.state() == Subtask.State.SUCCESS) {
        // 处理成功逻辑
    }
  2. 获取 子任务结果。如果状态为 SUCCESS调用 get() 方法返回值;如果状态为 FAILED调用 get() 会抛出异常。

    try (var scope = new StructuredTaskScope<String>()) {
        Subtask<String> userTask = scope.fork(() -> fetchUser());
        Subtask<String> orderTask = scope.fork(() -> fetchOrder());
    
        scope.join();
    
        // 获取结果
        if (userTask.state() == Subtask.State.SUCCESS) {
            System.out.println("Result: " + userTask.get());
        }
        if (orderTask.state() == Subtask.State.SUCCESS) {
            System.out.println("Result: " + orderTask.get());
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }

4. 进阶管理:聚合结果与异常处理

StructuredTaskScope 提供了两个预定义的实现类来简化常见的并发模式:ShutdownOnSuccessShutdownOnFailure

4.1 成功一个即可(ShutdownOnSuccess)

当需要“谁快用谁”的场景(例如查询多个镜像服务器),使用此模式。

  1. 创建 ShutdownOnSuccess 实例。
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
  2. 派生 多个竞争性任务。
        scope.fork(() -> fetchFromServerA());
        scope.fork(() -> fetchFromServerB());
        scope.fork(() -> fetchFromServerC());
  3. 调用 scope.joinUntil(Instant)scope.join() 等待第一个成功结果。
  4. 获取 胜利者的结果。使用 scope.result() 方法。
        scope.join();
        String result = scope.result(); // 抛出 ExecutionException 如果全部失败
        System.out.println("First result: " + result);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }

4.2 全部成功或任一失败(ShutdownOnFailure)

当需要“只要有一个错就全停”的场景(例如银行转账多步骤校验),使用此模式。

  1. 创建 ShutdownOnFailure 实例。

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
  2. 派生 所有必须成功的子任务。

        Subtask<String> login = scope.fork(() -> login());
        Subtask<String> checkBalance = scope.fork(() -> checkBalance());
  3. 等待 完成并检查异常。

        scope.join(); // 等待所有任务结束或因异常被中断
    
        // 如果有任务失败,抛出聚合的异常
        scope.throwIfFailed(); 
    
        // 全部成功,继续处理
        System.out.println("Login: " + login.get());
        System.out.println("Balance: " + checkBalance.get());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }

5. 自定义子任务处理策略

如果预定义的 ShutdownOnSuccessShutdownOnFailure 无法满足需求,可以继承 StructuredTaskScope 并重写 handleComplete 方法来实现自定义逻辑。

  1. 定义 一个继承自 StructuredTaskScope 的内部类。
  2. 重写 handleComplete(Subtask<? extends T> subtask) 方法。每当一个子任务完成时,此方法会被调用。
  3. 实现 “收集所有成功结果”的逻辑。
class CollectorScope<T> extends StructuredTaskScope<T> {
    private final List<T> results = new ArrayList<>();

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        switch (subtask.state()) {
            case SUCCESS:
                // 将成功的结果加入列表
                results.add(subtask.get());
                break;
            case FAILED:
                // 这里可以选择记录日志,或者调用 shutdown() 来取消其他任务
                System.out.println("Task failed: " + subtask.exception());
                break;
            case UNAVAILABLE:
                // 理论上不会进入此分支
                break;
        }
    }

    // 提供一个获取结果的方法
    public List<T> results() {
        return Collections.unmodifiableList(results);
    }
}
  1. 使用 自定义 Scope。
try (var scope = new CollectorScope<String>()) {
    scope.fork(() -> "Task A Result");
    scope.fork(() -> { throw new RuntimeException("Task B Failed"); });
    scope.fork(() -> "Task C Result");

    scope.join();

    // 获取所有成功的子任务结果
    List<String> allResults = scope.results();
    System.out.println("Successful tasks: " + allResults);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

6. 常见问题与注意事项

以下表格总结了在使用过程中可能遇到的关键点及处理方法。

场景 关键方法/行为 注意事项
取消所有子任务 scope.shutdown() 调用后,已派生但未开始的任务不会开始,正在运行的任务会收到中断信号(若支持中断)。
等待超时 scope.joinUntil(Instant.now().plusSeconds(5)) 必须处理 TimeoutException,超时后通常应调用 scope.shutdown() 清理资源。
线程安全 handleComplete 内部操作 handleComplete 可能被并发调用,操作共享数据(如 List)时必须使用线程安全集合或加锁。
异常聚合 scope.exception() ShutdownOnFailure 模式下,如果有多个子任务失败,异常可能被聚合。

通过遵循上述步骤和模式,你可以有效地利用 Java 结构化并发来管理复杂的子任务关系,同时保持代码的清晰性和可维护性。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文