Linux操作系统之线程(七):线程同步
目录
前言:
二、生产者消费者模型
三、调用接口
四、以阻塞队列实现的生产者消费者模型
总结:
前言:
我们之前已经讲到了线程的互斥机制,给大家引出了锁的概念,就是为了保护多线程之间访问临界区时的安全性。
今天我们将会学习线程同步的有关内容,希望对大家有所帮助。
一、线程同步与条件变量
如果只是光靠之前讲的互斥,它虽然能解决数据竞争的问题,但它无法解决以下问题:
线程间协作问题
-
某些线程可能需要等待某个条件成立(如“队列非空”或“任务完成”)才能继续执行。
-
单纯使用互斥锁,线程只能通过 轮询(Polling) 的方式检查条件(如
while (!condition) {}
),这会浪费 CPU 资源。
死锁风险:如果线程A持有锁并等待某个条件,而线程B需要修改该条件但无法获取锁,就会导致 死锁(Deadlock)。(比如A持有锁,需要等待一个信号使得自己释放锁,但此时负责发送信号的线程B无法获取锁,所以发送不了信号。线程A等不到信号,不释放锁;线程B拿不到锁,发不了信号 → 相互永久阻塞)
无法高效通知:互斥锁没有机制让线程在条件满足时 主动通知 其他线程,导致线程只能不断尝试加锁检查,效率低下。
大家可以想象一下只有一个自习室,却只有一个人能使用,钥匙在谁身上谁就能进去。
此时进去的那个人说有人进来了我就出去,但此时外面一堆人在等待,由于没有锁,就都进不去。于是双方就陷入了一个死循环。
由于互斥锁无法解决线程间的协作问题,我们需要 线程同步(Thread Synchronization) 机制,其中 条件变量(Condition Variable) 是最常用的解决方案。
什么是线程同步机制能?它指的就是让线程按照一定的顺序执行,避免无意义的轮询,允许线程在条件不满足时挂起(阻塞),并在条件满足时被唤醒。这是线程同步的核心思想,于此我们能有效的避免饥饿问题。
其中,让线程在不满足时阻塞,随后根据某种通知方式被唤醒,这个通知方式,就是条件变量。
二、生产者消费者模型
其中,生产者消费者模型是比较典型的一种依靠条件变量解决问题的经典模型,让我们一起来学习一下。
假如我们这里有一个厨师线程,有一个服务员线程,还有一个出菜窗口。
这三个东西分别代表生产者模型(厨师),消费者模型(),出菜窗口(共享区,临界区资源)。一个共享缓冲区通常有大小限制,比如我们这里的出菜窗口肯定不可能放下无限个菜。
-
如果出菜口是空的,服务员必须 不断查看(轮询) 是否有新菜,这样很浪费精力。
-
如果出菜口满了,厨师必须 不断查看 是否有空位放新菜,同样低效。
所以条件变量出现了,在这里条件变量是干什么的呢?
-
当出菜口为空时,服务员可以 去休息(阻塞),而不是一直问“菜好了吗?”。
-
等厨师做好菜后 通知(
signal
) 服务员:“菜好了,可以来取了!”
而厨师需要做什么呢?
-
当出菜口满时,厨师可以 去休息(阻塞),而不是一直问“有空位了吗?”。
-
等服务员端走菜后 通知(
signal
) 厨师:“有空位了,可以继续做菜!”
所以,
条件变量的核心思想是:“不要轮询,等通知!”
让线程在需要等待时休眠,在条件满足时被唤醒,从而高效协作。
总的来说,生产者消费者模式就是通过一个容器来解决生产者与消费者的强耦合问题(比如同步阻塞啊)。生产者与消费者之间彼此不直接通讯,而是过阻塞队列来进行通信。所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列。而消费者也不需要直接找生产者要数据,而是直接通过这个阻塞队列里取。阻塞队列就相当于一个缓冲区,平衡双方速度差异,避免互相等待。于此平衡生产者与消费者的处理能力。
这个阻塞队列就是用来给生产者消费者解耦的。
我们需要明白,生产者与生产者之间是互斥关系(多个生产者同时向共享缓冲区(如队列)写入数据时,可能导致数据覆盖或逻辑错误。)
消费者与消费者是互斥关系(多个消费者同时从缓冲区取数据时,可能重复消费或读取无效数据。)
生产者与消费者是互斥加同步关系。(生产者和消费者不能同时操作缓冲区(防止数据竞争),并且双方需要协作,通过条件变量来实现互相的唤醒)
三种关系,两个角色,一个交易场所,就被称为消费者生产者模型的321原则。
三、调用接口
系统提供了一些条件变量的接口,让我们一起来学习一下。
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
这个接口用来动态初始化一个条件变量,成功返回 0
,失败返回错误码。(这里说一下这些pthread开头的调用成功都是返回0,失败返回错误码)
-
cond
:指向要初始化的条件变量的指针。 -
attr
:条件变量属性,通常设为NULL
(使用默认属性)。
使用此函数时,通常需要像线程一样,先定义一个cond类型的变量,随后放进这个函数里初始化。
pthread_cond_t cond;pthread_cond_init(&cond, NULL); // 初始化条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
销毁一个动态初始化的条件变量,释放其资源。
cond
:指向要销毁的条件变量的指针。
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
阻塞当前线程,等待条件变量被唤醒,并自动释放关联的互斥锁。被唤醒后,线程会重新获取锁。(这样设计的原因我们之后会讲)
-
cond
:指向要等待的条件变量的指针。 -
mutex
:指向当前线程已锁定的互斥锁的指针。
int pthread_cond_signal(pthread_cond_t *cond);
唤醒至少一个正在等待该条件变量的线程(具体唤醒哪个线程由系统调度决定)。
cond
:指向要发送信号的条件变量的指针。
int pthread_cond_broadcast(pthread_cond_t *cond);
唤醒所有正在等待该条件变量的线程。、
cond
:指向要广播的条件变量的指针。
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
这个类似与锁的使用,静态初始化条件变量,无需调用销毁函数。
大家不要看这些函数这么多,其实使用上与我们的锁具有类似性。
在举例之前,我需要给大家修改一下我们之前封装的Mutex类:
因为他有一些问题:
#ifndef _MUTEX_HPP_#define _MUTEX_HPP_#include namespace MutexModule{ class Mutex { public: Mutex() { pthread_mutex_init(&_mutex, nullptr); } ~Mutex() { pthread_mutex_destroy(&_mutex); } bool lock() { return pthread_mutex_lock(&_mutex) == 0; } bool unlock() { return pthread_mutex_unlock(&_mutex) == 0; } private: pthread_mutex_t _mutex;//互斥量锁 }; class LockGuard//采⽤RAII⻛格,进⾏锁管理 { public: LockGuard(Mutex &mtx):_mtx(mtx)//通过后续使用时定义一个LockGuard类型的局部变量,在局部变量的声明周期内,互斥量会被自动加锁与解锁 { _mtx.lock(); } ~LockGuard() { _mtx.unlock(); } private: Mutex &_mtx; };}#endif
这是之前的代码,可以看见我们的Mutex中没有返回内部的 _mutex的能力,所以我们这里可以给他加一个调用接口返回这个的地址。
并且,锁是不可拷贝的资源。如果允许拷贝,会导致多个 Mutex
对象共享同一个底层锁,引发未定义行为(如重复销毁或并发竞争)。
所以我们必须得禁用拷贝构造函数与拷贝赋值函数:
class Mutex { public: Mutex(const Mutex &) = delete;// 禁用拷贝构造函数 const Mutex &operator=(const Mutex &) = delete;// 禁用拷贝赋值运算符 Mutex() { pthread_mutex_init(&_mutex, nullptr); } ~Mutex() { pthread_mutex_destroy(&_mutex); } bool lock() { return pthread_mutex_lock(&_mutex) == 0; } bool unlock() { return pthread_mutex_unlock(&_mutex) == 0; } pthread_mutex_t *LockPtr() { return &_mutex; } private: pthread_mutex_t _mutex; // 互斥量锁 };
最后,我们可以写一个测试用例来使用一下我们之前学的条件变量接口:
#include #include #include #include\"Mythread.hpp\"#include\"mutex.hpp\"MutexModule::Mutex mutex;//使用我们之前封装的锁类pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//创建一个静态的条件变量int ready = 0; // 条件变量:0表示未就绪,1表示就绪// 线程1:等待条件成立void *thread_wait(void *arg) { MutexModule::LockGuard lockguard(mutex); // 加锁,临时变量销毁时会帮助我们解锁 printf(\"[Thread 1] Waiting for signal...\\n\"); while (ready == 0) { // 必须用 while 防止虚假唤醒 pthread_cond_wait(&cond, mutex.LockPtr()); // 释放锁并等待 } printf(\"[Thread 1] Received signal! ready = %d\\n\", ready); return NULL;}// 线程2:触发条件void *thread_signal(void *arg) { sleep(2); // 模拟耗时操作 MutexModule::LockGuard lockguard(mutex); // 加锁 printf(\"[Thread 2] Sending signal...\\n\"); ready = 1; // 修改条件 pthread_cond_signal(&cond); // 唤醒等待的线程 return NULL;}int main() { pthread_t t1, t2; // 创建线程1(等待条件) pthread_create(&t1, NULL, thread_wait, NULL); // 创建线程2(发送信号) pthread_create(&t2, NULL, thread_signal, NULL); // 等待线程结束 pthread_join(t1, NULL); pthread_join(t2, NULL); // 销毁条件变量(静态初始化可省略) // pthread_cond_destroy(&cond); return 0;}
由此可见,其实这些函数调用接口并没有我们想的这么难,还是要多用。
但是这里有一些问题我们必须要解释说明一下,大家请看这块代码:
MutexModule::LockGuard lockguard(mutex); // 加锁,临时变量销毁时会帮助我们解锁 while (ready == 0) { // 必须用 while 防止虚假唤醒 pthread_cond_wait(&cond, mutex.LockPtr()); // 释放锁并等待 }
这是我们线程1的等待代码。
我想问一下大家,为什么这里不能使用if来判断呢?
这是为了避免被虚假唤醒,我们在注释里也说明了这一原因。即使没有其他线程调用 signal
,pthread_cond_wait
也可能被操作系统唤醒(原因包括信号中断、系统调度等)。
我想再问问大家,整个
void *thread_wait(void *arg) { MutexModule::LockGuard lockguard(mutex); // 加锁,临时变量销毁时会帮助我们解锁 printf(\"[Thread 1] Waiting for signal...\\n\"); while (ready == 0) { // 必须用 while 防止虚假唤醒 pthread_cond_wait(&cond, mutex.LockPtr()); // 释放锁并等待 } printf(\"[Thread 1] Received signal! ready = %d\\n\", ready); return NULL;}
pthread_cond_wait
的调用必须发生在持有锁的临界区内!!
这是为什么?
pthread_cond_wait
的设计要求它必须 在持有锁的情况下调用。
如果不在临界区内调用,检查条件(如 ready == 0
)和进入等待(pthread_cond_wait
)是分离的操作,可能被其他线程打断。(比如我们现在把while变成if,就变成了之前所说的原子性的问题,导致不再具备原子性,所以会被打断。)
这段代码中,哪些地方是临界区资源?
是不是从开始加锁到解锁的全部代码,都是临界区?
所以我们肯定,pthread_cond_wait
的调用必须发生在持有锁的临界区内!!!
并且,我还想问一下大家。
为什么,pthread_cond_wait
的调用传参中还要求我们把锁的地址传进去,以此来找到锁?
试着想象,如果我们没有把锁关闭,那么我们进行阻塞(挂起)时,其他线程还能访问我们的临界区资源吗?(这里的临界区资源指的是被锁保护的数据如ready)
答案是:不能!!!!
所以,我们的等待函数必须要解锁!!!!
只有把锁传进去,我们才能在函数的内部进行解锁操作!!!
我还想问大家,如果我们的线程从阻塞中返回,继续执行代码。我想问一下大家。此时它执行的是什么代码啊?
也是我们的临界区资源啊?
如果没有锁,你就访问临界区资源,这是不是又不安全了啊!!!
所以,我们的等待函数返回时,必须要重新持有锁!!也就是进行加锁操作!!
这,也就是为什么等待函数的作用是:“阻塞当前线程,等待条件变量被唤醒,并自动释放关联的互斥锁。被唤醒后,线程会重新获取锁”的原因!!
四、以阻塞队列实现的生产者消费者模型
我们现在就来模拟实现一下我们的阻塞队列的接口,其中包含了我们对条件变量的封装,与阻塞队列的封装。以此来帮助我们更好的理解!
首先就是我们对条件变量的封装,这跟我们之前讲的类似,所以直接贴出代码大家看一下就行:
Cond.hpp:#pragma once#include #include #include \"mutex.hpp\"namespace CondModule{ using namespace MutexModule; class Cond { public: Cond() { pthread_cond_init(&_cond, nullptr); } ~Cond() { pthread_cond_destroy(&_cond); } void Wait(Mutex &mutex) { pthread_cond_wait(&_cond, mutex.LockPtr()); } void Signal() { pthread_cond_signal(&_cond); } void SignalAll() { pthread_cond_broadcast(&_cond); } private: pthread_cond_t _cond; };}
这里我们使用了之前封装的Mutex类,大家注意哦!
那么如何写我们的阻塞队列呢?
首先既然是队列,那么底层肯定是一个队列实现的,我们规定大小,通过int类型来计数判断,就能知道这个队列是满的还是空的。所以说到底,我们要实现的接口很简单,就是实现这个队列的出和进,也就是equeue与pop。
那么怎么实现呢?
首先我们要明白,这个队列的进和出是代表着生产者与消费者的临界区,因为出和进都会改变这个队列里的资源?
这些资源包括哪些呢?
我们的底层队列是不是要有?
我们互斥锁是不是要有?
我们的生产者与消费者,他们需不需要条件变量?这个条件变量需要几个呢?
需要,并且通常情况下,我们需要两个条件变量。
大家可以理解条件变量是一个人的身份证明,我有了你的,我才能知道你,你有了我的,你才能知道我。
等待的生产者线程数量,等待的消费者线程数量,这些都可以被充当为阻塞队列的变量。
所以我们就先写:
#pragma once#include #include #include #include \"mutex.hpp\"#include \"Cond.hpp\"namespace BlockQueueModule{ static const int gcap = 10; using namespace MutexModule; using namespace CondModule; template class BlockQueue { private: bool IsFull() { return _queue.size() == _cap; } bool IsEmpty() { return _queue.empty(); } public: BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0) { } void Equeue(const T &in) // 生产者 { } void Pop(T *out) // 消费者 { } ~BlockQueue() { } private: std::queue _queue; // 临界区资源 Mutex _mutex; // 互斥锁 int _cap; // bq最大容量 Cond _productor_cond; // 生产者条件变量 Cond _consumer_cond; // 消费者条件变量 int _cwait_num; // 等待的消费者数目 int _pwait_num; // 等待的生产者数目 };}
这个代码就是我们阻塞队列最基本的框架。
那么框架写出来了,我们接下来要做的事就是完善我们的进出接口:
这个进的逻辑很简单,首先我们是不是要加锁?
加了锁之后,如果我们想插入,是不是要先判断是否为满?如果满了,我们就要执行我们的生产者等待逻辑。如果没满,我们就插入,随后我们检测是否该唤醒消费者。此时我们记录的等待的消费者数量就有了。
我们可以通过条件语句来唤醒不同数量的消费者,我这里就简单一点默认只唤醒一个。
这样,一个进的逻辑就写好了。同样的,出的逻辑也是一样:
#pragma once#include #include #include #include \"mutex.hpp\"#include \"Cond.hpp\"namespace BlockQueueModule{ static const int gcap = 10; using namespace MutexModule; using namespace CondModule; template class BlockQueue { private: bool IsFull() { return _queue.size() == _cap; } bool IsEmpty() { return _queue.empty(); } public: BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0) { } void Equeue(const T &in) // 生产者 { LockGuard lockguard(_mutex); // 先判断是否满了,如果满了,就等待 while (IsFull()) { printf(\"生产者进入等待\\n\"); _pwait_num++; // 等待的生产者数量增加 _productor_cond.Wait(_mutex); _pwait_num--; printf(\"生产者退出等待\\n\"); } // 走到这里,一定是不满的,所以我们就可以执行插入 _queue.push(in); // 此时我们判断,是否有消费者还在等待,如果是的话,就可以唤醒消费者了 if (_cwait_num > 0) { printf(\"唤醒一个消费者\\n\"); _consumer_cond.Signal(); } } void Pop(T *out) // 消费者 { LockGuard lockguard(_mutex); // 先判断是否满了,如果满了,就等待 while (IsEmpty()) { printf(\"消费者进入等待\\n\"); _cwait_num++; // 等待的消费者数量增加 _consumer_cond.Wait(_mutex); _cwait_num--; printf(\"消费者退出等待\\n\"); } // 走到这里,一定是不满的,所以我们就可以执行插入 *out = _queue.front(); _queue.pop(); // 此时我们判断,是否有消费者还在等待,如果是的话,就可以唤醒消费者了 if (_pwait_num > 0) { printf(\"唤醒一个生产者\\n\"); _productor_cond.Signal(); } } ~BlockQueue() { } private: std::queue _queue; // 临界区资源 Mutex _mutex; // 互斥锁 int _cap; // bq最大容量 Cond _productor_cond; // 生产者条件变量 Cond _consumer_cond; // 消费者条件变量 int _cwait_num; // 等待的消费者数目 int _pwait_num; // 等待的生产者数目 };}
我们可以写一个测试代码:
#include #include #include #include\"Mythread.hpp\"#include\"mutex.hpp\"#include\" BlockQueue.hpp\"using namespace BlockQueueModule;void *Consumer(void *args){ BlockQueue *bq = static_cast<BlockQueue *>(args); while(true) { int data; // 1. 从bq拿到数据 bq->Pop(&data); // 2.做处理 printf(\"Consumer, 消费了一个数据: %d\\n\", data); }}void *Productor(void *args){ BlockQueue *bq = static_cast<BlockQueue *>(args); int data = 10; while (true) { sleep(2); // 1. 从外部获取数据 // data = 10; // 有数据??? // 2. 生产到bq中 bq->Equeue(data); printf(\"producter 生产了一个数据: %d\\n\", data); data++; }}int main(){ // 交易场所,不仅仅可以用来进行传递数据 // 传递任务!!!v1: 对象 v2 BlockQueue *bq = new BlockQueue(5); // 共享资源 -> 临界资源 // 单生产,单消费 pthread_t c1, p1; //,c2, , p2, p3; pthread_create(&c1, nullptr, Consumer, bq); // pthread_create(&c2, nullptr, Consumer, bq); pthread_create(&p1, nullptr, Productor, bq); // pthread_create(&p2, nullptr, Productor, bq); // pthread_create(&p3, nullptr, Productor, bq); pthread_join(c1, nullptr); // pthread_join(c2, nullptr); pthread_join(p1, nullptr); // pthread_join(p2, nullptr); // pthread_join(p3, nullptr); delete bq; return 0;}
运行结果为:
正确
总结:
我们今天完成的线程同步的有关知识,希望对大家有所帮助!!