C++中互斥锁的使用与等候演变

创建一把锁#

创建一把互斥锁:

std::mutex mtx;

如果有const成员函数也会用到这把锁,例如:

double getValue() const {
    std::lock_guard<std::mutex> lg(mtx);
    return value;
}

那么需要加上mutable关键字:

mutable std::mutex mtx;

因为上锁和解锁实际上改变了锁(比如锁内部的持有标志),如果没有mutable关键字,编译器会报错,因为你在一个const成员函数里试图修改mtx成员变量。

(如果这个类的const成员函数都不用锁,那当然就不需要mutable

创建一个条件变量#

std::condition_variable cv;

上锁#

如果不涉及条件变量、只需确保不会发生race,那么用lock_guard就可以了,见上面的getValue成员函数。这样性能最高、最简单,不需要手动释放锁,lg的生命周期结束之后(getValue函数返回之后),锁就会自动释放。

但是如果涉及到条件变量,就必须用unique_lock来上锁:

void pop() {
  std::unique_lock<std::mutex> ul(mtx);
  cv_.wait(ul, [this] { return (!Q.empty()); });
  if (!Q.empty()) {
    value = Q.front();
    Q.pop();
  }
  return;
}

线程发现不满足条件之后、进入休眠之前,会释放锁,而lock_guard不支持中途释放锁,所以必须使用unique_lock

支持条件变量与否,是unique_locklock_guard的最大区别。

解锁与唤醒#

一般来说不需要手动解锁,lock_guardunique_lock都会在生命周期结束之后,在析构的时候解锁,即RAII。条件变量休眠前的解锁也是C++底层实现的,不需要我们操心。

至于唤醒,只有涉及条件变量的地方才需要唤醒:

size_t push(const T &value) {
  bool empty = false;
  size_t size = 0;
  {
    std::lock_guard<std::mutex> lg(mtx);
    empty = Q.empty();
    Q.push(value);
    size = Q.size();
  }
  if (empty)
    cv.notify_one();
  return size;
}

这里唤醒休眠的消费者线程(也就是前面的pop函数)的条件是empty,也就是说只有在队列Q从空变成非空的那个点,我们才会去唤醒。这是为了节省系统调用的开销,cv.notify_one()底层涉及系统调用(如Linux下的futex),频繁地调用它是昂贵的。

  • 如果队列之前不为空:
    • 意味着队列里已经有数据了。此时消费者线程(pop)大概率处于活跃状态,它要么正在处理上一个数据,要么正准备去取下一个数据。当它再次尝试去取数据时,发现队列里有东西(!Q.empty()为真),它就不会进入睡眠状态,而是直接拿走数据。此时生产者线程(push)发出的notify_one信号是多余的,消费者根本没在睡,你喊它也没意义。
  • 如果队列之前为空:
    • 意味着消费者可能已经处理完了所有任务,并且因为没有数据而进入了睡眠等待状态(cv.wait)。这时候你放入了数据,必须调用notify_one把消费者叫醒,否则它会一直睡下去。

等候演变 (Wait Morphing)#

首先,我们需要知道有两个队列的存在:在并发编程的底层实现中,一个线程在等待资源时,通常会在两个不同的队列之间流转。

条件变量等待队列#

线程调用cv.wait()后,如果发现条件不满足,线程会释放互斥锁、进入睡眠,并将自己挂在条件变量维护的一个链表(或队列)中,这个队列就是条件变量等待队列。在这个队列中的睡眠线程,必须由另一个线程调用cv.notify_one()cv.notify_all()来(试图)唤醒,区别是前者只(试图)唤醒一个睡眠线程,后者(试图)唤醒全部睡眠线程。

为什么强调(试图)呢?因为notify_onenotify_all只是一个信号,至于睡眠线程是否会、什么时候会被唤醒,并不确定。后面会callback这一点。

因为我们这里只有一个线程调用pop,所以push中使用的是notify_one

但是如果有多个睡眠线程等待唤醒且条件不同,就必须用notify_all确保每个睡眠线程都有机会醒来检查自己的条件是否满足。举个例子:

// 写线程
cv.wait(ul, [this](){return Q.size() < Q_capacity;});
// 读线程
cv.wait(ul, [this](){return Q.size() > 0;});

这两个线程都想占有ul,但是条件不一样,如果我们在第三个线程中想唤醒其中某个,就必须使用notify_all。试想:缓冲区Q满了,你用notify_one想唤醒“读线程”去消耗数据,结果系统唤醒了“写线程”,“写线程”一看缓冲区是满的,条件不满足,又睡着了。结果“读线程”没被唤醒,“写线程”继续睡,也没有新的唤醒被发起,程序死锁。这就是使用 notify_one 唤醒条件不同的等待者时可能导致的问题——信号被"错误"的线程消耗掉,而真正需要被唤醒的线程永远沉睡,唤醒丢失。

其实这里的最佳实践是对于读、写线程使用不同的条件变量:

// 写线程
cv_write.wait(ul, [this](){return Q.size() < Q_capacity;});
// 读线程
cv_read.wait(ul, [this](){return Q.size() > 0;});

这样各个条件变量的等待者条件一致,适用notify_one,避免notify_all带来的惊群效应(thundering herd)——即大量线程同时被唤醒,但只有一个能拿到锁,其余线程白白消耗了上下文切换的开销。

总结一下:

场景 推荐用法
只有一个消费者线程在等待 notify_one
多个线程等待相同条件、且是消耗型任务(每次只需一个线程处理) notify_one
多个线程等待不同条件、共用同一个条件变量 notify_all(否则可能唤醒丢失导致死锁)
每次通知后所有等待线程都应该醒来处理(如广播事件) notify_all

互斥锁等待队列#

两种情况下,线程会进入这个队列:

  1. 线程直接调用mtx.lock()但发现锁被占用了
  2. 条件变量等待队列中的睡眠线程接收到notify_onenotify_all,该线程会切换到互斥锁等待队列

这么说可能有点抽象,看看cv.wait的源码:

template<typename _Predicate>
  void
  wait(unique_lock<mutex>& __lock, _Predicate __p)
  {
    while (!__p())
      wait(__lock);
  }

这里的__p就是我们传进去用于判断条件的Lambda表达式,至于wait(__lock)就涉及到Linux内核的futex了,感兴趣的话可以阅读源码,这里不再展开。简单来说,我们可以将cv.wait(ul, [this](){return Q.size() > 0;})视作:

// 1. 进入 wait 函数,此时必须持有锁
while (!([this] { return Q.size() > 0; }())) {
    // --- 这里的操作是原子(Atomic)的 ---
    // 2. 释放锁
    // 3. 线程进入条件变量等待队列,并开始睡眠
    // ------------------------------------

    // ... 等待 notify_one/all ...

    // 4. 线程进入互斥锁等待队列,继续睡眠
    // 5. 拿到锁了!线程醒来
}
// 6. 只有获得锁后、且 Lambda 返回 true 时,才会执行到这里,循环才会结束,wait 返回

只有当条件为真时,这个while循环才会结束、cv.wait才会返回。这里的一个重点是,任何一次对于Lambda条件的判断(包括刚刚进入cv.wait时的第一次条件判断),都必须是在持有锁的前提下进行的,否则会发生race。

第3步与第4步之间,当notify_one或者notify_all发生时,睡眠线程从条件变量等待队列到互斥锁等待队列的切换,现代操作系统内核不会去唤醒线程,而是直接在内核的数据结构里,把线程(的PCB指针)从条件变量队列默默移动到互斥锁队列。这个过程中线程始终在睡眠,但它等待的不再是notify_one或者notify_all,而是锁的持有权。当其他线程终于释放锁时,内核会从互斥锁队列中挑出一个线程,让它持有锁。这时如果条件判断为假,循环又会从第2步开始,线程释放锁、再度入睡……

也就是说在5之前,线程一直在睡。试想如果不是直接切换队列,而是唤醒从条件变量等待队列出来的线程,让它去试图持有锁,结果发现锁被占用了,我们又得让这个线程入睡、并把它放到互斥锁等待队列里。这就是现代操作系统内核的等候演变 (Wait Morphing)优化,它避免了(2 * n次)不必要的CPU上下文切换。