编程语言
首页 > 编程语言> > 546-C++线程间的同步通信(生产者-消费者模型)

546-C++线程间的同步通信(生产者-消费者模型)

作者:互联网

多线程编程两个问题

1.线程间的互斥
多线程运行时存在竟态条件 =》 临界区代码段 =》 保证其原子操作 =》添加互斥锁mutex
轻量级的无锁实现CAS

在linux中执行 strace ./a.out mutex命令跟踪程序
=>发现底层是用 pthread_mutex_t

2.线程间的同步通信
(线程间不通信的话,每个线程受CPU的调度,没有任何执行上的顺序可言,线程1和线程2是根据CPU调度算法来的,两个线程都有可能先运行,是不确定的,线程间的运行顺序是不确定的,所以多线程程序出问题,难以复现,因为谁也不知道当时线程执行的先后顺序,我们一般可以得到每个线程的线程栈信息来分析是否发生死锁的问题之类的。我们要保证线程间的运行顺序)
通信就是:线程1和线程2一起运行,线程2要做的事情必须先依赖于线程1完成部分的事情,然后告诉线程2这部分东西做好了,线程2就可以继续向下执行了。或者是线程1接下来要做某些操作,这些操作需要线程2把另外一部分事情做完,然后通知一下线程1它做完了,然后线程1才能做这些操作)
生产者,消费者线程模型

生产者-消费者线程模型

生产者生产一个物品,通知消费者消费一个物品;消费者消费完了,消费者再通知生产者继续生产物品
在这里插入图片描述

#include <iostream>
#include <thread>//多线程的头文件 
#include <mutex>//互斥锁的头文件 
#include <condition_variable>//条件变量的头文件 
#include <queue>//C++ STL所有的容器都不是线程安全
using namespace std;

std::mutex mtx;//定义互斥锁,做线程间的互斥操作
std::condition_variable cv;//定义条件变量,做线程间的同步通信操作

//生产者生产一个物品,通知消费者消费一个;消费完了,消费者再通知生产者继续生产物品
class Queue
{
public:
	void put(int val)//生产物品
	{
		unique_lock<std::mutex> lck(mtx);//unique_ptr
		while (!que.empty())
		{
			//que不为空,生产者应该通知消费者去消费,消费者消费完了,生产者再继续生产
			//生产者线程进入#1等待状态,并且#2把mtx互斥锁释放掉
			cv.wait(lck);//当前线程挂起,处于等待状态,并且释放当前锁 lck.lock()  lck.unlock
		}
		que.push(val);
		/* 
		notify_one:通知唤醒另外的一个线程的 
		notify_all:通知唤醒其它所有线程的
		通知其它所有的线程,我生产了一个物品,你们赶紧消费吧
		其它线程得到该通知,就会从等待状态 =》 到阻塞状态 =》 但是要获取互斥锁才能继续向下执行
		*/
		cv.notify_all(); 
		cout << "生产者 生产:" << val << "号物品" << endl;
	}
	int get()//消费物品
	{
		//lock_guard<std::mutex> guard(mtx);//相当于scoped_ptr
		unique_lock<std::mutex> lck(mtx);//相当于unique_ptr 更安全 
		while (que.empty())
		{
			//消费者线程发现que是空的,通知生产者线程先生产物品
			//#1 挂起,进入等待状态 #2 把互斥锁mutex释放
			cv.wait(lck);
		}//如果其他线程执行notify了,当前线程就会从等待状态 =》到阻塞状态 =》但是要获取互斥锁才能继续向下执行 
		int val = que.front();
		que.pop();
		cv.notify_all();//通知其它线程我消费完了,赶紧生产吧
		cout << "消费者 消费:" << val << "号物品" << endl;
		return val;
	}
private:
	queue<int> que;
};

void producer(Queue *que)//生产者线程
{
	for (int i = 1; i <= 10; ++i)
	{
		que->put(i);
		std::this_thread::sleep_for(std::chrono::milliseconds(100));//睡眠100毫秒 
	}
}
void consumer(Queue *que)//消费者线程
{
	for (int i = 1; i <= 10; ++i)
	{
		que->get();
		std::this_thread::sleep_for(std::chrono::milliseconds(100));//睡眠100毫秒 
	}
}
int main()
{
	Queue que;//两个线程共享的队列

	std::thread t1(producer, &que);//开启生产者线程 
	std::thread t2(consumer, &que);//开启消费者线程 

	t1.join();
	t2.join();

	return 0;
}

在这里插入图片描述

标签:std,线程,消费者,生产者,C++,互斥,546,que
来源: https://blog.csdn.net/LINZEYU666/article/details/120242144