C++ 多线程中 std::atomic 的使用

在多线程编程中,多个线程需要安全访问和修改共享数据。复杂场景通常使用锁(如 std::mutex)防止数据竞争,但在简单场景下,频繁加锁会降低性能。有没有既能保证一定性能,又能避免数据竞争的方法呢?

C++11 引入了 std::atomic,专为简单共享数据设计,无需加锁即可实现线程安全,效率更高。

接下来,我们将通过几个小节介绍 std::atomic 的用法和原理。

1. 问题场景

我们先来看一个非常常见的多线程编程任务:

“启动两个线程,每个线程都对同一个计数器执行一百万次递增操作。”

这看似简单的需求,却恰好暴露了多线程中“共享变量写冲突”的典型问题。为了避免这种错误,我们通常会用两种方式来解决:

  • 使用 std::mutex 互斥锁来保护共享变量
  • 使用 std::atomic 实现无锁的线程安全

下面我们通过一个对比示例,一起看看两种方式的差异以及 std::atomic 能带来什么性能提升。

#include <iostream>
#include <thread>
#include <mutex>

#if 0
std::mutex my_mutex;
int counter = 0;
void worker()
{
	for (int i = 0; i < 1000000; ++i)
	{
		my_mutex.lock();
		++counter;
		my_mutex.unlock();
	}
}
#else
std::atomic<int> counter(0);
void worker()
{
	for (int i = 0; i < 1000000; ++i)
	{
		++counter;
	}
}
#endif

void demo()
{
	auto start = std::chrono::steady_clock::now();

	std::thread t1(worker);
	std::thread t2(worker);
	t1.join();
	t2.join();

	auto end = std::chrono::steady_clock::now();
	// 计算持续时间,单位:毫秒
	auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
	std::cout << "counter = " << counter << " time = " << duration.count() << std::endl;
}

int main()
{
	demo();
	return 0;
}

2. 基本使用

std::atomic 是 C++11 提供的原子类型,该类型提供了一些原子操作方法,用于在多线程环境中安全地读写共享数据。原子操作是指不可被中断的操作,执行过程中不会被其他线程观察到中间状态。

比如一个普通的 counter = counter + 1 实际上需要三步:

  1. 读取 counter 的值(比如是 5)
  2. 加 1,变成 6
  3. 把 6 写回去

如果没有原子操作,线程 A 执行到第 2 步时,线程 B 刚好也开始执行这三步,它读到的还是旧值 5,结果两个线程都写回了 6,虽然本该变成 7。原子操作能把这三步合成一步,不让别的线程插进来,从而保证结果正确。也就是说:原子操作确保一个线程在操作共享变量时,其他线程必须等它完全执行完,才能继续操作。

下面通过几个小例子,快速了解 std::atomic 提供的原子操作方法。

#if 1
#include <iostream>
#include <thread>
#include <mutex>


// 1. 基本操作
void demo01()
{	
	std::atomic<int> atomic_value(0);

	// 给原子变量设置新的值
	atomic_value.store(10);
	// 注意 atomic 并没有重载 << 运算符,这里是先将 atomic 转换为 int,再 cout 输出
	std::cout << "atomic_value = " << atomic_value << std::endl;

	// 读取原子变量当前值
	int cur_value = atomic_value.load();
	std::cout << "cur_value = " << cur_value << std::endl;

	// 先读旧值,再设置新值,最后返回旧值
	int old_value = atomic_value.exchange(20);
	std::cout << "old_value = " << old_value << std::endl;
}

// 2. CAS(compare-and-swap)操作
void demo02()
{
	std::atomic<int> atom(10);

	int expected = 20;
	int desired = 20;

	bool ret = atom.compare_exchange_strong(expected, desired);

	// 为什么比较失败后,要将 expected 设置为 atom 的值?
	// 这是因为失败后,我们一般会想知道当前的 atom 是啥值。这样省略再 atom.load() 读取一下
	std::cout << "ret:" << std::boolalpha << ret
		<< " expected:" << expected
		<< " atom:" << atom << std::endl;

	// compare_exchange_weak 的作用和 strong 是一样的,不同的是:
	// weak 在 expected 和 atom 值一样的时,也可能会出现伪失败,这是硬件层面的原因
	// 其不用确保一定正确,所以其效率要比 strong 更高
	// 注意:某些平台(如 MSVC)可能将 compare_exchange_weak 和 compare_exchange_strong 实现为相同的底层指令,
	// 因此表现上看不到弱版本的伪失败。但在具有更强优化能力的体系结构(如 ARM)上,weak 是可能存在伪失败。
}

// 3. int类型原子变量支持的原子操作
void demo03()
{
	std::atomic<int> atom(0);
	atom = 10;   // 等价于 atom.store(10)
	atom += 10;  // 等价于 atom.fetch_add(10);
	atom -= 10;  // 等价于 atom.fetch_sub(10);
	atom |= 10;  // 等价于 atom.fetch_or(10);
	atom &= 10;  // 等价于 atom.fetch_and(10);
	atom ^= 10;  // 等价于 atom.fetch_xor(10);
	atom++;
	++atom;
}

int main()
{
	demo01();
	demo02();
	demo03();
	return 0;
}
#endif

3. 实现原理

既然 std::atomic 能在多线程环境下保证操作的原子性,而且在某些场景下性能优于锁机制,那它背后到底是怎么实现的?

std::atomic 的原子性实现主要依赖两种机制:

  1. 硬件原子指令支持(Lock-Free):现代 CPU(如 x86、ARM)提供多种原子操作指令(如 x86 的 LOCK 前缀指令,ARM 的 LDREX/STREX 指令),可以直接对某些大小和对齐要求的类型实现高效的无锁原子操作。
  2. 基于锁的模拟(Locking):当类型过大、不满足对齐要求,或目标平台/编译器不支持对应硬件原子指令时,std::atomic 可能通过内部互斥锁(如 std::mutex)模拟原子性,以牺牲性能换取正确性。

判断是否 lock-free:

  • 可以通过 std::atomic<T>::is_lock_free() 判断某个具体类型或实例是否在当前环境下采用 lock-free 实现。
  • 返回 true 表示使用硬件原子指令实现;返回 false 则可能是基于锁实现。

Windows(MSVC)通常采用类似下面的规则来判断某个类型是否支持 lock-free:

_TypeSize <= 8 && (_TypeSize & (_TypeSize - 1)) == 0;

  • 类型大小小于等于 8 字节(64 位)
  • 类型型大小为 2 的幂次方(1、2、4、8 字节)。

下面的程序中,结构体被设置为 1 字节对齐,导致 Demo 的大小为 5 字节,虽然小于 8 字节,但是不支持 std::atomic

#include <iostream>

// 设置 struct 按照 1 字节对齐
#pragma pack(push, 1) 

struct Demo
{
	int a;
	char c;
};

void test()
{
	// 输出:5
	std::cout << sizeof(Demo) << std::endl;

	std::atomic<Demo> demo;
	// 输出:false
	std::cout << std::boolalpha << demo.is_lock_free() << std::endl;
}

int main()
{
	test();
	return 0;
}

注意:is_lock_free() 是与平台、CPU 架构、编译器实现以及实例内存对齐情况紧密相关的动态判断。

从这里,可以看到,std::atomic 主要是针对简单场景下资源竞争问题的一种方案,如果对象较为复杂,这就不是 std::atomic 的目标,例如:

  • 类型自定义的拷贝构造、拷贝赋值、移动构造、移动赋值、析构函数
  • 类型包含虚函数
  • 包含引用成员

简单来讲:std::atomic 支持的类型必须是 trivially copyable(即可以通过 memcpy 正确拷贝)

#include <iostream>

struct Demo
{
	// 包含以下内容,该对象不再是简单的对象,atomic 编译会报错
	// Demo(const Demo&) {}
	// Demo(Demo&&) {}
	// Demo& operator=(const Demo&) {}
	// Demo& operator=(Demo&&) {}
	// virtual void func() {}
	// ~Demo() {}
	// int& mem_ref;
};

void test()
{
	std::atomic<Demo> demo;
}

int main()
{
	test();
	return 0;
}

4. 内存顺序

在前面的内容中,我们讲到了 std::atomic 可以保障操作的原子性,让多个线程对同一变量进行并发访问时不会出现“写一半”或“读出脏数据”的问题。

但这只是并发编程中安全性的第一层保障

在多线程程序中,另一个非常关键但常被忽视的问题是:指令重排(Instruction Reordering)

无论是编译器优化还是 CPU 执行机制,都可能会在不影响单线程语义的前提下,偷偷调整代码的执行顺序。虽然这种优化能提升执行效率,但在并发场景下却可能引发难以发现的同步错误

这时候,仅靠原子操作是不够的,我们还需要借助 std::atomic 提供的 内存顺序(memory order)语义 来约束指令的可见顺序,从而实现真正可靠的线程间同步。

下面我们通过示例来理解指令重排。

指令重排(Instruction Reordering)是指编译器或处理器为了提高性能,在不改变单线程语义的前提下,对程序指令执行顺序进行的优化重排。例如:

int a = 1;
int b = 2;

你写的代码顺序是 a = 1 然后 b = 2,但编译器或 CPU 可能会为了效率,先执行 b = 2,再执行 a = 1,只要这不会影响最终的单线程结果。

这些重排在单线程环境下不会影响程序逻辑,但在但在并发环境中,可能让代码看起来没问题,但实际上存在严重的同步漏洞。这是因为重排会破坏 “我们认为的执行顺序”,导致并发程序中产生 不可预测的结果,比如:

std::atomic<bool> ready{ false };
int data = 0;

// 线程 A
void thread_writer() 
{
    // 普通写入
    data = 42;
    // 原子标志位
    ready.store(true);
}

// 线程 B
void thread_reader()
{
    // 等待标志位为 true
    while (!ready.load());
    // 可能触发断言失败!
    assert(data == 42);
}

从代码执行来看,线程 B 看到的是 ready 被修改为 true 时,data 肯定已经先被设置为 42 了。但是,由于指令重排的存在,编译器或 CPU 可能会将线程 A 的执行顺序修改:

void thread_writer() 
{
    // 原子标志位
    ready.store(true);
    // 普通写入(重排到 ready 写操作之后)
    data = 42;
}

此时,thread_reader 会读取到 ready == true,但是 data == 0 的情况,导致触发断言。再回到 std::atomic,它只能保证操作的原子性,对于指令重排可能导致的隐藏的问题,需要通过设置内存顺序来解决。C++ 中提供了以下四种内存顺序:

  • memory_order_relaxed:不对与该原子操作的其他非原子操作之间的执行顺序做约束
  • memory_order_release:在这个原子操作之前的所有写操作,不能被重排序到这个原子操作之后。
  • memory_order_acquire:在这个原子操作之后的所有读操作,不能被重排序到这个原子操作之前。
  • memory_order_acq_rel:acquire/release 的结合体
  • memory_order_seq_cst:这是 std::atomic 默认的内存序,最强约束

A1;
A2;
atomic_var.store(x, std::memory_order_relaxed);
B1;
B2;

含义: 几乎没有顺序保证,不禁止任何重排。

重排行为:

  • A1A2 可能被重排到 store 之后,
  • B1B2 可能被重排到 store 之前,
  • AB 之间的顺序都可能被改变。

只保证该操作本身是原子的,不会被拆分或看到半写入状态,但不保证操作之间的顺序。

A1;
A2;
atomic_var.store(x, std::memory_order_release);
B1;
B2;

保证原子操作之前的代码(A1, A2)不会被重排到 store 之后,即 A1A2 一定先执行。但对 B1B2 没有约束。

A1;
A2;
atomic_var.load(std::memory_order_acquire);
B1;
B2;

保证原子操作之后的代码(B1, B2)不会被重排到 load 之前,即 B1B2 一定在 load 之后执行。但对 A1A2 没有约束。

A1;
A2;
atomic_var.fetch_add(x, std::memory_order_acq_rel); // 既有 release 又有 acquire 语义
B1;
B2;

保证 A1A2 不会被重排到 store 之后,B1B2 不会被重排到 store 之前,也就是这条 store 操作成为一个同步点,前后代码不能越界重排。

前面提到的 relaxedacquirerelease、acq_rel 主要是约束单线程内的指令重排,并不能保证多个线程对同一原子变量的操作具有统一的可见顺序。

举个例子,假设线程 A、B、C 分别以非 seq_cst 内存序写入值 100、200、300 到同一个原子变量 data,那么线程 D 可能观察到写入顺序为 100 → 200 → 300,而线程 E 可能观察到的顺序却是 100 → 300 → 200。由于原子操作的全局顺序未被统一约束,不同线程对变量历史的观察结果可能不一致,进而导致程序逻辑分歧或结果不可预测。

memory_order_seq_cst 提供了一种最强的顺序一致性保证:所有使用该内存序的原子操作必须在全局形成一个 单一的、线性化的操作顺序(全序),所有线程对这些原子操作的观察结果也必须一致,这就像所有使用 seq_cst 的操作被插入到一个“全局日志”中,所有线程看到的原子修改历史都是这本日志的统一版本。从而消除了因观察不一致带来的潜在问题。

补充一点:std::memory_order_releasestd::memory_order_acquire 分别适用于 纯 store纯 load 操作,它们不能用于读写复合操作。而 std::memory_order_acq_rel 适用于 fetch_addcompare_exchange_strongcompare_exchange_weak读-改-写(RMW)操作,用于同时保证前后代码的顺序。

std::atomic 默认使用的是 memory_order_seq_cst,对于初学和大多数业务逻辑,优先用 seq_cst,保证正确性和简单性。当性能瓶颈出现时,再结合具体场景,分析是否可以安全使用更弱的内存序,另外优化时需要非常小心,否则容易引发难以发现的并发bug。

未经允许不得转载:一亩三分地 » C++ 多线程中 std::atomic 的使用
评论 (0)

6 + 7 =