C++中互斥锁的使用与等候演变
创建一把锁#
创建一把互斥锁:
std::mutex mtx;
如果有const成员函数也会用到这把锁,例如:
double getValue() const {
std::lock_guard<std::mutex> lg(mtx);
return value;
}
那么需要加上mutable关键字:
mutable std::mutex mtx;
因为上锁和解锁实际上改变了锁(比如锁内部的持有标志),如果没有mutable关键字,编译器会报错,因为你在一个const成员函数里试图修改mtx成员变量。
(如果这个类的const成员函数都不用锁,那当然就不需要mutable)
创建一个条件变量#
std::condition_variable cv;
上锁#
如果不涉及条件变量、只需确保不会发生race,那么用lock_guard就可以了,见上面的getValue成员函数。这样性能最高、最简单,不需要手动释放锁,lg的生命周期结束之后(getValue函数返回之后),锁就会自动释放。
但是如果涉及到条件变量,就必须用unique_lock来上锁:
void pop() {
std::unique_lock<std::mutex> ul(mtx);
cv_.wait(ul, [this] { return (!Q.empty()); });
if (!Q.empty()) {
value = Q.front();
Q.pop();
}
return;
}
线程发现不满足条件之后、进入休眠之前,会释放锁,而lock_guard不支持中途释放锁,所以必须使用unique_lock。
支持条件变量与否,是unique_lock与lock_guard的最大区别。
解锁与唤醒#
一般来说不需要手动解锁,lock_guard和unique_lock都会在生命周期结束之后,在析构的时候解锁,即RAII。条件变量休眠前的解锁也是C++底层实现的,不需要我们操心。
至于唤醒,只有涉及条件变量的地方才需要唤醒:
size_t push(const T &value) {
bool empty = false;
size_t size = 0;
{
std::lock_guard<std::mutex> lg(mtx);
empty = Q.empty();
Q.push(value);
size = Q.size();
}
if (empty)
cv.notify_one();
return size;
}
这里唤醒休眠的消费者线程(也就是前面的pop函数)的条件是empty,也就是说只有在队列Q从空变成非空的那个点,我们才会去唤醒。这是为了节省系统调用的开销,cv.notify_one()底层涉及系统调用(如Linux下的futex),频繁地调用它是昂贵的。
- 如果队列之前不为空:
- 意味着队列里已经有数据了。此时消费者线程(
pop)大概率处于活跃状态,它要么正在处理上一个数据,要么正准备去取下一个数据。当它再次尝试去取数据时,发现队列里有东西(!Q.empty()为真),它就不会进入睡眠状态,而是直接拿走数据。此时生产者线程(push)发出的notify_one信号是多余的,消费者根本没在睡,你喊它也没意义。
- 意味着队列里已经有数据了。此时消费者线程(
- 如果队列之前为空:
- 意味着消费者可能已经处理完了所有任务,并且因为没有数据而进入了睡眠等待状态(
cv.wait)。这时候你放入了数据,必须调用notify_one把消费者叫醒,否则它会一直睡下去。
- 意味着消费者可能已经处理完了所有任务,并且因为没有数据而进入了睡眠等待状态(
等候演变 (Wait Morphing)#
首先,我们需要知道有两个队列的存在:在并发编程的底层实现中,一个线程在等待资源时,通常会在两个不同的队列之间流转。
条件变量等待队列#
线程调用cv.wait()后,如果发现条件不满足,线程会释放互斥锁、进入睡眠,并将自己挂在条件变量维护的一个链表(或队列)中,这个队列就是条件变量等待队列。在这个队列中的睡眠线程,必须由另一个线程调用cv.notify_one()或cv.notify_all()来(试图)唤醒,区别是前者只(试图)唤醒一个睡眠线程,后者(试图)唤醒全部睡眠线程。
为什么强调(试图)呢?因为notify_one和notify_all只是一个信号,至于睡眠线程是否会、什么时候会被唤醒,并不确定。后面会callback这一点。
因为我们这里只有一个线程调用pop,所以push中使用的是notify_one。
但是如果有多个睡眠线程等待唤醒且条件不同,就必须用notify_all确保每个睡眠线程都有机会醒来检查自己的条件是否满足。举个例子:
// 写线程
cv.wait(ul, [this](){return Q.size() < Q_capacity;});
// 读线程
cv.wait(ul, [this](){return Q.size() > 0;});
这两个线程都想占有ul,但是条件不一样,如果我们在第三个线程中想唤醒其中某个,就必须使用notify_all。试想:缓冲区Q满了,你用notify_one想唤醒“读线程”去消耗数据,结果系统唤醒了“写线程”,“写线程”一看缓冲区是满的,条件不满足,又睡着了。结果“读线程”没被唤醒,“写线程”继续睡,也没有新的唤醒被发起,程序死锁。这就是使用 notify_one 唤醒条件不同的等待者时可能导致的问题——信号被"错误"的线程消耗掉,而真正需要被唤醒的线程永远沉睡,唤醒丢失。
其实这里的最佳实践是对于读、写线程使用不同的条件变量:
// 写线程
cv_write.wait(ul, [this](){return Q.size() < Q_capacity;});
// 读线程
cv_read.wait(ul, [this](){return Q.size() > 0;});
这样各个条件变量的等待者条件一致,适用notify_one,避免notify_all带来的惊群效应(thundering herd)——即大量线程同时被唤醒,但只有一个能拿到锁,其余线程白白消耗了上下文切换的开销。
总结一下:
| 场景 | 推荐用法 |
|---|---|
| 只有一个消费者线程在等待 | notify_one |
| 多个线程等待相同条件、且是消耗型任务(每次只需一个线程处理) | notify_one |
| 多个线程等待不同条件、共用同一个条件变量 | notify_all(否则可能唤醒丢失导致死锁) |
| 每次通知后所有等待线程都应该醒来处理(如广播事件) | notify_all |
互斥锁等待队列#
两种情况下,线程会进入这个队列:
- 线程直接调用
mtx.lock()但发现锁被占用了 - 条件变量等待队列中的睡眠线程接收到
notify_one或notify_all,该线程会切换到互斥锁等待队列
这么说可能有点抽象,看看cv.wait的源码:
template<typename _Predicate>
void
wait(unique_lock<mutex>& __lock, _Predicate __p)
{
while (!__p())
wait(__lock);
}
这里的__p就是我们传进去用于判断条件的Lambda表达式,至于wait(__lock)就涉及到Linux内核的futex了,感兴趣的话可以阅读源码,这里不再展开。简单来说,我们可以将cv.wait(ul, [this](){return Q.size() > 0;})视作:
// 1. 进入 wait 函数,此时必须持有锁
while (!([this] { return Q.size() > 0; }())) {
// --- 这里的操作是原子(Atomic)的 ---
// 2. 释放锁
// 3. 线程进入条件变量等待队列,并开始睡眠
// ------------------------------------
// ... 等待 notify_one/all ...
// 4. 线程进入互斥锁等待队列,继续睡眠
// 5. 拿到锁了!线程醒来
}
// 6. 只有获得锁后、且 Lambda 返回 true 时,才会执行到这里,循环才会结束,wait 返回
只有当条件为真时,这个while循环才会结束、cv.wait才会返回。这里的一个重点是,任何一次对于Lambda条件的判断(包括刚刚进入cv.wait时的第一次条件判断),都必须是在持有锁的前提下进行的,否则会发生race。
第3步与第4步之间,当notify_one或者notify_all发生时,睡眠线程从条件变量等待队列到互斥锁等待队列的切换,现代操作系统内核不会去唤醒线程,而是直接在内核的数据结构里,把线程(的PCB指针)从条件变量队列默默移动到互斥锁队列。这个过程中线程始终在睡眠,但它等待的不再是notify_one或者notify_all,而是锁的持有权。当其他线程终于释放锁时,内核会从互斥锁队列中挑出一个线程,让它持有锁。这时如果条件判断为假,循环又会从第2步开始,线程释放锁、再度入睡……
也就是说在5之前,线程一直在睡。试想如果不是直接切换队列,而是唤醒从条件变量等待队列出来的线程,让它去试图持有锁,结果发现锁被占用了,我们又得让这个线程入睡、并把它放到互斥锁等待队列里。这就是现代操作系统内核的等候演变 (Wait Morphing)优化,它避免了(2 * n次)不必要的CPU上下文切换。