文章目录

Java Exchanger线程间交换数据的同步工具

发布于 2026-05-06 03:23:01 · 浏览 13 次 · 评论 0 条

Java Exchanger线程间交换数据的同步工具

Java Exchanger 是一个用于线程间协作的同步工具类,它允许两个线程在汇合点交换数据。它提供了一个同步点,在这个同步点,两个线程可以互换数据。这种设计模式非常适合像“生产者-消费者”模式中,生产者需要缓冲区而消费者需要数据,或者遗传算法中两个个体需要交换基因等场景。


适用场景

在开始编写代码之前,确认你的业务场景符合以下特征:

  1. 成对协作:任务必须由两个线程配合完成,一个线程的数据是另一个线程的输入,反之亦然。
  2. 对等交换:两个线程地位对等,互为生产者和消费者,而不是单向传递。
  3. 手动同步:需要在代码的特定位置进行数据交换,而不是通过队列自动缓冲。

核心方法

Exchanger 类的核心逻辑非常简单,只包含两个重载的 exchange 方法。理解这两个方法的区别是编写稳定代码的关键。

方法签名 行为特征 适用场景
V exchange(V x) 阻塞等待。当前线程一直等待,直到另一个线程到达交换点,或者线程被中断。 确定对方一定会来的情况,逻辑简单直接。
V exchange(V x, long timeout, TimeUnit unit) 限时等待。等待指定的时间,如果超时仍未有伙伴线程到达,则抛出 TimeoutException 需要防止死锁,或者对响应时间有严格要求的系统。

基础实战:数据交换

以下步骤演示如何创建两个线程,通过 Exchanger 交换字符串数据。

  1. 创建 Exchanger 对象。
    在类的成员变量或主方法中,实例化一个泛型为 String 的 Exchanger。

    Exchanger<String> exchanger = new Exchanger<>();
  2. 定义第一个线程(Thread A)。
    该线程准备发送数据 "Data from A",并接收来自对方的数据。

    Thread threadA = new Thread(() -> {
        try {
            String dataA = "Data from A";
            System.out.println("Thread A: 准备发送 " + dataA);
    
            // 调用 exchange 方法,发送 dataA 并阻塞等待接收数据
            String receivedData = exchanger.exchange(dataA);
    
            System.out.println("Thread A: 收到回复 " + receivedData);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("Thread A: 被中断");
        }
    });
  3. 定义第二个线程(Thread B)。
    该线程准备发送数据 "Data from B",并接收来自对方的数据。

    Thread threadB = new Thread(() -> {
        try {
            String dataB = "Data from B";
            System.out.println("Thread B: 准备发送 " + dataB);
    
            // 模拟耗时操作,证明 exchange 会等待
            Thread.sleep(1000); 
    
            String receivedData = exchanger.exchange(dataB);
    
            System.out.println("Thread B: 收到回复 " + receivedData);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("Thread B: 被中断");
        }
    });
  4. 启动并观察线程运行。
    先后启动两个线程。你会看到 Thread A 启动后会进入等待状态,直到 Thread B 执行完 sleep 并调用 exchange 方法,两者瞬间完成交换并继续运行。

    threadA.start();
    threadB.start();

进阶处理:防止死锁

在实际生产环境中,如果 Thread B 因为逻辑错误或崩溃永远无法到达交换点,Thread A 将会无限期等待。为了避免这种情况,使用带超时参数的 exchange 方法。

  1. 修改 Thread A 的逻辑,加入超时控制。
    设置等待时间为 2 秒。如果 2 秒内没有伙伴线程交换数据,则抛出异常。

    Thread threadA = new Thread(() -> {
        try {
            String dataA = "Data from A";
            System.out.println("Thread A: 尝试交换数据(限时2秒)");
    
            // 使用带超时的 exchange 方法
            String receivedData = exchanger.exchange(dataA, 2, TimeUnit.SECONDS);
    
            System.out.println("Thread A: 交换成功 " + receivedData);
        } catch (TimeoutException e) {
            System.out.println("Thread A: 等待超时,交换失败");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("Thread A: 被中断");
        }
    });
  2. 验证异常处理。
    如果只启动 Thread A 而不启动 Thread B,控制台将输出“等待超时,交换失败”,程序能正常结束而不是卡死。


底层原理

Exchanger 的工作机制可以概括为“单槽位交换”。它内部维护了一个共享槽位。为了更直观地理解线程间的交互过程,参考以下时序图。

sequenceDiagram participant "Thread A" as TA participant "Exchanger" as EX participant "Thread B" as TB TA->>EX: "exchange(Data A)" Note over TA: 阻塞等待 activate EX Note over EX: 占用槽位
等待 Thread B TB->>EX: "exchange(Data B)" Note over EX: 检测到双方就绪
执行数据互换 EX-->>TA: "返回 Data B" deactivate EX EX-->>TB: "返回 Data A" Note over TA, TB: 继续执行后续逻辑

从图中可以看出,Exchanger 的核心在于如果一个线程先到达,它必须持有锁并进入等待状态(通常是自旋或挂起),直到第二个线程到来。这种设计保证了数据传递的原子性和一致性。


注意事项

在使用 Exchanger 时,注意以下限制:

  1. 仅限两方:Exchanger 只能在两个线程之间交换数据。如果有多个线程调用同一个 Exchanger 实例,会出现数据混乱或不可预期的结果。
  2. 对象引用:Exchanger 交换的是对象的引用,而不是对象的拷贝。这意味着如果交换后修改了对象内容,对方线程看到的数据也会变化。
  3. 内存可见性:exchange 操作本身就具有 volatile 读写的内存语义,能够保证交换数据的可见性,无需额外加锁。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文