文章目录

C++ 原子操作与内存序在自旋锁中的实现

发布于 2026-04-02 20:36:09 · 浏览 8 次 · 评论 0 条

C++ 原子操作与内存序在自旋锁中的实现

多线程程序中,多个线程同时访问共享资源时必须进行同步,否则会导致数据竞争和未定义行为。自旋锁是一种轻量级的同步原语,适用于临界区执行时间较短的场景。C++11 引入了原子操作和内存序机制,使得我们可以不用依赖平台相关的内联汇编或系统 API,就能写出可移植、高性能的自旋锁。


什么是原子操作?

原子操作是指“不可分割”的操作:要么完全执行,要么完全不执行,不会被其他线程打断。在 C++ 中,std::atomic<T> 模板类提供了对基本类型(如 boolint、指针等)的原子访问支持。

例如,声明一个原子布尔变量:

std::atomic<bool> flag{false};

对该变量的读写操作默认是原子的,但其内存可见性和执行顺序还受“内存序”控制。


内存序的作用

即使操作是原子的,编译器和 CPU 仍可能对指令重排以优化性能。内存序(memory order)用于约束这种重排行为,并控制不同线程间内存操作的可见性顺序。

C++ 提供六种内存序,其中与自旋锁最相关的是:

  • memory_order_acquire:用于读操作,确保该操作之后的所有读写不会被重排到它之前。
  • memory_order_release:用于写操作,确保该操作之前的所有读写不会被重排到它之后。
  • memory_order_relaxed:无同步语义,仅保证原子性。

正确的内存序组合可以构建“获取-释放”同步(acquire-release semantics),这是实现锁的关键。


实现一个基础自旋锁

定义自旋锁结构体,内部使用一个 std::atomic<bool> 表示锁状态:

class SpinLock {
private:
    std::atomic<bool> locked{false};

public:
    void lock() {
        while (locked.exchange(true, std::memory_order_acquire)) {
            // 自旋等待,直到 locked 变为 false
        }
    }

    void unlock() {
        locked.store(false, std::memory_order_release);
    }
};

分析关键点:

  1. lock() 中使用 exchange(true, std::memory_order_acquire)

    • exchange 原子地将 locked 设置为 true,并返回旧值。
    • 如果旧值是 true,说明锁已被占用,继续循环。
    • 如果旧值是 false,说明成功获取锁,退出循环。
    • 使用 memory_order_acquire 确保后续临界区代码不会被重排到 lock() 调用之前。
  2. unlock() 中使用 store(false, std::memory_order_release)

    • locked 设为 false,释放锁。
    • 使用 memory_order_release 确保临界区内的所有操作在释放前对其他线程可见。

这种组合构成了经典的“获取-释放”同步对。


为什么不能全用 relaxed?

假设 lock()unlock() 都使用 memory_order_relaxed

// 错误示例
while (locked.exchange(true, std::memory_order_relaxed)) {}
locked.store(false, std::memory_order_relaxed);

此时虽然 locked 的读写是原子的,但临界区内的操作可能被重排到锁外。例如,一个线程在持有锁时修改共享变量 x = 42;,但由于没有内存屏障,另一个线程可能在看到 locked == false 后仍读到 x 的旧值。这破坏了锁的语义。


性能优化:加入暂停指令

在 x86 架构上,持续的忙等待会浪费 CPU 资源并可能导致缓存一致性流量激增。可以在自旋循环中插入 _mm_pause()(Intel)或等效指令,提示 CPU 当前处于自旋状态,降低功耗并提升超线程性能。

修改 lock() 方法:

#include <immintrin.h> // for _mm_pause

void lock() {
    while (locked.exchange(true, std::memory_order_acquire)) {
        while (locked.load(std::memory_order_relaxed)) {
            _mm_pause(); // 提示 CPU 处于自旋等待
        }
    }
}

这里内层循环先用 relaxed 加载检查锁状态,避免频繁执行代价较高的 exchange。只有当检测到锁可能已释放时,才尝试 exchange 获取锁。


使用 RAII 封装确保异常安全

直接调用 lock()unlock() 容易因异常导致死锁。应使用 RAII(Resource Acquisition Is Initialization)惯用法。

定义锁守卫类:

class SpinLockGuard {
private:
    SpinLock& lock_;

public:
    explicit SpinLockGuard(SpinLock& l) : lock_(l) {
        lock_.lock();
    }

    ~SpinLockGuard() {
        lock_.unlock();
    }

    // 禁止拷贝
    SpinLockGuard(const SpinLockGuard&) = delete;
    SpinLockGuard& operator=(const SpinLockGuard&) = delete;
};

使用方式:

SpinLock my_lock;
{
    SpinLockGuard guard(my_lock); // 自动加锁
    // 访问共享资源
} // 离开作用域自动解锁

内存序选择对照表

以下表格总结了不同内存序在自旋锁中的适用性:

操作 推荐内存序 原因说明
exchange in lock() memory_order_acquire 确保临界区操作不会重排到加锁前
store in unlock() memory_order_release 确保临界区操作在释放前对其他线程可见
忙等待中的 load memory_order_relaxed 仅用于探测状态,无需同步语义

注意事项

  1. 避免长时间持有自旋锁:自旋锁会持续占用 CPU,若临界区可能阻塞(如 I/O 操作),应改用互斥锁(std::mutex)。
  2. 不要嵌套加锁:此实现不支持递归锁,同一线程重复加锁会导致死锁。
  3. 跨平台兼容性_mm_pause() 是 x86 特有。若需跨平台,可用条件编译:
    #ifdef __x86_64__
        _mm_pause();
    #elif defined(__aarch64__)
        __asm__ volatile("yield" ::: "memory");
    #endif

完整可运行示例

#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
#include <immintrin.h>

class SpinLock {
    std::atomic<bool> locked{false};

public:
    void lock() {
        while (locked.exchange(true, std::memory_order_acquire)) {
            while (locked.load(std::memory_order_relaxed)) {
                _mm_pause();
            }
        }
    }

    void unlock() {
        locked.store(false, std::memory_order_release);
    }
};

class SpinLockGuard {
    SpinLock& lock_;
public:
    explicit SpinLockGuard(SpinLock& l) : lock_(l) { lock_.lock(); }
    ~SpinLockGuard() { lock_.unlock(); }
    SpinLockGuard(const SpinLockGuard&) = delete;
    SpinLockGuard& operator=(const SpinLockGuard&) = delete;
};

int main() {
    SpinLock lock;
    int counter = 0;

    auto worker = [&]() {
        for (int i = 0; i < 10000; ++i) {
            SpinLockGuard guard(lock);
            ++counter;
        }
    };

    std::thread t1(worker);
    std::thread t2(worker);

    t1.join();
    t2.join();

    std::cout << "Final counter: " << counter << std::endl; // 应输出 20000
    return 0;
}

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文