其他分享
首页 > 其他分享> > 高性能存储队列:Disruptor

高性能存储队列:Disruptor

作者:互联网

文章目录


一、JUC包下队列的缺陷

  • 1、JUC包下队列大部分使用的都是ReentrantLock锁方式来保证线程安全的。在高并发的情况下为了防止OOM,只能选择有界队列,这样就会导致一部分请求的丢失;
  • 2、加锁方式的等待唤醒机制对内存的开销很大,而且存在死锁的隐患;
  • 3、有界队列通常采用数组实现,而数组结构又会导致另一个问题:伪共享,进而导致性能问题;

二、Disruptor为避免缺陷而设计的解决方案

1、存储结构:环形数组、

使用数组可以避免垃圾回收,同时由于空间局部性原理,数组对于处理器的缓存机制更加友好。

2、定位方式:位运算

Disruptor定义的数组长度都是2^n,所以使用的定位方式都是位运算。位运算都是使用二进制的形式实现的,而机器对于二进制的指令显然会更加友好,速度更快。

3、线程安全策略:CAS

Disruptor对数组中的元素进行操作都是通过CAS进行获取的,这样就能大大减少加锁对性能带来的影响。

4、存储方式:缓存填充

缓存填充是为了解决伪共享而设计出来的,它能让每一个缓存行只有一个元素,这样对元素的写入操作就不会影响其它元素的缓存了。

5、任务执行:事件监听机制

使用观察者模式,是为了防止消费者对任务池的不断重试,从而减少这个过程中对CPU性能的消耗。

三、Disruptor的针对数据覆盖的四种策略

1、BlockingWaitStrategy:

常见且默认的等待策略。当这个队列满了,不执行覆盖而是阻塞等待。使用ReentrantLock + Condition实现阻塞,最节省CPU,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景

2、SleepingWaitStrategy:

这是一个循环等待策略,会在循环中不断的等待数据。它会先进行自旋等待,如果等待不成功(没有CAS到数据的写入权限),就会使用Thread.yield()方法让出CPU,并最终使用LockSupport.partNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会产生比较高的平均延时,典型的应用场景就是异步日志

3、YieldingWaitStrategy:

这个策略用于低延时场合。消费者线程会不断的循环监测缓冲区的变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延迟有比较高的要求,可以考虑这种策略。

4、BusySpinWaitStrategy:

该策略采用死循环,消费者线程会尽最大的努力监控缓冲区的变化,对延时非常苛刻的场景使用。在这个策略下CPU核数必须大于消费者线程数,推荐在线程绑定到固定的CPU的场景下使用

四、Disruptor的简单使用

需要引入的依赖:

<!-- 引入Disruptor -->
<dependency>
	<groupId>com.lmax</groupId>
	<artifactId>disruptor</artifactId>
	<version>3.3.4</version>
</dependency>

代码实现:

1、构建消息载体(事件Event)

package com.muyichen.demo.disruptor.event;

import lombok.Data;

/**
 * 消息载体(事件)
 */
@Data
public class OrderEvent {

    private long value;

    private String name;

}

2、构建消息(事件)生产者

package com.muyichen.demo.disruptor.producer;

import com.lmax.disruptor.RingBuffer;
import com.muyichen.demo.disruptor.event.OrderEvent;

/**
 * 消息(事件)生产者
 */
public class OrderEventProducer {

    /**
     * 事件环形队列
     */
    private RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(long value, String name) {
        // 获取事件队列的下一个槽
        long sequence = ringBuffer.next();

        try {
            // 获取消息(事件)
            OrderEvent orderEvent = ringBuffer.get(sequence);
            // 写入数据消息
            orderEvent.setValue(value);
            orderEvent.setName(name);
        } catch (Exception e) {
            // 异常处理
            e.printStackTrace();
        } finally {
            System.out.println("生产者" + Thread.currentThread().getName() +
                    "发送数据:value:" + value + ",name:" + name);
            // 发布事件
            ringBuffer.publish(sequence);
        }

    }

}

3、构建消息(事件)消费者

package com.muyichen.demo.disruptor.consumer;


import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import com.muyichen.demo.disruptor.event.OrderEvent;

/**
 * 消息(事件)消费者
 */
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {

    @Override
    public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
        // TODO 消费逻辑
        System.out.println("消费者" + Thread.currentThread().getName() +
                "消费数据:value:" + orderEvent.getValue() + ",name:" + orderEvent.getName());
    }

    @Override
    public void onEvent(OrderEvent orderEvent) throws Exception {
        // TODO 消费逻辑
        System.out.println("消费者" + Thread.currentThread().getName() +
                "消费数据:value:" + orderEvent.getValue() + ",name:" + orderEvent.getName());
    }
}

测试用例

package com.muyichen.demo.disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.muyichen.demo.disruptor.consumer.OrderEventHandler;
import com.muyichen.demo.disruptor.event.OrderEvent;
import com.muyichen.demo.disruptor.producer.OrderEventProducer;

import java.util.concurrent.Executors;

/**
 * 高性能队列测试
 */
public class DisruptorDemo {

    public static void main(String[] args) {
        //创建Disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
                OrderEvent::new, // 等同new OrderEventFactory()
                1024 * 1024,    // 环形数组容量
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE,    // 单生产者 (生产类型有两种:单生产者、多生产者)
                new YieldingWaitStrategy()  // 等待策略
        );

        // 设置消费者用于处理RingBuffer的事件
        disruptor.handleEventsWith(new OrderEventHandler());
        // 设置多消费者,消息会被重复消费
        // disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());
        // 设置多消费者,消费者要实现WorkHandler接口,这样能保证,消息只会被一个消费者消费
        // disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());

        // 启动Disruptor
        disruptor.start();

        // 构建环形队列
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        // 创建生产者并绑定环形队列
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);
        // 发送消息
        for (int i=0; i<100; i++) {
            producer.onData(i, "muyichen" + i);
        }
        disruptor.shutdown();

    }

}

标签:Disruptor,disruptor,队列,高性能,new,muyichen,import,com
来源: https://blog.csdn.net/qq_42697271/article/details/121990656