系统相关
首页 > 系统相关> > c/c++ linux后台开发 3.2.2无锁队列

c/c++ linux后台开发 3.2.2无锁队列

作者:互联网

无锁队列使用场景

至少每秒十几万读写才考虑使用,否则没有意义

循环数组无锁队列实现

这个队列结构简单,有大小上限,可以适用于多写多读的情况
接口

bool enqueue(const ELEM_T &a_data); // ⼊队列 
bool dequeue(ELEM_T &a_data); // 出队列 bool 
try_dequeue(ELEM_T &a_data); // 尝试⼊队列

成员

ELEM_T m_thequeue[Q_SIZE]; 
volatile QUEUE_INT m_count; // 队列的元素格式 
volatile QUEUE_INT m_writeIndex;//新元素⼊列时存放位置在数组中的下标 
volatile QUEUE_INT m_readIndex;//第一个有效数据
volatile QUEUE_INT m_maximumReadIndex; //指向第一个不能读取的数据(可能无效,可能正在写入)
//即[m_readIndex, m_maximumReadIndex)是可读的数据
inline QUEUE_INT countToIndex(QUEUE_INT a_count);

队列已满判断:(m_writeIndex + 1) %Q_SIZE == m_readIndex

队列为空判断:m_readIndex == m_maximumReadIndex

入队

bool enqueue(&data)
{
	do {
		curWriteIndex = m_writeIndex;
		curReadIndex = m_readIndex;
		if(ModQ(curWriteIndex + 1) == ModQ(curReadIndex)) {
			reutrn false; //队列已满
		}
	} while (!CAS(&m_writeIndex, curWriteIndex, curWriteIndex+1); //尝试获取一个curWriteIndex写入数据
	//写入数据
	m_thequeue[ModQ(curWriteIndex)] = data;
	while(!CAS(&m_maximumReadIndex, curWriteIndex, (curWriteIndex + 1))) {
		//所有写入数据的线程更新m_maximumReadIndex的顺序要和他们获取curWriteIndex的顺序一致
		sched_yield();
	}
    AtomicAdd(&m_count, 1); 
    return true; 
} 
	

出队

while(1) {
	if (queue empty)
		return false;
	data = m_thequeue[ModQ(curReadIndex)];//先读数据,再增加index
	if (CAS(&m_readIndex, curReadIndex, curReadIndex+1)) {
		AtomicSub(&m_count, 1);
		return true;
	}
}

链表队列的实现

如果一个数据一个node,则需要频繁分配内存,分配内存时库里面要加锁,所以不能实现真正的无锁并发。所以我们需要把多个数据从到一个node中,减少内存分配与释放,
并且,node中所有数据被读取后,我们不要急着释放,可以先把node暂存一下,下一次需要新node的时候再循环利用,减少不必要的开销。(队列中数据的总数短时间内大致是不变的)
以下这个队列只支持一写一读的场景,效率较高。

这个队列的实现出自ZMQ,
数据结构分两层,一层只是一个不具备线程安全的队列(yqueue),具有如下结构
在这里插入图片描述
begin, back, end的意义和stl相同
(chunk, pos)可以看成一个二维坐标,
(begin_chunk, begin_pos) 是第一个有效元素的位置,(back_chunk, back_pos)是最后一个有效元素位置,(end_chunk, end_pos)是back的后一个位置,指向第一个无效元素。

//init
        queue.push();
//enqueue
        queue.back() = value_;
        queue.push();
//dequeue
		data = queue.front();
		queue.pop();

数据结构的第二层是ypipe,为yqueue提供了线程安全的功能,使用CAS实现
ypipe内部有四个指针

//  Points to the first un-flushed item. This variable is used
    //  exclusively by writer thread.
    T *w; //指向第一个未刷新的元素,只被写线程使用

    //  Points to the first un-prefetched item. This variable is used
    //  exclusively by reader thread.
    T *r; //指向第一个还没预提取的元素,只被读线程使用

    //  Points to the first item to be flushed in the future.
    T *f; //指向下一轮要被刷新的一批元素中的第一个

    //  The single point of contention between writer and reader thread.
    //  Points past the last flushed item. If it is NULL,
    //  reader is asleep. This pointer should be always accessed using
    //  atomic operations.
    atomic_ptr_t<T> c; //读写线程共享的指针,指向每一轮刷新的起点(看代码的时候会详细说)。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)

front() <= r <= c == w <= f <= back() (注意,<=只是逻辑上的,因为有mod Q的存在,实际会不同)
其中c是atomic变量,而且可能是NULL,当c是NULL时表示读端正在睡眠,需要写端唤醒

[front(), r)是可读取的元素(已经预读取的元素,只有读线程才能访问)

[r, c) 是读端可以读取,但是还未预读取的元素,读端预读取(check_read)会导致r = c

c通常情况下等于w,除非读端在睡眠时,会把c设成NULL

[w, f) (已被写入,已经commit,但还未被flush的元素)
当调用flush时,[w, f)之间的元素会被刷新,刷新之后对读端可见
flush意味着 c = w = f

[f, back()] (已被写入但还未commit的元素,也没有被刷新,这些元素只有写线程能访问,而且可以被撤销)当write(data, complete=true)时会f = back()

重点基本就这些了,具体的CAS操作可以看源码,这里略过了

参考

零声教育 3.2.2无锁队列

标签:无锁,队列,data,元素,back,c++,线程,3.2,curWriteIndex
来源: https://blog.csdn.net/jsc723/article/details/122020138