其他分享
首页 > 其他分享> > disruptor 学习笔记

disruptor 学习笔记

作者:互联网

一、disruptor 简介:

高效无锁内存队列,使用无锁方式实现一个环形队列 代替 线性队列。

 

相对于普通队列,环形队列不需要维护头尾两个指针,只需维护一个当前位置就可以完成入队操作。

环形队列大小不能扩展。整个业务逻辑处理器完全运行在内存中,架构单线程可处理每秒600W流水。非常适合哪种实时性高、延迟率低、业务流水量大的应用场景。

 

二、Disruptor 框架核心类

 Disruptor : 用于控制整个消费者 - 生产者模型的处理器

 RingBuffer : 用于存放数据 (环形缓冲区)

 EventHandler : 一个用于处理事件的接口(可当生产者,也可当消费者)

 EventFactory : 事件工厂类

 WaitStrategy : 用于实现事件处理等待 RingBuffer 游标策略的接口

 SequeueBarrier :队列屏障,用于处理访问 RingBuffer 的序列

 用于运行Disruptor 的线程或者线程池

 

三、Disruptor 编写步骤

 定义事件

 定义事件工厂

 消费者 - 定义事件处理的具体实现

 定义用于事件处理(消费者)的线程池

 指定等待策略

 BlockingWaitStrategy 最低效策略

 SleepingWaitStrategy 适用于异步日志场景

 YieldingWaitStrategy 性能最好,适用于低延迟系统

 生产(发布)消息

 关闭 disruptor 业务逻辑处理器

 

四、ringbuffer 环形缓冲区 (ring : 环,戒指。 buffer :缓冲区)

 没有尾指针,只维护一个指向下一个可用位置的序号。不删除buffer中的数据,知道新数据覆盖他们。

 RingBuffer 优点: 它是数组,比链表块,而且有容易预测的访问模式(数组内元素内存地址是连续存储的)。另外可以为数组预先分配内存,数组对象一直存在,就不用花大量时间用于垃圾回收。(不像链表要为每一个添加到上面的对象创造节点,删除节点时还要执行内存清理)。

 ringbuffer 是由一个大数组组成的

 所有 ringbuffer 的 “指针(也称为序列或游标)”,是Java long类型的(64 位有符号数),指针采用往上计数自增的方式。(别担心溢出,100W/s,30W年 才溢出 )

 对 RingBuffer 中的指针进行按 ringbuffer 的 size 取模 找出数组下标来定位入口(类似于HashMap的entry )。通常将 RingBuffer 的size 设置成实际使用的 2 倍,可通过位运算(bit-mask)方式计算出数组下标。

 写入数据先后顺序由线程抢占位置决定,不是由提交先后决定。

 

 

五、从 RingBuffer 读取数据

原文学习地址:http://ifeve.com/dissecting_the_disruptor_how_doi_read_from_the_ring_buffer/ 

ConsumerBarrier ( 消费屏障 ) 与消费者

ConsumerBarrier  : 一个由 RingBuffer 创建 并且代表消费者与 RingBuffer 进行交互的对象。

消费者调用 ConsumerBarrier 对象的 waitFor() 方法,传递它所需要的下一个序号:

代码  :  final long availableSeq = consumerBarrier.waitFor(nextSequence) ;

ConsumerBarrier 会返回 RingBuffer 的最大可访问序号

【ConsumerBarrier 有一个 WaitStrategy 方法来决定等待序号 - 最大访问序号】

 

六、写入 RingBuffer

原文学习地址:http://ifeve.com/disruptor-writing-ringbuffer/ 

RingBuffer 提供了 ProducterBuffer(生产整流器、生产者壁垒) 对象。让生产者通过它写入 RingBuffer。

写入过程涉及 两阶段提交(two-phase commit):

 生产者需要申请 buffer 里下一个节点

 生产者向节点写完数据,会调用 ProducerBarrier 的 commit 方法。

 

第一步  -   申请下个节点,就是简单调用 ProducerBarrier 中的 nextEntry()  方法,方法返回的 Entry 对象就是下一个节点。这里 ProducerBarrier 需要 防止 RingBuffer 重叠 。

ConsumerTrackingProducerBarrier 对象拥有所有正在访问 RingBuffer 的消费者列表。Disruptor中由消费者通知它处理到了那个序列号。

当 生产者申请的环形节点的下一个节点被其他消费者占用时,ProducerBuffer 停下 自旋(spins),等待,直到那个消费者离开。消费者离开后,ProducerBarrier 会发现该节点可用,抢占该节点的 Entry ,并更新节点 Entry 上的序号,再把 Entry 返回给生产者。生产者往 Entry 里写数据。

第二部  -  提交 。生产者往 Entry 写入数据后,会要求 ProducerBarrier 把数据提交到 RingBuffer。

单生产者时:

ProducerBarrier 先等待 RingBuffer 的游标追上当前位置后,更新 RingBuffer 游标到刚写入的Entry序号 并提交Entry。

然后 ProducerBarrier 调用 ConsumerBarrier 上的 WaitStrategy 对象,让消费者直到 buffer 中有新东西了。消费者就可以读取最新的 Entry 内容。

ProducerBarrier 的批处理,ProducerBarrier 知道 RingBuffer的大小,知道最慢消费者的位置,也知道当前那些节点可用。在最慢消费者的节点前,不需要再检查消费者位置,生产者就可以连续写入节点。

多个生产者时:

ProducerBarrier 拿到的下一个序号是生产者通过 ClaimStrategy 申请获取的。

多个生产者向ClaimStrategy 申请可用节点,如果一个生产者在写入 RingBuffer 时候暂停了,只有当它解除暂停,并写入RingBuffer,RingBuffer 游标移动到该节点 后,后面节点等待的提交才会立即执行。

 

七、Disruptor 依赖关系

原文学习地址:http://ifeve.com/dissecting-disruptor-wiring-up-cn/ 

用 Disruptor 实现菱形结构,

菱形结构创建代码:

 

ConsumerBarrier consumerBarrier1 = ringBuffer.createConsumerBarrier();

BatchConsumer consumer1 = new BatchConsumer(consumerBarrier1, handler1);

BatchConsumer consumer2 =  new BatchConsumer(consumerBarrier1, handler2);

ConsumerBarrier consumerBarrier2 = ringBuffer.createConsumerBarrier(consumer1, consumer2);

BatchConsumer consumer3 = new BatchConsumer(consumerBarrier2, handler3);

ProducerBarrier producerBarrier = ringBuffer.createProducerBarrier(consumer3);

 

RingBuffer 可以有多个 ConsumerBarrier。

节点(Entry)对象的每一个字段应该只允许一个消费者写入,避免产生并发写入冲突(write-contention)减慢整个处理过程。  实际中:有个例子 FizzBuzzEntry 有两个字段:fizz 和 buzz 。如果消费者是 Fizz Consumer ,只写入字段 fizz,如果是 Buzz Consumer 只写入字段 buzz ,第三个消费者 FizzBuzz 只读取这两个字段。【用于 生产者插入 Entry 后,消费者1获取到Entry 数据,对数据进行操作后传递给消费者2 再次对Entry 的数据进行处理的情景。最终 消费者1、消费者2修改后的Entry 被消费者 3读取。】

总结:关联Disruptor 与相互依赖(等待)的多个消费者。关键点:

 使用多个ConsumerBarrier 来管理消费者之间的依赖(等待)关系

 使用 ProducerBarrier 监视结构图中最后一个消费者

 只允许一个消费者更新数据节点(Entry)的一个独立字段。

 

标签:disruptor,消费者,生产者,笔记,学习,ProducerBarrier,RingBuffer,Entry,节点
来源: https://www.cnblogs.com/wgy1/p/10754960.html