linux多线程(2)----线程安全(互斥锁,条件变量,信号量)+生产者与消费者模型实现

1.线程安区

概念:线程安区描述的是在线程中对于一个临界资源的操作访问是否安全的。
如何实现线程安区:同步与互斥。

1.1互斥实现–>互斥锁

线程互斥:指的是在多个线程间对临界资源进行争抢访问时有可能会造成数据二义,因此通过保证同一时间只有一个线程能够访问临界资源的方式实现线程对临界资源的访问安全性
如何实现:互斥锁。

互斥锁---->本质上是一个只有0/1的计数器,标记临界资源的两种访问状态,在访问之前加锁,这样别的线程再要访问时会先判断发现不满足访问条件则阻塞,访问完资源之后进行解锁。
需要注意的是:互斥锁本身就是一种临界资源,需要多个线程共同访问一个互斥锁才能实现互斥,而互斥锁本身的计数操作应该为原子操作。

互斥锁的实现流程和接口

  1. 定义互斥锁变量:
pthread_mutex_t mutex; //定义一个互斥锁mutex 
  1. 初始化互斥锁:
    法一:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    或者调用函数
#include <pthread.h> int pthread_mutex_init(pthread_mutex_t * mutex,               const pthread_mutexattr_t *attr); 

参数:mutex为互斥锁,attr传NULL即可。

  1. 访问临界资源前加锁:
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); 

pthread_mutex_lock();阻塞加锁,锁被别人加了自己则会一直等待直到能加锁。
pthread_mutex_trylock();非阻塞加锁,如果已经被加锁了则会报错。

  1. 访问完毕资源之后解锁:
int pthread_mutex_unlock(pthread_mutex_t *mutex); 
  1. 销毁互斥锁:
int pthread_mutex_destroy(pthread_mutex_t *mutex); 

注意事项:

  1. 加锁之后,在任意可能退出线程之前的位置都应该解锁。
  2. 加锁尽量只保护临界资源的操作。

1.1.2 死锁

死锁指的是程序流程无法继续运行,卡死的状态。
死锁是如何产生的呢?有四个必要条件:

  1. 互斥条件—>线程X加了锁,别的线程就不能再加了。
  2. 不可剥夺条件---->线程X加的锁只有线程X能解锁,别的进程无法解锁。
  3. 请求与保持条件----->线程X加了锁A,请求加锁B,请求不到就不释放锁A;
  4. 环路等待条件---->线程X加了锁A,请求加锁B,但线程Y加了锁B,请求加锁A。配合上第三个条件就会产生这样的效果:线程X加不到锁B还不释放锁A,线程Y加不到锁A还不释放锁B。程序卡死。

以上这四个条件加起来就会产生死锁。
那么该如何预防死锁的产生? 只要破坏上面的四个必要条件即可,但是1和2条件是锁自带的属性无法破坏,可以考虑破坏3或4条件。

  1. 保存多方加锁顺序一致可避免环路等待条件的产生。
  2. 采用非阻塞加锁,枷锁失败则释放已有的锁。
  3. 避免产生死锁的算法:银行家算法等

1.1.3 练习-黄牛抢票

火车站有1000张票;有四个黄牛党抢票–有票则抢,没票退出(对抢票的过程加锁,同一时间只能有一个黄牛抢到票)

#include <unistd.h> #include <stdio.h> #include <pthread.h> #include <utime.h>  int tickets=1000; pthread_mutex_t mutex;  //定义互斥锁(公共资源),都能访问到  void* scalpers(void *arg ){ //进程入口函数   while(1){     pthread_mutex_lock(&mutex);  //抢票前加锁,保证抢票过程中不会有别的黄牛来抢票(原子操作)     if(tickets>0){       usleep(5);       tickets--;       printf("抢到一张票%p,剩余总票数为:%dn",pthread_self(),tickets);       pthread_mutex_unlock(&mutex);//解锁     }     else{       pthread_mutex_unlock(&mutex);  //可能退出的地方都要解锁       printf("票已经无了!n");       pthread_exit(NULL);  //无票则退出线程。     }     usleep(10);   }   return NULL;  }   int main() {   int ret ;   pthread_t tid[4];   pthread_mutex_init(&mutex,NULL);  //初始化互斥锁   for(int i=0;i<4;i++){   //创建四个黄牛进程,进程入口函数(抢票)相同     ret = pthread_create(&tid[i],NULL,scalpers,NULL);     if(ret!=0){       perror("pthread_creat errorn");       return -1;     }   }    for(int i=0;i<4;i++){  //线程等待     pthread_join(tid[i],NULL);   }    pthread_mutex_destroy(&mutex);  //摧毁互斥锁   return 0; }  
linux多线程(2)----线程安全(互斥锁,条件变量,信号量)+生产者与消费者模型实现

1.2同步实现–>条件变量

通过条件变量可实现进程的同步,条件变量提供了一个pcb等待队列(临界资源,需要加锁保护),和使线程阻塞和唤醒的函数接口。
程序员在程序中进行判断当前进程获取资源是否合理,不合理则调用阻塞接口使线程阻塞(加入pcb等待队列)。其他线程当条件满足之后调用唤醒函数唤醒阻塞的线程。
条件变量一般是和互斥锁一起使用的。

操作流程及接口:

  1. 定义条件变量
pthread_cond_t cond;     //定义一个名为cond的条件变量 
  1. 初始化条件变量

法一(记不住,不推荐):

pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 

法二:调用函数

#include <pthread.h> int pthread_cond_init(pthread_cond_t *restrict cond,               const pthread_condattr_t *restrict attr);        

参数:
restrict cond:定义的条件变量
restrict attr:条件变量的属性,传NULL即可。

  1. 阻塞线程(获取资源条件不满足时)

法一:

int pthread_cond_wait(pthread_cond_t *restrict cond,               pthread_mutex_t *restrict mutex); 

参数:
restrict cond:定义的条件变量
restrict mutex:互斥锁
功能:永久阻塞线程,直到被唤醒。
这里的阻塞操作可分解为{解锁,休眠}和被唤醒后加锁这三个操作,这个函数保证了{解锁,休眠}这两个操作为原子操作。举个反例:线程A运行到解锁这里还未休眠被线程B的操作打断,线程B操作了一通之后唤醒线程A,但是线程A不在pcb等待队列,时间片切换到线程A执行下一步操作休眠,休眠之后,不会再有线程B再次唤醒线程A,线程A永久休眠。

法二:

 int pthread_cond_timedwait(pthread_cond_t *restrict cond,               pthread_mutex_t *restrict mutex,               const struct timespec *restrict abstime); 

参数restrict abstime:时间
功能:阻塞线程一段时间,这段时间内若线程还未被唤醒,则报错返回。

  1. 唤醒被阻塞的线程

法一:

int pthread_cond_broadcast(pthread_cond_t *cond); 

功能:唤醒所有被阻塞的线程

法二:

   int pthread_cond_signal(pthread_cond_t *cond)

功能:唤醒**至少一个(也可能多个)**被阻塞的线程

  1. 摧毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond); 

注意事项:

  1. 程序员在对线程获取资源条件是否满足的判断需要用循环(while)来判断。
  2. 有多种角色线程时,需要使用多个条件变量(使用多个等待队列分开等待,分开唤醒,避免唤醒错了)。多个角色线程如果使用同一个条件变量(一个等待队列),由于唤醒操作是唤醒队列上至少一个线程,可能唤醒错了线程。

1.3 信号量(posix标准)

通过信号量也可实现同步和互斥。
信号量本质:计数器+ pcb等待队列
作用:实现进程或线程间的同步和互斥。

操作:

P操作:计数-1,判读计数是否大于1,不大于则阻塞进程或线程
V操作:计数+1,唤醒一个阻塞的线程或进程。

实现同步:通过计数器对资源进行计数。获取资源前先访问信号量进行P操作,产生资源之后进行V操作。
互斥的实现:访问资源前进行P操作,访问后V操作.

使用流程及接口:

  1. 定义信号量:
 sem_t sem;  //定义一个信号量,后面传参用 
  1. 初始化信号:
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value); 

参数:
sem:信号量
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值。(有多少资源设置为多少)

  1. P操作:
#include <semaphore.h> int sem_wait(sem_t *sem); 

功能:等待信号量,会将信号量的值减1,小于0则阻塞。

 int sem_trywait(sem_t *sem); 

功能: 非阻塞等待信号量,信号量的值减1。

 int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout); 

功能:限制等待时长的阻塞塞等待信号量,信号量的值减1。

  1. V操作
int sem_post(sem_t *sem); 

功能:发送信号,表示资源使用完毕,可以归还资源了。将信号量值加1。

  1. 销毁信号量
int sem_destroy(sem_t *sem); 

2 生产者与消费者模型及实现

这是一种典型的设计模式,应用于大量数据的产生和处理的场景。

linux多线程(2)----线程安全(互斥锁,条件变量,信号量)+生产者与消费者模型实现

2.1 用条件变量和互斥锁实现生产者与消费者模型

分析:生产者与消费者就是两种角色的线程,需要将产生的数据存入到一个缓冲区,且对这块缓冲区的存入和取数据必须为安全的操作–>原子操作(同步和互斥实现),针对这块缓冲区的数据操作应满足先入先出(队列)。故需要设计一个队列,对这个队列的存入和取数据进行加锁保护(互斥),并判断存入和取出数据是否合理(同步),不合理则阻塞并唤醒另一个进程进行操作。

代码:

#include <iostream> #include <unistd.h> #include <pthread.h> #include <queue> #include <stdio.h>  #define  MAX_QUEUE 10  //队列最大容量   class BlockQueue{   public:     BlockQueue(int que_capacity=MAX_QUEUE):_capacity(que_capacity)     {         pthread_mutex_init(&_mutex,NULL);       pthread_cond_init(&_cond_p,NULL);       pthread_cond_init(&_cond_c,NULL);     }      ~BlockQueue(){       pthread_mutex_destroy(&_mutex);       pthread_cond_destroy(&_cond_p);       pthread_cond_destroy(&_cond_c);     }      bool push(const int& data){       pthread_mutex_lock(&_mutex);  //加锁       if(_queue.size()==_capacity){         pthread_cond_wait(&_cond_p,&_mutex);//队列已满则等待条件满足再放数据到队列       }       _queue.push(data);       printf("productor %p push data: %d queue-size: %d n",pthread_self(),data,_queue.size());        //这里打印一下线程ID和队列大小,方便查看结果。       pthread_cond_signal(&_cond_c); // 唤醒       pthread_mutex_unlock(&_mutex);  //解锁       return true;     }      bool pop(int *data){       pthread_mutex_lock(&_mutex);       if(_queue.empty()){         pthread_cond_wait(&_cond_c,&_mutex);//无数据则等待       }       *data = _queue.front();       _queue.pop();       printf("customer %p pop data: %d  queue-size: %d :n",pthread_self(),*data,_queue.size());         pthread_cond_signal(&_cond_p);       pthread_mutex_unlock(&_mutex);       return true;     }    private:     int _capacity;     std::queue<int> _queue;       pthread_cond_t _cond_p; //生产者条件变量     pthread_cond_t _cond_c;  //消费者条件变量     pthread_mutex_t _mutex;  //互斥锁 };    void* productor(void* arg){     BlockQueue *q = (BlockQueue*)arg;   int data=1;   while(1){     q->push(data++);    usleep(1000000);   }   return NULL; }  void* customer(void* arg){   BlockQueue *q = (BlockQueue*)arg;   while(1){    int data=0;//获取pop的数据     q->pop(&data);     usleep(1000000);   }   return NULL; }  int  main () {     BlockQueue q;   int ret ;   pthread_t ptid[4],ctid[4];   for(int i=0;i<4;i++){ //这里创建了四个priductor线程     ret=pthread_create(&ptid[i],NULL,productor,&q);     if(ret!=0){       perror("pthread_create errorn");       return -1;     }   }    for(int i=0;i<4;i++){//创建四个costomer线程     ret=pthread_create(&ctid[i],NULL,customer,&q);     if(ret!=0){       perror("pthread_create errorn");       return -1;     }   }    for(int i=0;i<4;i++){     pthread_join(ctid[i],NULL);     pthread_join(ptid[i],NULL);   }    return 0; } 
linux多线程(2)----线程安全(互斥锁,条件变量,信号量)+生产者与消费者模型实现
linux多线程(2)----线程安全(互斥锁,条件变量,信号量)+生产者与消费者模型实现

2.2 用信号量实现生产者与消费者模型

分析:
队列使用环形队列(线程安区的)。
还应该使用两个信号量:
一个是对空闲节点计数---->生产者放数据。
对于生产者来说空闲节点是资源。
操作流程:

  1. 对空闲节点P操作(如果计数小于0则阻塞,空闲节点-1),
  2. P之后,添加数据前加锁保护添加过程,(P之前加锁会造成死锁)
  3. 向队列添加数据
  4. 添加完解锁
  5. 对数据节点数据进行V操作(多了一个数据节点)

另一个是对数据节点进行计数-> 消费者取数据。
对于消费者来说数据是资源。
操作流程:

  1. 对数据节点进行P操作
  2. 加锁
  3. 数据出队
  4. 解锁
  5. 对空闲节点进行V操作

其中环形队列可用数组取模来模拟:
linux多线程(2)----线程安全(互斥锁,条件变量,信号量)+生产者与消费者模型实现

这是应该定义的成员变量:

Class RingQueue { 	int _capacity;//容量 	int _step_read;//读指针 	int _step_write;//写指针 	std::vector=tor<int> _arry; 	sem_t  _sem_data;//数据节点计数 	sem_t  _sem_idle;//空闲节点计数 	sem_t  _sem_lock;//用于实现互斥 }   

代码实现:

#include <cstdio> #include <unistd.h> #include <vector> #include <string> #include <semaphore.h> #include <pthread.h>  #define  MAX_QUEUE 5  class RingQueue {   public:     RingQueue(int cap = MAX_QUEUE):        _capacity(cap) , _step_read(0),_step_write(0),_arry(cap)   {      sem_init(&_sem_data,0,0);     sem_init(&_sem_idle,0, cap);     sem_init(&_sem_lock,0 ,1);//用于实现互斥,初始信号量为1   }     ~RingQueue(){       sem_destroy(&_sem_data);       sem_destroy(&_sem_lock);       sem_destroy(&_sem_idle);     }      bool Push(const int &data){       sem_wait(&_sem_idle); //对空闲节点P操作,空闲节点-1, 小于0则阻塞       sem_wait(&_sem_lock);//加锁       //放数据       _arry[_step_write] =data;       _step_write=(_step_write+1)% _capacity;//写指针加一取模       sem_post(&_sem_lock); //放完数据解锁,唤醒一个消费者       sem_post(&_sem_data); //对数据节点V操作,数据节点+1       return true;     }     bool Pop(int *buf){       sem_wait(&_sem_data); //数据节点 -1           sem_wait(&_sem_lock); //加锁       *buf= _arry[_step_read];//取数据       _step_read =(_step_read+1)%_capacity;//读指针+1取模       sem_post(&_sem_lock);//解锁       sem_post(&_sem_idle);//空闲节点+1       return true;     }    private:     int _capacity;//容量     int _step_read;//读指针     int _step_write;//写指针     std::vector<int> _arry;     sem_t  _sem_data;//数据节点计数     sem_t  _sem_idle;//空闲节点计数     sem_t  _sem_lock;//用于实现互斥 };  void* productor(void *arg) {   RingQueue*q =(RingQueue*)arg;   int data =0;   while(1){     q->Push(data);     printf("push data : %d n",data++);     sleep(1);   }   return NULL; } void* customer(void *arg) {   RingQueue*q =(RingQueue*)arg;   int data =0;   while(1){     q->Pop(&data);     printf("pop data : %d n", data);     sleep(1);   }   return NULL; }  int main () {   RingQueue q ;   int ret;   pthread_t tid_p[4],tid_c[4];   for(int i=0;i<4;++i){     ret = pthread_create(&tid_p[i],NULL,productor,&q);     if(ret !=0){       perror("thread errorn");     }   }    for(int i=0;i<4;++i){     ret = pthread_create(&tid_c[i],NULL,customer,&q);     if(ret !=0){       perror("thread errorn");     }   }    for(int i=0;i<4;++i){     pthread_join(tid_c[i],NULL);     pthread_join(tid_p[i],NULL);   }    return 0; } 

结果:
linux多线程(2)----线程安全(互斥锁,条件变量,信号量)+生产者与消费者模型实现

如图结果可能会出现连续pop或者push数据,原因是线程入口函数void* productor(void arg)和void customer(void *arg)函数里的和Pop或者push操作和之后的打印操作并非原子操作,可能出现push数据之后还没打印就切换到另一个线程执行pop操作了,下一次push操作时就会多打印一次push data。

条件变量和互斥锁的区别

通过两种不同的生产者与消费者模型的实现可以看出条件变量和互斥锁的区别:

  1. 条件变量要搭配互斥锁使用;信号量的使用不需要搭配互斥锁使用。
  2. 条件变量中获取资源条件是否满足需要程序员自己判断(whilke()循环判断);而信号量的判断通过自身完成,计数操作为原子操作。

为保证并发情况下线程对临界资源的合理安区的访问,除了互斥锁还有很多其他的锁:悲观锁(PCC),乐观锁(CAS锁),可重入锁和不可重入锁,读写锁等;下一篇将介绍这几种锁和实现。