Linux-同步-条件变量接-生产者消费者模型

1. 同步

  1.作用:让多个执行流在访问临界资源的时候是合理访问的。

2. 条件变量

2.1 本质

  一个PCB等待队列,条件变量 = PCB等待队列 + 一堆接口
PCB等待队列: 当线程发现资源不可用的时候,调用条件变量接口,将自己放在PCB等待队列,等待被唤醒。

2.2 条件变量接口

条件变量的类型: pthread_cond_t

2.2.1 初始化

静态初始化:
Linux-同步-条件变量接-生产者消费者模型
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

动态初始化:
Linux-同步-条件变量接-生产者消费者模型
  int pthread_cond_t init(pthread_cond_t* cond, const pthread_condattr_t *attr);
cond: 待要初始化的“条件变量”的变量
  一般情况下,传递一个pthread_cond_t类型变量的地址
attr: 一般情况下给NULL,采用默认属性

2.2.2 等待接口

Linux-同步-条件变量接-生产者消费者模型
  int pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t *mutex);
cond: 条件变量
mutex: 互斥锁
作用: 如果一个执行流调用了该接口,就会将执行流对应的PCB放到参数cond的PCB等待队列当中。

2.2.3 唤醒接口

Linux-同步-条件变量接-生产者消费者模型
  int pthread_cond_signal(pthread_cond_t *cond);
作用: 通知(唤醒)PCB等待队列当中的线程,如果被通知(唤醒)的线程收到了,则从PCB等待队列当中出队操作,正常执行代码。
注意: 至少唤醒一个PCB等待队列当中的线程。

Linux-同步-条件变量接-生产者消费者模型
  int pthread_cond_broadcast(pthread_cond_t *cond);
注意: 唤醒所有PCB等待队列当中的线程

2.2.4 销毁接口

Linux-同步-条件变量接-生产者消费者模型
  int pthread_cond_destroy(pthread_cond_t *cond);

3. 生产者与消费者模型

3.1 123规则

1.一个线程安全的队列
  队列:先进先出
  线程安全:当前这个队列在被其他线程操作的时候出队和入队是保证原子性的
2.两种角色的线程
  消费者线程:从线程安全队列当中获取元素,进行处理
  生产者线程:生产元素放到线程安全的队列当中进行处理
3.三种关系
  消费者与消费者互斥,生产者和生产者互斥,消费者与消费者互斥+同步
Linux-同步-条件变量接-生产者消费者模型

3.2 作用

  1.支持忙闲不均,可以提高程序运行效率。
  2.队列提供了一个缓冲区的作用,可以缓冲待要处理的元素。

4. 面试题

4.1 条件变量的等待接口为什么需要互斥锁?

  传递互斥锁的原因是由于需要在pthread_cond_wait函数内部进行解锁,解锁之后,其他的执行流就能获得这把互斥锁,否则,如果在调用pthread_cond_wait的线程,在进行等待的时候,不释放互斥锁,那么其他线程也不会获取到互斥锁,程序就没办法继续向前了。

4.2 在调用该接口的时候,pthread_cond_wait函数的实现逻辑?

  1.放在PCB等待队列
  2.释放互斥锁
  3.等待被唤醒

4.3 如果一个线程在等待的时候,被唤醒之后,需要做什么事情?

  1.移动出PCB等待队列再进行抢锁
  2.抢互斥锁
    抢到了:pthread_cond_wait函数返回了。
    没抢到:pthread_cond_wait函数没有返回,等待抢锁。

5. 代码实现

#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h>  #define THREAD_NUM 4  int g_bowl = 1; pthread_mutex_t g_lock; //eat pthread_cond_t g_cond; //make pthread_cond_t g_make_cond;  //eat void* MyThreadEat(void* arg) {     while(1)     {         pthread_mutex_lock(&g_lock);         //判断是否能够吃         while(g_bowl == 0)         {             //等待             pthread_cond_wait(&g_cond, &g_lock);         }         printf("i eat %d, i am %pn", g_bowl, pthread_self());         g_bowl--;         pthread_mutex_unlock(&g_lock);         //通知做面         pthread_cond_signal(&g_make_cond);     }     return NULL; }  void* MyThreadMake(void* arg) {     while(1)     {         pthread_mutex_lock(&g_lock);         //判断是否能够继续做面         while(g_bowl == 1)         {             //等待             pthread_cond_wait(&g_make_cond, &g_lock);         }         printf("i make %d, i am %pn", g_bowl, pthread_self());         g_bowl++;         pthread_mutex_unlock(&g_lock);         //通知吃面         pthread_cond_signal(&g_cond);     }     return NULL; }  int main() {    pthread_mutex_init(&g_lock, NULL);    pthread_cond_init(&g_cond, NULL);    pthread_cond_init(&g_make_cond, NULL);    pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];    for(int i = 0; i < THREAD_NUM; i++)    {        int ret = pthread_create(&tid_A[i], NULL, MyThreadEat, NULL);        if(ret < 0)        {            perror("pthread_create");            exit(0);        }        ret = pthread_create(&tid_B[i], NULL, MyThreadMake, NULL);        if(ret < 0)        {            perror("pthread_create");            exit(0);        }    }    for(int i = 0; i < THREAD_NUM; i++)    {        pthread_join(tid_A[i], NULL);        pthread_join(tid_B[i], NULL);    }     pthread_mutex_destroy(&g_lock);    pthread_cond_destroy(&g_cond);    pthread_cond_destroy(&g_make_cond);     return 0; } 

  

#include <stdio.h> #include <unistd.h> #include <pthread.h>  #include <iostream> #include <queue>  using namespace std;  #define THREAD_NUM 1  class RingQueue {     public:         RingQueue()         {             //设置容量             capacity_ = 10;             //互斥锁初始化             pthread_mutex_init(&lock_, NULL);             //生产者的条件变量初始化             pthread_cond_init(&cons_cond_, NULL);             //消费者的条件变量初始化             pthread_cond_init(&prod_cond_, NULL);         }          ~RingQueue()         {             //销毁互斥锁资源             pthread_mutex_destroy(&lock_);             //销毁生产者条件变量             pthread_cond_destroy(&cons_cond_);             //销毁消费者条件变量             pthread_cond_destroy(&prod_cond_);         }          void Push(int data)         {             //加锁             pthread_mutex_lock(&lock_);             //队列中元素个数大于等于容量             //不能再继续插入,将生产线程放入PCB等待队列             while(que_.size() >= capacity_)             {                 pthread_cond_wait(&prod_cond_, &lock_);             }             //插入数据,入队             que_.push(data);             printf("i product %d, i am %pn", data, pthread_self());             //解锁             pthread_mutex_unlock(&lock_);             //唤醒PCB等待队列的消费者             pthread_cond_signal(&cons_cond_);         }          void Pop(int* data)         {             //加锁             pthread_mutex_lock(&lock_);             //判断队列中的是否还有元素             //如果没有消费者进入PCB等待队列             while(que_.empty())             {                 pthread_cond_wait(&cons_cond_, &lock_);             }             //将数据传递给出参             *data = que_.front();             printf("i consume %d, i am %pn", data, pthread_self());             //将数据出队             que_.pop();             //解锁             pthread_mutex_unlock(&lock_);             //唤醒PCB等待队列的生产者             pthread_cond_signal(&prod_cond_);         }     private:         //模板 实例化一个int类型的队列         queue<int> que_;         //设置队列大小         size_t capacity_;          //定义一个锁资源         pthread_mutex_t lock_;         //定义一个生产者的条件变量         pthread_cond_t cons_cond_;         //定义一个消费者的条件变量         pthread_cond_t prod_cond_;          std::vector<int> vec_; };  void* ConsumeStart(void* arg) {     RingQueue* rq = (RingQueue*)arg;     while(1)     {         int data;         rq->Pop(&data);     }     return NULL; }  int data = 1; pthread_mutex_t g_data_lock;  void* ProductStart(void* arg) {     RingQueue* rq = (RingQueue*)arg;     while(1)     {         //这块需要加锁的原因是data是临界资源         pthread_mutex_lock(&g_data_lock);         rq->Push(data++);         pthread_mutex_unlock(&g_data_lock);     }     return NULL; }  int main() {     pthread_mutex_init(&g_data_lock, NULL);     RingQueue* rq = new RingQueue();     if(rq == NULL)     {         return 0;     }      pthread_t cons[THREAD_NUM];     pthread_t prod[THREAD_NUM];      for(int i = 0; i < THREAD_NUM; i++)     {         int ret = pthread_create(&cons[i], NULL, ConsumeStart, (void*)rq);         if(ret < 0)         {             perror("pthread_create fail");             return 0;         }          ret = pthread_create(&prod[i], NULL, ProductStart, (void*)rq);         if(ret < 0)         {             perror("pthread_create fail");             return 0;         }     }      for(int i = 0; i < THREAD_NUM; i++)     {         pthread_join(cons[i], NULL);         pthread_join(prod[i], NULL);     }      delete rq;     pthread_mutex_destroy(&g_data_lock);      return 0; } 

版权声明:玥玥 发表于 2021-05-15 0:17:28。
转载请注明:Linux-同步-条件变量接-生产者消费者模型 | 女黑客导航