c/c++ linux后台开发 3.2.2无锁队列
作者:互联网
无锁队列使用场景
至少每秒十几万读写才考虑使用,否则没有意义
循环数组无锁队列实现
这个队列结构简单,有大小上限,可以适用于多写多读的情况
接口
bool enqueue(const ELEM_T &a_data); // ⼊队列
bool dequeue(ELEM_T &a_data); // 出队列 bool
try_dequeue(ELEM_T &a_data); // 尝试⼊队列
成员
ELEM_T m_thequeue[Q_SIZE];
volatile QUEUE_INT m_count; // 队列的元素格式
volatile QUEUE_INT m_writeIndex;//新元素⼊列时存放位置在数组中的下标
volatile QUEUE_INT m_readIndex;//第一个有效数据
volatile QUEUE_INT m_maximumReadIndex; //指向第一个不能读取的数据(可能无效,可能正在写入)
//即[m_readIndex, m_maximumReadIndex)是可读的数据
inline QUEUE_INT countToIndex(QUEUE_INT a_count);
队列已满判断:(m_writeIndex + 1) %Q_SIZE == m_readIndex
队列为空判断:m_readIndex == m_maximumReadIndex
入队
bool enqueue(&data)
{
do {
curWriteIndex = m_writeIndex;
curReadIndex = m_readIndex;
if(ModQ(curWriteIndex + 1) == ModQ(curReadIndex)) {
reutrn false; //队列已满
}
} while (!CAS(&m_writeIndex, curWriteIndex, curWriteIndex+1); //尝试获取一个curWriteIndex写入数据
//写入数据
m_thequeue[ModQ(curWriteIndex)] = data;
while(!CAS(&m_maximumReadIndex, curWriteIndex, (curWriteIndex + 1))) {
//所有写入数据的线程更新m_maximumReadIndex的顺序要和他们获取curWriteIndex的顺序一致
sched_yield();
}
AtomicAdd(&m_count, 1);
return true;
}
出队
while(1) {
if (queue empty)
return false;
data = m_thequeue[ModQ(curReadIndex)];//先读数据,再增加index
if (CAS(&m_readIndex, curReadIndex, curReadIndex+1)) {
AtomicSub(&m_count, 1);
return true;
}
}
链表队列的实现
如果一个数据一个node,则需要频繁分配内存,分配内存时库里面要加锁,所以不能实现真正的无锁并发。所以我们需要把多个数据从到一个node中,减少内存分配与释放,
并且,node中所有数据被读取后,我们不要急着释放,可以先把node暂存一下,下一次需要新node的时候再循环利用,减少不必要的开销。(队列中数据的总数短时间内大致是不变的)
以下这个队列只支持一写一读的场景,效率较高。
这个队列的实现出自ZMQ,
数据结构分两层,一层只是一个不具备线程安全的队列(yqueue),具有如下结构
begin, back, end的意义和stl相同
(chunk, pos)可以看成一个二维坐标,
(begin_chunk, begin_pos) 是第一个有效元素的位置,(back_chunk, back_pos)是最后一个有效元素位置,(end_chunk, end_pos)是back的后一个位置,指向第一个无效元素。
//init
queue.push();
//enqueue
queue.back() = value_;
queue.push();
//dequeue
data = queue.front();
queue.pop();
数据结构的第二层是ypipe,为yqueue提供了线程安全的功能,使用CAS实现
ypipe内部有四个指针
// Points to the first un-flushed item. This variable is used
// exclusively by writer thread.
T *w; //指向第一个未刷新的元素,只被写线程使用
// Points to the first un-prefetched item. This variable is used
// exclusively by reader thread.
T *r; //指向第一个还没预提取的元素,只被读线程使用
// Points to the first item to be flushed in the future.
T *f; //指向下一轮要被刷新的一批元素中的第一个
// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
// atomic operations.
atomic_ptr_t<T> c; //读写线程共享的指针,指向每一轮刷新的起点(看代码的时候会详细说)。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)
front() <= r <= c == w <= f <= back() (注意,<=只是逻辑上的,因为有mod Q的存在,实际会不同)
其中c是atomic变量,而且可能是NULL,当c是NULL时表示读端正在睡眠,需要写端唤醒
[front(), r)是可读取的元素(已经预读取的元素,只有读线程才能访问)
[r, c) 是读端可以读取,但是还未预读取的元素,读端预读取(check_read)会导致r = c
c通常情况下等于w,除非读端在睡眠时,会把c设成NULL
[w, f) (已被写入,已经commit,但还未被flush的元素)
当调用flush时,[w, f)之间的元素会被刷新,刷新之后对读端可见
flush意味着 c = w = f
[f, back()] (已被写入但还未commit的元素,也没有被刷新,这些元素只有写线程能访问,而且可以被撤销)当write(data, complete=true)时会f = back()
重点基本就这些了,具体的CAS操作可以看源码,这里略过了
参考
零声教育 3.2.2无锁队列
标签:无锁,队列,data,元素,back,c++,线程,3.2,curWriteIndex 来源: https://blog.csdn.net/jsc723/article/details/122020138