C++ 原子操作与内存序在自旋锁中的实现
多线程程序中,多个线程同时访问共享资源时必须进行同步,否则会导致数据竞争和未定义行为。自旋锁是一种轻量级的同步原语,适用于临界区执行时间较短的场景。C++11 引入了原子操作和内存序机制,使得我们可以不用依赖平台相关的内联汇编或系统 API,就能写出可移植、高性能的自旋锁。
什么是原子操作?
原子操作是指“不可分割”的操作:要么完全执行,要么完全不执行,不会被其他线程打断。在 C++ 中,std::atomic<T> 模板类提供了对基本类型(如 bool、int、指针等)的原子访问支持。
例如,声明一个原子布尔变量:
std::atomic<bool> flag{false};
对该变量的读写操作默认是原子的,但其内存可见性和执行顺序还受“内存序”控制。
内存序的作用
即使操作是原子的,编译器和 CPU 仍可能对指令重排以优化性能。内存序(memory order)用于约束这种重排行为,并控制不同线程间内存操作的可见性顺序。
C++ 提供六种内存序,其中与自旋锁最相关的是:
memory_order_acquire:用于读操作,确保该操作之后的所有读写不会被重排到它之前。memory_order_release:用于写操作,确保该操作之前的所有读写不会被重排到它之后。memory_order_relaxed:无同步语义,仅保证原子性。
正确的内存序组合可以构建“获取-释放”同步(acquire-release semantics),这是实现锁的关键。
实现一个基础自旋锁
定义自旋锁结构体,内部使用一个 std::atomic<bool> 表示锁状态:
class SpinLock {
private:
std::atomic<bool> locked{false};
public:
void lock() {
while (locked.exchange(true, std::memory_order_acquire)) {
// 自旋等待,直到 locked 变为 false
}
}
void unlock() {
locked.store(false, std::memory_order_release);
}
};
分析关键点:
-
lock()中使用exchange(true, std::memory_order_acquire)exchange原子地将locked设置为true,并返回旧值。- 如果旧值是
true,说明锁已被占用,继续循环。 - 如果旧值是
false,说明成功获取锁,退出循环。 - 使用
memory_order_acquire确保后续临界区代码不会被重排到lock()调用之前。
-
unlock()中使用store(false, std::memory_order_release)- 将
locked设为false,释放锁。 - 使用
memory_order_release确保临界区内的所有操作在释放前对其他线程可见。
- 将
这种组合构成了经典的“获取-释放”同步对。
为什么不能全用 relaxed?
假设 lock() 和 unlock() 都使用 memory_order_relaxed:
// 错误示例
while (locked.exchange(true, std::memory_order_relaxed)) {}
locked.store(false, std::memory_order_relaxed);
此时虽然 locked 的读写是原子的,但临界区内的操作可能被重排到锁外。例如,一个线程在持有锁时修改共享变量 x = 42;,但由于没有内存屏障,另一个线程可能在看到 locked == false 后仍读到 x 的旧值。这破坏了锁的语义。
性能优化:加入暂停指令
在 x86 架构上,持续的忙等待会浪费 CPU 资源并可能导致缓存一致性流量激增。可以在自旋循环中插入 _mm_pause()(Intel)或等效指令,提示 CPU 当前处于自旋状态,降低功耗并提升超线程性能。
修改 lock() 方法:
#include <immintrin.h> // for _mm_pause
void lock() {
while (locked.exchange(true, std::memory_order_acquire)) {
while (locked.load(std::memory_order_relaxed)) {
_mm_pause(); // 提示 CPU 处于自旋等待
}
}
}
这里内层循环先用 relaxed 加载检查锁状态,避免频繁执行代价较高的 exchange。只有当检测到锁可能已释放时,才尝试 exchange 获取锁。
使用 RAII 封装确保异常安全
直接调用 lock() 和 unlock() 容易因异常导致死锁。应使用 RAII(Resource Acquisition Is Initialization)惯用法。
定义锁守卫类:
class SpinLockGuard {
private:
SpinLock& lock_;
public:
explicit SpinLockGuard(SpinLock& l) : lock_(l) {
lock_.lock();
}
~SpinLockGuard() {
lock_.unlock();
}
// 禁止拷贝
SpinLockGuard(const SpinLockGuard&) = delete;
SpinLockGuard& operator=(const SpinLockGuard&) = delete;
};
使用方式:
SpinLock my_lock;
{
SpinLockGuard guard(my_lock); // 自动加锁
// 访问共享资源
} // 离开作用域自动解锁
内存序选择对照表
以下表格总结了不同内存序在自旋锁中的适用性:
| 操作 | 推荐内存序 | 原因说明 |
|---|---|---|
exchange in lock() |
memory_order_acquire |
确保临界区操作不会重排到加锁前 |
store in unlock() |
memory_order_release |
确保临界区操作在释放前对其他线程可见 |
忙等待中的 load |
memory_order_relaxed |
仅用于探测状态,无需同步语义 |
注意事项
- 避免长时间持有自旋锁:自旋锁会持续占用 CPU,若临界区可能阻塞(如 I/O 操作),应改用互斥锁(
std::mutex)。 - 不要嵌套加锁:此实现不支持递归锁,同一线程重复加锁会导致死锁。
- 跨平台兼容性:
_mm_pause()是 x86 特有。若需跨平台,可用条件编译:#ifdef __x86_64__ _mm_pause(); #elif defined(__aarch64__) __asm__ volatile("yield" ::: "memory"); #endif
完整可运行示例
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
#include <immintrin.h>
class SpinLock {
std::atomic<bool> locked{false};
public:
void lock() {
while (locked.exchange(true, std::memory_order_acquire)) {
while (locked.load(std::memory_order_relaxed)) {
_mm_pause();
}
}
}
void unlock() {
locked.store(false, std::memory_order_release);
}
};
class SpinLockGuard {
SpinLock& lock_;
public:
explicit SpinLockGuard(SpinLock& l) : lock_(l) { lock_.lock(); }
~SpinLockGuard() { lock_.unlock(); }
SpinLockGuard(const SpinLockGuard&) = delete;
SpinLockGuard& operator=(const SpinLockGuard&) = delete;
};
int main() {
SpinLock lock;
int counter = 0;
auto worker = [&]() {
for (int i = 0; i < 10000; ++i) {
SpinLockGuard guard(lock);
++counter;
}
};
std::thread t1(worker);
std::thread t2(worker);
t1.join();
t2.join();
std::cout << "Final counter: " << counter << std::endl; // 应输出 20000
return 0;
}
暂无评论,快来抢沙发吧!