> 文档中心 > 无锁队列的实现

无锁队列的实现

推荐一个零声学院免费公开课程,个人觉得老师讲得不错,分享给大家:Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,立即学习

为什么需要无锁队列

无锁队列解决了什么问题?无锁队列解决了锁引起的问题。

cache失效

当CPU要访问主存的时候,这些数据首先要被copy到cache中,因为这些数据在不久的将来可能又会被处理器访问;CPU访问cache的速度要比访问内存要快的多;由于线程频繁切换,会造成cache失效,将导致应用程序性能下降。

阻塞引起的CPU浪费

mutex是阻塞的,在一个负载较重的应用程序中使用阻塞队列来在线程之间传递消息,会导致频繁的线程切换,大量的时间将被浪费在获取mutex,而不是处理任务上。

这就需要非阻塞来解决问题。任务之间不争抢任何资源,在队列中预定一个位置,然后在这个位置上插入或提取数据。这种机制使用了cas(compare and swap)的操作,它是一个原子操作,需要CPU指令支持。它的思想是先比较,再赋值。具体操作如下:它需要3个操作数,m,A, B,其中m是一个内存地址,将m指向的内存中的数据与A比较,如果相等则将B写入到m指向的内存并返回true,如果不相等则什么也不做返回false。

cas语义如下

Compare And Swapif (a == b) {    a = c;}cmpxchg(a, b, c)bool CAS( int * pAddr, int nExpected, int nNew )atomically {    if ( *pAddr == nExpected ) { *pAddr = nNew ; return true ;    }    return false ;}

内存的频繁申请和释放

当一个任务从堆中分配内存时,标准的内存分配机制会阻塞所有与这个任务共享地址空间的其它任务(进程中的所有线程)。malloc本身也是加锁的,保证线程安全。这样也会造成线程之间的竞争。标准队列插入数据的时候,都回导致堆上的动态内存分配,会导致应用程序性能下降。

小结

  • cache失效

  • 阻塞引起的CPU浪费

  • 内存的频繁申请和释放

这3个问题,本质上都是由于线程切换带来的问题。无锁队列就是从这几个方面解决问题。

无锁队列使用场景

无锁队列适用于队列push、pop非常频繁的场景,效率要比mutex高很多; 比如,股票行情,1秒钟至少几十万数据量。

无锁队列一般也会结合mutex + condition使用,如果数据量很小,比如一秒钟几百个、几千个消息,那就会有很多时间是没有消息需要处理的,消费线程就会休眠,等待唤醒;所以对于消息量很小的情况,无锁队列的吞吐量并不会有很大的提升,没有必要使用无锁队列。

无锁队列的实现,主要分为两类:

  1. 链表实现;
  2. 数组实现。

链表实现有一个问题,就是会频繁的从堆上申请内存,所以效率也不会很高。

对于一写一读场景下,各种消息队列的测试结果对比:

无锁队列的实现

zmq无锁队列的实现原理

zmq中实现了一个无锁队列,这个无锁队列只支持单写单读的场景。zmq的无锁队列是十分高效的,号称全世界最快的无锁队列。它的设计是非常优秀的,有很多设计是值得借鉴的。我们可以直接把它用到项目中去,zmq只用了不到600行代码就实现了无锁队列。

zmq的无锁队列主要由yqueue和ypipe组成。yqueue负责队列的数据组织,ypipe负责队列的操作。

原子操作函数

无锁队列的实现,一定是基于原子操作的。

zmq无锁队列使用如下原子操作函数

//  This class encapsulates several atomic operations on pointers.template <typename T> class atomic_ptr_t{public:    inline void set (T *ptr_); //非原子操作     inline T *xchg (T *val_);    //原子操做,设置新值,返回旧值    inline T *cas (T *cmp_, T *val_)//原子操作private:    volatile T *ptr;}
  • set函数,把私有成员ptr指针设置成参数ptr_的值,不是一个原子操作,需要使用者确保执行set过程没有其他线程使用ptr的值。

  • xchg函数,把私有成员ptr指针设置成参数val_的值,并返回ptr设置之前的值。原子操作,线程安全。

  • cas函数,原子操作,线程安全,把私有成员ptr指针与参数cmp_指针比较:

    • 如果相等返回ptr设置之前的值,并把ptr更新为参数val_的值;
    • 如果不相等直接返回ptr值。

chunk机制

每次分配可以存放N个元素的大块内存,减少内存的分配和释放。N值还有元素的类型,是可以根据自己的需要进行设置的。N不能太小,如果太小,就退化成链表方式了,就会有内存频分的申请和释放的问题。

    //  Individual memory chunk to hold N elements.    // 链表结点称之为chunk_t    struct chunk_t    { T values[N]; //每个chunk_t可以容纳N个T类型的元素,以后就以一个chunk_t为单位申请内存 chunk_t *prev; chunk_t *next;    };

无锁队列的实现

当队列空间不足时每次分配一个chunk_t,每个chunk_t能存储N个元素。

当一个chunk中的元素都出队后,回收的chunk也不是马上释放,而是根据局部性原理先回收到spare_chunk里面,当再次需要分配chunk_t的时候从spare_chunk中获取。spare_chunk只保存一个chunk,即只保存最新的要回收的chunk;如果spare_chunk现在保存了一个chunk A,如果现在有一个更新的chunk B需要回收,那么spare_chunk会更新为chunk B,chunk A会被释放;这个操作是通过cas完成的。

批量写入

支持批量写入,批量能够提高吞吐量。

    //  Write an item to the pipe.  Don't flush it yet. If incomplete is    //  set to true the item is assumed to be continued by items    //  subsequently written to the pipe. Incomplete items are neverflushed down the stream.    // 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。    inline void write(const T &value_, bool incomplete_)    { //  Place the value to the queue, add new terminator element. queue.back() = value_; queue.push(); //  Move the "flush up to here" poiter. if (!incomplete_) {     f = &queue.back(); // 记录要刷新的位置     // printf("1 f:%p, w:%p\n", f, w); } else {     //  printf("0 f:%p, w:%p\n", f, w); }    }

通过第二个参数incomplete_来判断write是否结束。

write(a, true);write(b, true);write(c, false);flush();

flush后才更新到读端。

怎样唤醒读端?

读端没有数据可读,这个时候应该怎么办?

使用mutex + condition进行wait,休眠;

写端怎么唤醒读端去读取数据呢?

很多消息队列,都是每次有消息,都进行notify。如果发送端每发送一个消息都notify,性能会下降。调用notify,涉及到线程切换,内核态与用户态切换,会影响性能;检测到读端处于阻塞状态,在notify,效率才高。

zmq的无锁队列写端只有在读端处于休眠状态的时候才会发送notify,是不是很厉害的样子?写端是怎么检测到读端处于休眠状态的呢?

写端在进行flush的时候,如果返回false,说明读端处于等待唤醒的状态,就可以进行notify。

无锁队列的实现

condition wait和notify,都需要由应用程序自己去做。

我们修改代码,将写端修改为每次flush都notify;经过测试,性能是会明显下降的。

无锁队列的实现

写端为什么可以检测到读端的状态的?

c值是唯一一个读端和写端都要设置的值,通过对c值进行cas操作,写端就可以判断读端是否处于等待唤醒的状态。

无锁队列的实现

无锁队列的实现

yqueue的实现

yqueue主要负责队列的数据组织,通过chunk机制进行管理。

template <typename T, int N>class yqueue_t{public:    inline yqueue_t();    inline ~yqueue_t();    inline T &front();    inline T &back();    inline void push();    inline void unpush();    inline void pop();private:    struct chunk_t    { T values[N]; chunk_t *prev; chunk_t *next;    };    chunk_t *begin_chunk;    int begin_pos;    chunk_t *back_chunk;    int back_pos;    chunk_t *end_chunk;    int end_pos;    atomic_ptr_t<chunk_t> spare_chunk; // 空闲块(把所有元素都已经出队的块称为空闲块),读写线程的共享变量    // 操作的时候使用xchg原子操作    //  Disable copying of yqueue.    yqueue_t(const yqueue_t &);    const yqueue_t &operator=(const yqueue_t &);};

数据的组织

chunk是通过链表进行组织的;

yqueue_t内部有三个chunk_t类型指针以及对应的索引位置:

begin_chunk/begin_pos:begin_chunk指向第一个的chunk;begin_pos是队列第一个元素在当前chunk中的位置;

back_chunk/back_pos:back_chunk指向队列尾所在的chunk;back_pos是队列最后一个元素在当前chunk的位置;

end_chunk/end_pos: end_chunk指向最后一个chunk;end_chunk和back_chunk大部分情况是一致的;end_pos 大部分情况是 back_pos + 1; end_pos主要是用来判断是否要分配新的chunk。

无锁队列的实现

上图中:

由于back_pos已经指向了back_chunk的最后一个元素,所以end_pos就指向了end_chunk的第一个元素。

back、push函数

back函数返回队列尾部元素的引用;

    //  Returns reference to the back element of the queue.    //  If the queue is empty, behaviour is undefined.    // 返回队列尾部元素的引用,调用者可以通过该引用更新元素,结合push实现插入操作。    // 如果队列为空,该函数是不允许被调用。    inline T &back() // 返回的是引用,是个左值,调用者可以通过其修改容器的值    { return back_chunk->values[back_pos];    }

push函数,更新back_chunk、back_pos的值,并且判断是否需要新的chunk;如果需要新的chunk,先看spare_chunk是否为空: 如果spare_chunk有值,则将spare_chunk作为end_chunk; 否则新malloc一个chunk。

    //  Adds an element to the back end of the queue.    inline void push()    { back_chunk = end_chunk; back_pos = end_pos; // if (++end_pos != N) //end_pos!=N表明这个chunk节点还没有满     return; chunk_t *sc = spare_chunk.xchg(NULL); // 为什么设置为NULL? 因为如果把之前值取出来了则没有spare chunk了,所以设置为NULL if (sc)   // 如果有spare chunk则继续复用它 {     end_chunk->next = sc;     sc->prev = end_chunk; } else // 没有则重新分配 {     // static int s_cout = 0;     // printf("s_cout:%d\n", ++s_cout);     end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一个chunk     alloc_assert(end_chunk->next);     end_chunk->next->prev = end_chunk;   } end_chunk = end_chunk->next; end_pos = 0;    }

可以使用back和push函数向队列中插入元素:

 //  Place the value to the queue, add new terminator element. queue.back() = value_; queue.push();

front、pop函数

front函数返回队列头部元素的引用。

    //  Returns reference to the front element of the queue.    //  If the queue is empty, behaviour is undefined.    // 返回队列头部元素的引用,调用者可以通过该引用更新元素,结合pop实现出队列操作。    inline T &front() // 返回的是引用,是个左值,调用者可以通过其修改容器的值    { return begin_chunk->values[begin_pos];    }

pop函数,主要更新begin_pos;如果begin_pos == N,则回收chunk;将chunk保存到spare_chunk中。

    //  Removes an element from the front end of the queue.    inline void pop()    { if (++begin_pos == N) // 删除满一个chunk才回收chunk {     chunk_t *o = begin_chunk;     begin_chunk = begin_chunk->next;     begin_chunk->prev = NULL;     begin_pos = 0;     //  'o' has been more recently used than spare_chunk,     //  so for cache reasons we'll get rid of the spare and     //  use 'o' as the spare.     chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,总是保存最新的空闲块而释放先前的空闲快     free(cs); }    }

可以使用front和pop函数进行出队列操作

 //  There was at least one value prefetched. //  Return it to the caller. *value_ = queue.front(); queue.pop();

这里有两点需要注意:

  1. pop掉的元素,其销毁工作交给调用者完成,即是pop前调用者需要通过front()接口读取并进行销毁(比如动态分配的对象);
  2. 空闲块的保存,要求是原子操作;因为闲块是读写线程的共享变量,因为在push中也使用了spare_chunk。

ypipe的实现

ypipe_t在yqueue_t的基础上构建一个单写单读的无锁队列。

template <typename T, int N>class ypipe_t{public:    //  Initialises the pipe.    inline ypipe_t();    //  The destructor doesn't have to be virtual. It is mad virtual    //  just to keep ICC and code checking tools from complaining.    inline virtual ~ypipe_t()//  Write an item to the pipe.  Don't flush it yet. If incomplete is    //  set to true the item is assumed to be continued by items    //  subsequently written to the pipe. Incomplete items are neverflushed down the stream.    // 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。    inline void write(const T &value_, bool incomplete_);    //  Pop an incomplete item from the pipe. Returns true is such    //  item exists, false otherwise.    inline bool unwrite(T *value_);    //  Flush all the completed items into the pipe. Returns false if    //  the reader thread is sleeping. In that case, caller is obliged to    //  wake the reader up before using the pipe again.    // 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。    // 批量刷新的机制, 写入批量后唤醒读线程;    // 反悔机制 unwrite    inline bool flush();    //  Check whether item is available for reading.    // 这里面有两个点,一个是检查是否有数据可读,一个是预取    inline bool check_read(); //  Reads an item from the pipe. Returns false if there is no value.    //  available.    inline bool read(T *value_);    protected:    //  Allocation-efficient queue to store pipe items.    //  Front of the queue points to the first prefetched item, back of    //  the pipe points to last un-flushed item. Front is used only by    //  reader thread, while back is used only by writer thread.    yqueue_t<T, N> queue;    //  Points to the first un-flushed item. This variable is used    //  exclusively by writer thread.    T *w; //指向第一个未刷新的元素,只被写线程使用    //  Points to the first un-prefetched item. This variable is used    //  exclusively by reader thread.    T *r; //指向第一个还没预提取的元素,只被读线程使用; 代表可以读取到哪个位置的元素    //  Points to the first item to be flushed in the future.    T *f; //指向下一轮要被刷新的一批元素中的第一个    //  The single point of contention between writer and reader thread.    //  Points past the last flushed item. If it is NULL,    //  reader is asleep. This pointer should be always accessed using    //  atomic operations.    atomic_ptr_t<T> c; //读写线程共享的指针,指向每一轮刷新的起点。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)    //  Disable copying of ypipe object.    ypipe_t(const ypipe_t &);    const ypipe_t &operator=(const ypipe_t &);    }

核心思想是通过**w、r、f**指针,通过对**c**值的cas操作,解决读写线程的数据竞争问题。

write

将一个元素入队列;incomplete_表示write是否结束,如果是flase,将f设置为queue.back()。write最终只是更新了f值。

    //  Write an item to the pipe.  Don't flush it yet. If incomplete is    //  set to true the item is assumed to be continued by items    //  subsequently written to the pipe. Incomplete items are neverflushed down the stream.    // 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。    inline void write(const T &value_, bool incomplete_)    { //  Place the value to the queue, add new terminator element. queue.back() = value_; queue.push(); //  Move the "flush up to here" poiter. if (!incomplete_) {     f = &queue.back(); // 记录要刷新的位置 } else { }    }

flush

主要是将w更新到f的位置,说明已经写到的位置。

通过cas操作,尝试将c值设置为f。通过flush的返回值,可以判断读端是否处于等待唤醒的状态。

    //  Flush all the completed items into the pipe. Returns false if    //  the reader thread is sleeping. In that case, caller is obliged to    //  wake the reader up before using the pipe again.    // 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。    // 批量刷新的机制, 写入批量后唤醒读线程;    // 反悔机制 unwrite    inline bool flush()    { //  If there are no un-flushed items, do nothing. if (w == f) // 不需要刷新,即是还没有新元素加入     return true; //  Try to set 'c' to 'f'. // read时如果没有数据可以读取则c的值会被置为NULL if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置 {     //  Compare-and-swap was unseccessful because 'c' is NULL.     //  This means that the reader is asleep. Therefore we don't     //  care about thread-safeness and update c in non-atomic     //  manner. We'll return false to let the caller know     //  that reader is sleeping.     c.set(f); // 更新为新的f位置     w = f;     return false; //线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理 } else  // 读端还有数据可读取 {     //  Reader is alive. Nothing special to do now. Just move     //  the 'first un-flushed item' pointer to 'f'.     w = f;      // 更新f的位置     return true; }    }

check_read

是一种预读的机制,检查是否有数据可读;通过对c值的cas操作,来更新r值;r就是可以读取到的位置。

c值如果和&queue.front()相等,标志没有数据可读,将c值设置为NULL;写端就可以通过c值判断出读端的状态。

    //  Check whether item is available for reading.    // 这里面有两个点,一个是检查是否有数据可读,一个是预取    inline bool check_read()    { //  Was the value prefetched already? If so, return. if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true;     return true; //  There's no prefetched value, so let us prefetch more values. //  Prefetching is to simply retrieve the //  pointer from c in atomic fashion. If there are no //  items to prefetch, set c to NULL (using compare-and-swap). // 两种情况 // 1. 如果c值和queue.front()相等, 返回c值并将c值置为NULL,此时没有数据可读 // 2. 如果c值和queue.front()不相等, 返回c值,此时可能有数据读取 r = c.cas(&queue.front(), NULL); //尝试预取数据,r代表可以读取到哪个位置的元素 //  If there are no elements prefetched, exit. //  During pipe's lifetime r should never be NULL, however, //  it can happen during pipe shutdown when items are being deallocated. if (&queue.front() == r || !r) //判断是否成功预取数据     return false; //  There was at least one value prefetched. return true;    }

基于环形数组的无锁队列

基于环形数组的无锁队列,也是利用cas操作解决多线程数据竞争的问题;它支持多谢多读。

template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>class ArrayLockFreeQueue{public:ArrayLockFreeQueue();virtual ~ArrayLockFreeQueue();QUEUE_INT size();bool enqueue(const ELEM_T &a_data);bool dequeue(ELEM_T &a_data);    bool try_dequeue(ELEM_T &a_data);private:ELEM_T m_thequeue[Q_SIZE];volatile QUEUE_INT m_count;volatile QUEUE_INT m_writeIndex;volatile QUEUE_INT m_readIndex;volatile QUEUE_INT m_maximumReadIndex;inline QUEUE_INT countToIndex(QUEUE_INT a_count);};

关键是对于三种下标的操作:

  1. m_writeIndex;//新元素入列时存放位置在数组中的下标
  2. m_readIndex;/ 下一个出列的元素在数组中的下标
  3. m_maximumReadIndex; //最后一个已经完成入列操作的元素在数组中的下标, 即可以读到的最大索引。

通过对这3个下标的cas操作,解决多线程数据竞争的问题。

无锁队列的实现

enqueue

template <typename ELEM_T, QUEUE_INT Q_SIZE>bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data){QUEUE_INT currentWriteIndex;// 获取写指针的位置QUEUE_INT currentReadIndex;    // 1. 获取可写入的位置do{currentWriteIndex = m_writeIndex;currentReadIndex = m_readIndex;if(countToIndex(currentWriteIndex + 1) ==countToIndex(currentReadIndex)){return false;// 队列已经满了} // 目的是为了获取一个能写入的位置} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));    // 获取写入位置后 currentWriteIndex是一个临时变量,保存我们写入的位置// We know now that this index is reserved for us. Use it to save the datam_thequeue[countToIndex(currentWriteIndex)] = a_data;  // 把数据更新到对应的位置    // 2. 更新可读的位置,按着currentWriteIndex + 1的操作 // update the maximum read index after saving the data. It wouldn't fail if there is only one thread // inserting in the queue. It might fail if there are more than 1 producer threads because this// operation has to be done in the same order as the previous CASwhile(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))){ // this is a good place to yield the thread in case there are more// software threads than hardware processors and you have more// than 1 producer thread// have a look at sched_yield (POSIX.1b)sched_yield();// 当线程超过cpu核数的时候如果不让出cpu导致一直循环在此。}AtomicAdd(&m_count, 1);return true;}

首先判断队列是否已满:(m_writeIndex + 1) %/Q_SIZE == m_readIndex;如果队列已满,则返回false。

enqueu的核心思想是预先占用一个可写的位置,保证同一个位置只有一个线程会进行写操作;并且保证先获取到位置的线程,先操作,保证了操作的顺序性。这两个都是通过cas操作保证的。

dequeue

template <typename ELEM_T, QUEUE_INT Q_SIZE>bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data){QUEUE_INT currentMaximumReadIndex;QUEUE_INT currentReadIndex;do{ // to ensure thread-safety when there is more than 1 producer thread// a second index is defined (m_maximumReadIndex)currentReadIndex = m_readIndex;currentMaximumReadIndex = m_maximumReadIndex;if(countToIndex(currentReadIndex) ==countToIndex(currentMaximumReadIndex)) // 如果不为空,获取到读索引的位置{// the queue is empty or// a producer thread has allocate space in the queue but is // waiting to commit the data into itreturn false;}// retrieve the data from the queuea_data = m_thequeue[countToIndex(currentReadIndex)];// try to perfrom now the CAS operation on the read index. If we succeed// a_data already contains what m_readIndex pointed to before we // increased itif(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))){AtomicSub(&m_count, 1);// 真正读取到了数据return true;}} while(true);assert(0); // Add this return statement to avoid compiler warningsreturn false;}

判断队列是否有数据可读:m_readIndex == m_maximumReadIndex

通过cas操作保证同一个位置,只有一个线程读取。

多写多读测试结果

无锁队列的实现

从测试结果可以看出,这个基于环形数组的队列,比较适合1写多读的场景,性能会有很大的提升。

总结

本文主要介绍了以下内容:

  • 无锁队列所解决的问题;

  • 无锁队列都是利用了cas操作,来解决多线程数据竞争的问题;因为cas操作的粒度要比mutex,spinlock要小很多;

  • zmq无锁队列实现原理,包括chunk机制、批量写入、怎样唤醒读端等;yqueue、ypipe的具体的实现,预读机制、写端如何检测到读端的状态等;它只支持单写单读的场景;

  • 基于环形数组的无锁队列,它支持多写多读的场景;对于1写多读的场景,性能有很大提升;它是如何解决多线程竞争问题的。