欢迎光临
我们一直在努力

C++现代并发编程实战:从std::thread到C++20信号量、闩锁与屏障完整指南

引言:为什么并发编程是现代C++的必修课

在多核处理器已经成为主流的今天,程序的性能不再仅仅依赖于单核频率的提升,而是更多地取决于能否充分利用多核资源。C++作为一种系统级编程语言,从C++11开始就在标准库中引入了完整的线程支持库,此后每个版本都在持续强化并发编程能力。C++20更是带来了信号量、闩锁、屏障等高级同步原语,让C++的并发编程工具箱更加完善。

然而,并发编程的难度不容小觑——数据竞争、死锁、活锁、虚假唤醒等问题让许多开发者望而却步。本文将从最基础的线程创建开始,逐步深入到C++20的最新同步机制,通过大量可运行的代码示例,帮助读者建立起完整的C++并发编程知识体系。

本文将涵盖以下内容:

  • std::thread 线程创建与管理基础
  • 互斥锁与锁管理(std::mutex, std::lock_guard, std::unique_lock, std::scoped_lock)
  • 条件变量与虚假唤醒
  • std::atomic 与内存序
  • C++20 信号量(std::counting_semaphore, std::binary_semaphore)
  • C++20 闩锁与屏障(std::latch, std::barrier)
  • std::jthread 与自动取消令牌
  • std::async、std::future 与 std::promise
  • 最佳实践与常见陷阱

C++并发编程概念图

1. 线程基础:std::thread 的创建与管理

1.1 创建线程的三种方式

std::thread 是C++并发编程的基石。它可以通过函数指针、函数对象、Lambda表达式三种方式创建:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <thread>
#include <iostream>

// 方式一:函数指针
void worker(int id) {
    std::cout << "线程 " << id << " 开始工作\n";
}

int main() {
    // 方式一:函数指针
    std::thread t1(worker, 1);
   
    // 方式二:函数对象(仿函数)
    struct Task {
        void operator()(int n) const {
            std::cout << "Task 处理 " << n << " 项\n";
        }
    };
    std::thread t2(Task{}, 100);
   
    // 方式三:Lambda 表达式(最推荐)
    std::thread t3([](int a, int b) {
        std::cout << "Lambda: " << a + b << "\n";
    }, 3, 4);
   
    t1.join();
    t2.join();
    t3.join();
    return 0;
}

1.2 join 与 detach 的区别

每个 std::thread 对象在销毁前必须明确选择是 join 还是 detach,否则程序会调用 std::terminate() 直接终止:

方法 行为 资源释放 适用场景
join() 阻塞直到线程执行完毕 调用后自动释放 需要等待线程结果
detach() 分离线程,让其后台运行 线程结束时自动释放 “即发即忘”的后台任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void background_task() {
    std::this_thread::sleep_for(std::chrono::seconds(3));
    // 执行一些后台工作...
}

int main() {
    std::thread t(background_task);
    t.detach();  // 分离后,主线程不再等待t
    // 注意:detach 后不能再 join
    // if (t.joinable()) t.join(); // 检查是否可join
   
    std::thread t2(background_task);
    t2.join();  // 主线程阻塞3秒等待t2完成
}

重要提示:detach 后的线程如果访问了已销毁的局部变量,会导致未定义行为。使用 detach 前务必确保线程不再需要访问主线程的局部变量,或者使用智能指针和 shared_ptr 延长生命周期。C++20 的 std::jthread 从设计上解决了这个问题,我们稍后会讨论。

2. 互斥锁:保护共享数据

2.1 最基本的互斥锁 std::mutex

当多个线程同时读写同一份数据时,数据竞争(data race)会导致未定义行为。互斥锁(mutex)是最基本的保护机制:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <mutex>
#include <thread>
#include <vector>

std::mutex mtx;
int counter = 0;

void increment() {
    for (int i = 0; i < 100000; ++i) {
        mtx.lock();
        ++counter;
        mtx.unlock();
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);
    t1.join();
    t2.join();
    std::cout << "Counter: " << counter << "\n";  // 正确输出 200000
    return 0;
}

但手动 lock/unlock 存在风险——如果 lock 和 unlock 之间抛出异常,锁将永远不会被释放。这就是 RAII 锁管理器存在的意义。

2.2 RAII 锁管理:std::lock_guard、std::unique_lock、std::scoped_lock

C++标准库提供了三种 RAII 风格的锁管理器,它们在构造时加锁,析构时自动解锁,从根本上杜绝了忘记解锁的问题:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <mutex>
#include <shared_mutex>  // C++17

std::mutex mtx;

// 1. lock_guard - 最简单,推荐用于大多数场景
void safe_increment() {
    std::lock_guard<std::mutex> lock(mtx);
    // 锁定期间,异常安全
    ++counter;
}  // 离开作用域自动解锁

// 2. unique_lock - 更灵活,可延迟锁定、提前解锁、转移所有权
void flexible_lock() {
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);  // 构造时不锁定
    // ... 做一些不需要锁的操作 ...
    lock.lock();   // 延迟锁定
    // 访问共享数据
    lock.unlock();  // 提前解锁
    // 执行不需要锁的操作
    lock.lock();   // 再次锁定
    // 再次访问
}  // 自动解锁

// 3. scoped_lock (C++17) - 支持同时锁定多个互斥锁,避免死锁
void transfer(BankAccount& from, BankAccount& to, int amount) {
    std::scoped_lock locks(from.mtx, to.mtx);  // 同时锁定两个mutex
    // 安全地操作两个账户
    from.balance -= amount;
    to.balance += amount;
}  // 同时解锁两个mutex

std::scoped_lock 是C++17引入的,它使用std::lock算法同时锁定多个互斥量,避免了多锁场景下的死锁风险。在C++17及以后,只要需要同时锁定多个互斥量,就应该优先使用 scoped_lock。

2.3 读写锁:std::shared_mutex (C++17)

对于读多写少的场景,使用独占锁(std::mutex)会严重降低并发性能。std::shared_mutex 允许多个线程同时读取,但只允许一个线程写入:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <shared_mutex>
#include <unordered_map>

class ThreadSafeCache {
    std::unordered_map<std::string, int> cache_;
    mutable std::shared_mutex rw_lock_;
   
public:
    int get(const std::string& key) const {
        std::shared_lock lock(rw_lock_);  // 共享锁,多个线程可同时读
        auto it = cache_.find(key);
        return it != cache_.end() ? it->second : -1;
    }
   
    void set(const std::string& key, int value) {
        std::unique_lock lock(rw_lock_);  // 独占锁,写时阻塞所有读
        cache_[key] = value;
    }
};

并发编程代码示例

3. 条件变量与生产者-消费者模型

3.1 条件变量的基本用法

条件变量(std::condition_variable)用于阻塞一个或多个线程,直到另一个线程修改共享变量并通知条件变量。它常与 std::unique_lock 配合使用:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <condition_variable>
#include <queue>

std::queue<int> tasks;
std::mutex q_mtx;
std::condition_variable cv;

void producer() {
    for (int i = 0; i < 10; ++i) {
        {
            std::lock_guard<std::mutex> lock(q_mtx);
            tasks.push(i);
            std::cout << "生产: " << i << "\n";
        }
        cv.notify_one();  // 通知一个消费者
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(q_mtx);
        // 使用 while 而非 if 来防止虚假唤醒
        cv.wait(lock, []{ return !tasks.empty(); });
       
        int task = tasks.front();
        tasks.pop();
        lock.unlock();  // 提前解锁,让生产者可以继续生产
       
        std::cout << "消费: " << task << "\n";
        if (task == 9) break;  // 最后一项,退出
    }
}

3.2 虚假唤醒(Spurious Wakeup)与正确用法

一个极其重要的概念:条件变量的 wait() 可能会在没有收到通知的情况下返回,这就是”虚假唤醒”。因此,永远不要在 if 中使用 wait,必须使用 while 循环检查条件:


1
2
3
4
5
6
7
8
9
10
11
// ❌ 错误:可能被虚假唤醒
cv.wait(lock);
if (queue.empty()) continue;  // 虚假唤醒会导致空队列访问

// ✅ 正确:使用谓词重载,内部自动处理虚假唤醒
cv.wait(lock, []{ return !queue.empty(); });

// ✅ 等价于上面手动写法
while (queue.empty()) {
    cv.wait(lock);  // 循环检查,虚假唤醒后重新等待
}

从C++11开始,condition_variable::wait 提供了带谓词的重载版本,内部就是使用 while 循环实现的,推荐直接使用这个版本。

4. 原子操作与内存序

4.1 std::atomic 基础

对于简单的整型计数场景,使用互斥锁过于重量级。std::atomic 提供了无锁的原子操作,性能远优于互斥锁:


1
2
3
4
5
6
7
8
9
10
11
12
#include <atomic>

std::atomic<int> atomic_counter{0};

void atomic_increment() {
    for (int i = 0; i < 100000; ++i) {
        atomic_counter.fetch_add(1, std::memory_order_relaxed);
        // 等价于:++atomic_counter;
    }
}

// 性能对比:atomic 比 mutex 快 10-50 倍

std::atomic 支持的常用操作:

操作 说明 等价于
load() 原子读取 var
store() 原子写入 var = x
exchange() 原子交换并返回旧值 std::atomic_exchange
compare_exchange_weak() CAS操作(弱版本,可能失败) 无锁编程核心
compare_exchange_strong() CAS操作(强版本) 无锁编程核心
fetch_add() / fetch_sub() 原子加减 ++ / —

4.2 内存序(Memory Order)详解

内存序是C++原子操作中最复杂也最容易被误解的部分。它控制着原子操作对其它线程的可见性:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <atomic>
#include <thread>
#include <cassert>

std::atomic<bool> ready{false};
int data = 0;

// 内存序的六种类型(从弱到强):
// 1. memory_order_relaxed  - 仅保证原子性,无顺序保证
// 2. memory_order_consume  - 数据依赖排序(C++17弃用中)
// 3. memory_order_acquire  - 保证后续读取操作在 acquire 之后
// 4. memory_order_release  - 保证之前的所有写入在 release 之前
// 5. memory_order_acq_rel  - acquire + release 组合
// 6. memory_order_seq_cst  - 顺序一致性(默认,最严格但最慢)

void writer() {
    data = 42;                                    // 普通写入
    ready.store(true, std::memory_order_release);  // release-屏障
}

void reader() {
    while (!ready.load(std::memory_order_acquire));  // acquire-屏障
    assert(data == 42);  // 保证成立!acquire 看到 release 前的所有写入
}

int main() {
    std::thread t1(writer);
    std::thread t2(reader);
    t1.join();
    t2.join();
    return 0;
}

核心原则:release 操作与 acquire 操作配对,构成 happens-before 关系。release 之前的所有写入,在另一个线程中对应的 acquire 之后都可见。对于大多数日常应用,使用默认的 memory_order_seq_cst 即可,只有在性能敏感场景下才需要优化内存序。

5. C++20 新同步原语

5.1 信号量:std::counting_semaphore 与 std::binary_semaphore

信号量是C++20引入的经典同步原语,用于限制同时访问某资源的线程数量。std::counting_semaphore 的计数可以大于1,而 std::binary_semaphore 是计数为1的特化版本:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <semaphore>  // C++20

// 限制最多3个线程同时访问数据库连接池
std::counting_semaphore<3> db_pool(3);

void query_database(int id) {
    db_pool.acquire();  // P 操作:获取信号量,计数减1
    // 如果计数为0,线程阻塞
   
    std::cout << "线程 " << id << " 正在查询数据库...\n";
    std::this_thread::sleep_for(std::chrono::seconds(1));
   
    db_pool.release();  // V 操作:释放信号量,计数加1
}

// 使用二进制信号量实现线程间通知
std::binary_semaphore signal{0};

void waiter() {
    signal.acquire();  // 等待通知
    std::cout << "收到信号,继续执行\n";
}

void notifier() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    signal.release();  // 发送通知
}

信号量的优势在于它比条件变量更轻量——不需要锁的配合,API 更简洁,出错概率更低。在需要限制并发数的场景(如连接池、线程池限流),信号量是首选。

5.2 闩锁与屏障:std::latch 与 std::barrier

闩锁(latch)是C++20引入的一次性同步屏障,计数器减到0后所有等待线程同时释放。它只能使用一次:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <latch>  // C++20

std::latch work_done(4);  // 需要4个线程到达

void worker(int id) {
    std::this_thread::sleep_for(std::chrono::milliseconds(id * 100));
    std::cout << "工人 " << id << " 完成工作\n";
    work_done.count_down();  // 计数器减1
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i)
        threads.emplace_back(worker, i);
   
    work_done.wait();  // 等待所有工人完成
    std::cout << "所有工作完成,开始合并结果\n";
   
    for (auto& t : threads) t.join();
}

屏障(barrier)与闩锁类似,但可以重复使用——每次完成一个”阶段”后自动重置计数器:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <barrier>  // C++20

std::barrier sync_point(4, []() noexcept {
    std::cout << "--- 阶段完成 ---\n";
});

void worker(int id) {
    for (int phase = 0; phase < 3; ++phase) {
        std::cout << "线程 " << id << " 到达阶段 " << phase << "\n";
        sync_point.arrive_and_wait();  // 等待所有线程到达
        // 所有线程到达后,继续下一阶段
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i)
        threads.emplace_back(worker, i);
    for (auto& t : threads) t.join();
}

std::barrier 的完成阶段回调函数在每个阶段执行一次,非常适合分治算法中每个阶段结束后需要汇总或同步的场景。

6. C++20 std::jthread —— 更好的线程管理

std::jthread 是C++20引入的”联合线程”(joining thread),它在析构时自动调用 join(),并支持可中断的停止令牌(std::stop_token):


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <thread>  // C++20 的 jthread 在此头文件

void cancellable_worker(std::stop_token stoken) {
    while (!stoken.stop_requested()) {
        std::cout << "工作中...\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
    std::cout << "收到取消请求,优雅退出\n";
}

int main() {
    std::jthread jt(cancellable_worker);
    std::this_thread::sleep_for(std::chrono::seconds(1));
   
    jt.request_stop();  // 发送停止请求
    // 析构时自动 join,无需手动调用
    return 0;
}

std::jthread 解决了 std::thread 的三大痛点:

  1. 忘记 join 导致 terminate —— jthread 析构自动 join
  2. 无法优雅停止线程 —— 内置 stop_token 机制
  3. 异常安全 —— 即使在异常堆栈展开时也能正确 join

从C++20开始,只要不涉及需要提前 detach 的场景,都应该优先使用 std::jthread 替代 std::thread。

7. 异步任务:std::async、std::future 与 std::promise

7.1 std::async —— 最简单的异步任务

对于只需要异步执行一个函数并获取返回值的场景,std::async 是最简单的方式:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <future>

int heavy_computation(int n) {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return n * n;
}

int main() {
    // std::launch::async  - 强制在新线程执行
    // std::launch::deferred - 在 get() 时同步执行(延迟求值)
   
    auto future = std::async(std::launch::async, heavy_computation, 42);
   
    std::cout << "等待结果...\n";
    int result = future.get();  // 阻塞直到结果就绪
    std::cout << "结果: " << result << "\n";
   
    // 并行执行多个任务
    auto f1 = std::async(std::launch::async, heavy_computation, 10);
    auto f2 = std::async(std::launch::async, heavy_computation, 20);
    auto f3 = std::async(std::launch::async, heavy_computation, 30);
    std::cout << "并行结果: " << f1.get() + f2.get() + f3.get() << "\n";
}

7.2 std::promise 与 std::future 的配对使用

当需要更精细的控制时,可以使用 std::promise 手动设置值,通过 std::future 获取:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <future>

void task(std::promise<int> prom) {
    try {
        int result = 42;  // 计算结果
        prom.set_value(result);  // 设置结果
    } catch (...) {
        prom.set_exception(std::current_exception());  // 传递异常
    }
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
   
    std::thread t(task, std::move(prom));  // promise 只能移动
   
    int value = fut.get();  // 获取结果,或抛出异常
    std::cout << "Promise 结果: " << value << "\n";
    t.join();
}

8. 最佳实践与常见陷阱

8.1 死锁预防

死锁是并发编程中最常见的致命问题。以下是一些预防策略:

  • 始终使用固定顺序锁定:如果多个线程需要锁定多个互斥量,必须保证所有线程以相同的顺序加锁
  • 使用 std::scoped_lock:C++17 的 scoped_lock 使用 std::lock 算法同时锁定,避免死锁
  • 避免在持有锁时调用用户代码:你不知道用户代码会做什么(可能会尝试锁定同一个锁)
  • 使用锁层次结构:为每个锁分配一个层级,只允许按层级递增顺序加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 锁层次结构示例
class Account {
    int balance_{0};
    static std::atomic<int> lock_level_;
    int level_;
public:
    Account() : level_(lock_level_.fetch_add(1)) {}
   
    void transfer(Account& to, int amount) {
        if (level_ < to.level_) {
            std::scoped_lock locks(mtx_, to.mtx_);
            // 转账
        } else {
            std::scoped_lock locks(to.mtx_, mtx_);
            // 转账
        }
    }
};

8.2 数据竞争与ThreadSanitizer

数据竞争是未定义行为,编译器优化可能会让程序产生完全出乎意料的结果。使用Clang/GCC的ThreadSanitizer(TSan)可以有效检测数据竞争:


1
2
3
4
5
# 编译时开启 TSan
g++ -fsanitize=thread -g -O1 -o myapp myapp.cpp

# 运行,TSan 会报告所有数据竞争
./myapp

8.3 性能考量

同步机制 相对开销 适用场景
std::atomic (relaxed) 极低 计数器、标志位
std::atomic (seq_cst) 一般原子操作
std::shared_mutex (读) 中低 读多写少的数据结构
std::mutex 一般互斥保护
std::counting_semaphore 限流、资源池
std::condition_variable 复杂的等待/通知模式

优化建议:从最简单的方案开始(mutex + lock_guard),仅有性能瓶颈时再优化到更精细的机制。过早优化是万恶之源。

总结

C++的并发编程支持从C++11到C++20经历了巨大的演进。从最初的 std::thread 和 std::mutex 基础组件,到C++17的 shared_mutex 和 scoped_lock,再到C++20的信号量、闩锁、屏障和 jthread,标准库为并发编程提供了越来越完善、越来越安全的工具集。

在选择并发方案时,建议遵循以下原则:

  1. 优先使用高级抽象:std::async 优于手动管理线程,std::jthread 优于 std::thread
  2. 总是使用RAII锁管理器:避免手动 lock/unlock
  3. 用 TSan 检测数据竞争:在开发阶段就开启 TSan
  4. 从简单开始,按需优化:大多数场景下 std::mutex + lock_guard 就足够了
  5. C++20 新特性值得采用:信号量、jthread、屏障等新原语让代码更简洁、更安全

并发编程的学习曲线虽然陡峭,但一旦掌握了这些核心概念和工具,你将能够编写出充分利用多核性能的高效C++程序。

【本站文章皆为原创,未经允许不得转载】:汤不热吧 » C++现代并发编程实战:从std::thread到C++20信号量、闩锁与屏障完整指南
分享到: 更多 (0)