Java Exchanger线程间交换数据的同步工具
Java Exchanger 是一个用于线程间协作的同步工具类,它允许两个线程在汇合点交换数据。它提供了一个同步点,在这个同步点,两个线程可以互换数据。这种设计模式非常适合像“生产者-消费者”模式中,生产者需要缓冲区而消费者需要数据,或者遗传算法中两个个体需要交换基因等场景。
适用场景
在开始编写代码之前,确认你的业务场景符合以下特征:
- 成对协作:任务必须由两个线程配合完成,一个线程的数据是另一个线程的输入,反之亦然。
- 对等交换:两个线程地位对等,互为生产者和消费者,而不是单向传递。
- 手动同步:需要在代码的特定位置进行数据交换,而不是通过队列自动缓冲。
核心方法
Exchanger 类的核心逻辑非常简单,只包含两个重载的 exchange 方法。理解这两个方法的区别是编写稳定代码的关键。
| 方法签名 | 行为特征 | 适用场景 |
|---|---|---|
V exchange(V x) |
阻塞等待。当前线程一直等待,直到另一个线程到达交换点,或者线程被中断。 | 确定对方一定会来的情况,逻辑简单直接。 |
V exchange(V x, long timeout, TimeUnit unit) |
限时等待。等待指定的时间,如果超时仍未有伙伴线程到达,则抛出 TimeoutException。 |
需要防止死锁,或者对响应时间有严格要求的系统。 |
基础实战:数据交换
以下步骤演示如何创建两个线程,通过 Exchanger 交换字符串数据。
-
创建
Exchanger对象。
在类的成员变量或主方法中,实例化一个泛型为String的 Exchanger。Exchanger<String> exchanger = new Exchanger<>(); -
定义第一个线程(Thread A)。
该线程准备发送数据 "Data from A",并接收来自对方的数据。Thread threadA = new Thread(() -> { try { String dataA = "Data from A"; System.out.println("Thread A: 准备发送 " + dataA); // 调用 exchange 方法,发送 dataA 并阻塞等待接收数据 String receivedData = exchanger.exchange(dataA); System.out.println("Thread A: 收到回复 " + receivedData); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("Thread A: 被中断"); } }); -
定义第二个线程(Thread B)。
该线程准备发送数据 "Data from B",并接收来自对方的数据。Thread threadB = new Thread(() -> { try { String dataB = "Data from B"; System.out.println("Thread B: 准备发送 " + dataB); // 模拟耗时操作,证明 exchange 会等待 Thread.sleep(1000); String receivedData = exchanger.exchange(dataB); System.out.println("Thread B: 收到回复 " + receivedData); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("Thread B: 被中断"); } }); -
启动并观察线程运行。
先后启动两个线程。你会看到 Thread A 启动后会进入等待状态,直到 Thread B 执行完sleep并调用exchange方法,两者瞬间完成交换并继续运行。threadA.start(); threadB.start();
进阶处理:防止死锁
在实际生产环境中,如果 Thread B 因为逻辑错误或崩溃永远无法到达交换点,Thread A 将会无限期等待。为了避免这种情况,使用带超时参数的 exchange 方法。
-
修改 Thread A 的逻辑,加入超时控制。
设置等待时间为 2 秒。如果 2 秒内没有伙伴线程交换数据,则抛出异常。Thread threadA = new Thread(() -> { try { String dataA = "Data from A"; System.out.println("Thread A: 尝试交换数据(限时2秒)"); // 使用带超时的 exchange 方法 String receivedData = exchanger.exchange(dataA, 2, TimeUnit.SECONDS); System.out.println("Thread A: 交换成功 " + receivedData); } catch (TimeoutException e) { System.out.println("Thread A: 等待超时,交换失败"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("Thread A: 被中断"); } }); -
验证异常处理。
如果只启动 Thread A 而不启动 Thread B,控制台将输出“等待超时,交换失败”,程序能正常结束而不是卡死。
底层原理
Exchanger 的工作机制可以概括为“单槽位交换”。它内部维护了一个共享槽位。为了更直观地理解线程间的交互过程,参考以下时序图。
等待 Thread B TB->>EX: "exchange(Data B)" Note over EX: 检测到双方就绪
执行数据互换 EX-->>TA: "返回 Data B" deactivate EX EX-->>TB: "返回 Data A" Note over TA, TB: 继续执行后续逻辑
从图中可以看出,Exchanger 的核心在于如果一个线程先到达,它必须持有锁并进入等待状态(通常是自旋或挂起),直到第二个线程到来。这种设计保证了数据传递的原子性和一致性。
注意事项
在使用 Exchanger 时,注意以下限制:
- 仅限两方:Exchanger 只能在两个线程之间交换数据。如果有多个线程调用同一个 Exchanger 实例,会出现数据混乱或不可预期的结果。
- 对象引用:Exchanger 交换的是对象的引用,而不是对象的拷贝。这意味着如果交换后修改了对象内容,对方线程看到的数据也会变化。
- 内存可见性:exchange 操作本身就具有
volatile读写的内存语义,能够保证交换数据的可见性,无需额外加锁。

暂无评论,快来抢沙发吧!