高性能存储队列:Disruptor
作者:互联网
文章目录
一、JUC包下队列的缺陷
- 1、JUC包下队列大部分使用的都是ReentrantLock锁方式来保证线程安全的。在高并发的情况下为了防止OOM,只能选择有界队列,这样就会导致一部分请求的丢失;
- 2、加锁方式的等待唤醒机制对内存的开销很大,而且存在死锁的隐患;
- 3、有界队列通常采用数组实现,而数组结构又会导致另一个问题:伪共享,进而导致性能问题;
二、Disruptor为避免缺陷而设计的解决方案
1、存储结构:环形数组、
使用数组可以避免垃圾回收,同时由于空间局部性原理,数组对于处理器的缓存机制更加友好。
2、定位方式:位运算
Disruptor定义的数组长度都是2^n,所以使用的定位方式都是位运算。位运算都是使用二进制的形式实现的,而机器对于二进制的指令显然会更加友好,速度更快。
3、线程安全策略:CAS
Disruptor对数组中的元素进行操作都是通过CAS进行获取的,这样就能大大减少加锁对性能带来的影响。
4、存储方式:缓存填充
缓存填充是为了解决伪共享而设计出来的,它能让每一个缓存行只有一个元素,这样对元素的写入操作就不会影响其它元素的缓存了。
5、任务执行:事件监听机制
使用观察者模式,是为了防止消费者对任务池的不断重试,从而减少这个过程中对CPU性能的消耗。
三、Disruptor的针对数据覆盖的四种策略
1、BlockingWaitStrategy:
常见且默认的等待策略。当这个队列满了,不执行覆盖而是阻塞等待。使用ReentrantLock + Condition实现阻塞,最节省CPU,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景。
2、SleepingWaitStrategy:
这是一个循环等待策略,会在循环中不断的等待数据。它会先进行自旋等待,如果等待不成功(没有CAS到数据的写入权限),就会使用Thread.yield()方法让出CPU,并最终使用LockSupport.partNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会产生比较高的平均延时,典型的应用场景就是异步日志。
3、YieldingWaitStrategy:
这个策略用于低延时场合。消费者线程会不断的循环监测缓冲区的变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延迟有比较高的要求,可以考虑这种策略。
4、BusySpinWaitStrategy:
该策略采用死循环,消费者线程会尽最大的努力监控缓冲区的变化,对延时非常苛刻的场景使用。在这个策略下CPU核数必须大于消费者线程数,推荐在线程绑定到固定的CPU的场景下使用。
四、Disruptor的简单使用
需要引入的依赖:
<!-- 引入Disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
代码实现:
1、构建消息载体(事件Event)
package com.muyichen.demo.disruptor.event;
import lombok.Data;
/**
* 消息载体(事件)
*/
@Data
public class OrderEvent {
private long value;
private String name;
}
2、构建消息(事件)生产者
package com.muyichen.demo.disruptor.producer;
import com.lmax.disruptor.RingBuffer;
import com.muyichen.demo.disruptor.event.OrderEvent;
/**
* 消息(事件)生产者
*/
public class OrderEventProducer {
/**
* 事件环形队列
*/
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(long value, String name) {
// 获取事件队列的下一个槽
long sequence = ringBuffer.next();
try {
// 获取消息(事件)
OrderEvent orderEvent = ringBuffer.get(sequence);
// 写入数据消息
orderEvent.setValue(value);
orderEvent.setName(name);
} catch (Exception e) {
// 异常处理
e.printStackTrace();
} finally {
System.out.println("生产者" + Thread.currentThread().getName() +
"发送数据:value:" + value + ",name:" + name);
// 发布事件
ringBuffer.publish(sequence);
}
}
}
3、构建消息(事件)消费者
package com.muyichen.demo.disruptor.consumer;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import com.muyichen.demo.disruptor.event.OrderEvent;
/**
* 消息(事件)消费者
*/
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
// TODO 消费逻辑
System.out.println("消费者" + Thread.currentThread().getName() +
"消费数据:value:" + orderEvent.getValue() + ",name:" + orderEvent.getName());
}
@Override
public void onEvent(OrderEvent orderEvent) throws Exception {
// TODO 消费逻辑
System.out.println("消费者" + Thread.currentThread().getName() +
"消费数据:value:" + orderEvent.getValue() + ",name:" + orderEvent.getName());
}
}
测试用例
package com.muyichen.demo.disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.muyichen.demo.disruptor.consumer.OrderEventHandler;
import com.muyichen.demo.disruptor.event.OrderEvent;
import com.muyichen.demo.disruptor.producer.OrderEventProducer;
import java.util.concurrent.Executors;
/**
* 高性能队列测试
*/
public class DisruptorDemo {
public static void main(String[] args) {
//创建Disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
OrderEvent::new, // 等同new OrderEventFactory()
1024 * 1024, // 环形数组容量
Executors.defaultThreadFactory(),
ProducerType.SINGLE, // 单生产者 (生产类型有两种:单生产者、多生产者)
new YieldingWaitStrategy() // 等待策略
);
// 设置消费者用于处理RingBuffer的事件
disruptor.handleEventsWith(new OrderEventHandler());
// 设置多消费者,消息会被重复消费
// disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());
// 设置多消费者,消费者要实现WorkHandler接口,这样能保证,消息只会被一个消费者消费
// disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
// 启动Disruptor
disruptor.start();
// 构建环形队列
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
// 创建生产者并绑定环形队列
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
// 发送消息
for (int i=0; i<100; i++) {
producer.onData(i, "muyichen" + i);
}
disruptor.shutdown();
}
}
标签:Disruptor,disruptor,队列,高性能,new,muyichen,import,com 来源: https://blog.csdn.net/qq_42697271/article/details/121990656