其他分享
首页 > 其他分享> > disruptor笔记之三:环形队列的基础操作(不用Disruptor类)

disruptor笔记之三:环形队列的基础操作(不用Disruptor类)

作者:互联网

欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor笔记》系列链接

  1. 快速入门
  2. Disruptor类分析
  3. 环形队列的基础操作(不用Disruptor类)
  4. 事件消费知识点小结
  5. 事件消费实战
  6. 常见场景
  7. 等待策略
  8. 知识点补充(终篇)

本篇概览

  1. 100个事件,单个消费者消费;
  2. 100个事件,三个消费者,每个都独自消费这个100个事件;
  3. 100个事件,三个消费者共同消费这个100个事件;

前文回顾

为了完成本篇的实战,前文《disruptor笔记之二:Disruptor类分析》已做了充分的研究分析,建议观看,这里简单回顾以下Disruptor类的几个核心功能,这也是咱们编码时要实现的:

  1. 创建环形队列(RingBuffer对象)
  2. 创建SequenceBarrier对象,用于接收ringBuffer中的可消费事件
  3. 创建BatchEventProcessor,负责消费事件
  4. 绑定BatchEventProcessor对象的异常处理类
  5. 调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer
  6. 启动独立线程,用来执行消费事件的业务逻辑

源码下载

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

在这里插入图片描述

在这里插入图片描述

开发

  1. 100个事件,单个消费者消费;
  2. 100个事件,三个消费者,每个都独自消费这个100个事件;
  3. 100个事件,三个消费者共同消费这个100个事件;
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.lmax:disruptor'

    testImplementation('org.springframework.boot:spring-boot-starter-test')
}
package com.bolingcavalry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LowLevelOperateApplication {
	public static void main(String[] args) {
		SpringApplication.run(LowLevelOperateApplication.class, args);
	}
}
package com.bolingcavalry.service;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@ToString
@NoArgsConstructor
public class StringEvent {
    private String value;
}
package com.bolingcavalry.service;

import com.lmax.disruptor.EventFactory;

public class StringEventFactory implements EventFactory<StringEvent> {
    @Override
    public StringEvent newInstance() {
        return new StringEvent();
    }
}
package com.bolingcavalry.service;

import com.lmax.disruptor.RingBuffer;

public class StringEventProducer {

    // 存储数据的环形队列
    private final RingBuffer<StringEvent> ringBuffer;

    public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(String content) {

        // ringBuffer是个队列,其next方法返回的是下最后一条记录之后的位置,这是个可用位置
        long sequence = ringBuffer.next();

        try {
            // sequence位置取出的事件是空事件
            StringEvent stringEvent = ringBuffer.get(sequence);
            // 空事件添加业务信息
            stringEvent.setValue(content);
        } finally {
            // 发布
            ringBuffer.publish(sequence);
        }
    }
}
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class StringEventHandler implements EventHandler<StringEvent> {

    public StringEventHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);

        // 这里延时100ms,模拟消费事件的逻辑的耗时
        Thread.sleep(100);

        // 如果外部传入了consumer,就要执行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}
package com.bolingcavalry.service;

public interface LowLevelOperateService {
    /**
     * 消费者数量
     */
    int CONSUMER_NUM = 3;

    /**
     * 环形缓冲区大小
     */
    int BUFFER_SIZE = 16;

    /**
     * 发布一个事件
     * @param value
     * @return
     */
    void publish(String value);

    /**
     * 返回已经处理的任务总数
     * @return
     */
    long eventCount();
}

100个事件,单个消费者消费

package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service("oneConsumer")
@Slf4j
public class OneConsumerServiceImpl implements LowLevelOperateService {

    private RingBuffer<StringEvent> ringBuffer;

    private StringEventProducer producer;

    /**
     * 统计消息总数
     */
    private final AtomicLong eventCount = new AtomicLong();

    private ExecutorService executors;

    @PostConstruct
    private void init() {
        // 准备一个匿名类,传给disruptor的事件处理类,
        // 这样每次处理事件时,都会将已经处理事件的总数打印出来
        Consumer<?> eventCountPrinter = new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            }
        };

        // 创建环形队列实例
        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);

        // 准备线程池
        executors = Executors.newFixedThreadPool(1);

        //创建SequenceBarrier
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        // 创建事件处理的工作类,里面执行StringEventHandler处理事件
        BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
                ringBuffer,
                sequenceBarrier,
                new StringEventHandler(eventCountPrinter));

        // 将消费者的sequence传给环形队列
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        // 在一个独立线程中取事件并消费
        executors.submit(batchEventProcessor);

        // 生产者
        producer = new StringEventProducer(ringBuffer);
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}
  1. 自己创建环形队列RingBuffer实例
  2. 自己准备线程池,里面的线程用来获取和消费消息
  3. 自己动手创建BatchEventProcessor实例,并把事件处理类传入
  4. 通过ringBuffer创建sequenceBarrier,传给BatchEventProcessor实例使用
  5. 将BatchEventProcessor的sequence传给ringBuffer,确保ringBuffer的生产和消费不会出现混乱
  6. 启动线程池,意味着BatchEventProcessor实例在一个独立线程中不断的从ringBuffer中获取事件并消费;
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.LowLevelOperateService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class LowLeverOperateServiceImplTest {

    @Autowired
    @Qualifier("oneConsumer")
    LowLevelOperateService oneConsumer;

    private static final int EVENT_COUNT = 100;

    private void testLowLevelOperateService(LowLevelOperateService service, int eventCount, int expectEventCount) throws InterruptedException {
        for(int i=0;i<eventCount;i++) {
            log.info("publich {}", i);
            service.publish(String.valueOf(i));
        }

        // 异步消费,因此需要延时等待
        Thread.sleep(10000);

        // 消费的事件总数应该等于发布的事件数
        assertEquals(expectEventCount, service.eventCount());
    }

    @Test
    public void testOneConsumer() throws InterruptedException {
        log.info("start testOneConsumerService");
        testLowLevelOperateService(oneConsumer, EVENT_COUNT, EVENT_COUNT);
    }

在这里插入图片描述

在这里插入图片描述

100个事件,三个消费者,每个都独自消费这个100个事件

package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service("multiConsumer")
@Slf4j
public class MultiConsumerServiceImpl implements LowLevelOperateService {

    private RingBuffer<StringEvent> ringBuffer;

    private StringEventProducer producer;

    /**
     * 统计消息总数
     */
    private final AtomicLong eventCount = new AtomicLong();

    /**
     * 生产一个BatchEventProcessor实例,并且启动独立线程开始获取和消费消息
     * @param executorService
     */
    private void addProcessor(ExecutorService executorService) {
        // 准备一个匿名类,传给disruptor的事件处理类,
        // 这样每次处理事件时,都会将已经处理事件的总数打印出来
        Consumer<?> eventCountPrinter = new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            }
        };

        BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
                ringBuffer,
                ringBuffer.newBarrier(),
                new StringEventHandler(eventCountPrinter));

        // 将当前消费者的sequence实例传给ringBuffer
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        // 启动独立线程获取和消费事件
        executorService.submit(batchEventProcessor);
    }

    @PostConstruct
    private void init() {
        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);

        ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);

        // 创建多个消费者,并在独立线程中获取和消费事件
        for (int i=0;i<CONSUMER_NUM;i++) {
            addProcessor(executorService);
        }

        // 生产者
        producer = new StringEventProducer(ringBuffer);
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}
 	@Autowired
    @Qualifier("multiConsumer")
    LowLevelOperateService multiConsumer;

    @Test
    public void testMultiConsumer() throws InterruptedException {
        log.info("start testMultiConsumer");
        testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM);
    }

在这里插入图片描述

100个事件,三个消费者共同消费这个100个事件

package com.bolingcavalry.service;

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class StringWorkHandler implements WorkHandler<StringEvent> {

    public StringWorkHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(StringEvent event) throws Exception {
        log.info("work handler event : {}", event);

        // 这里延时100ms,模拟消费事件的逻辑的耗时
        Thread.sleep(100);

        // 如果外部传入了consumer,就要执行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service("workerPoolConsumer")
@Slf4j
public class WorkerPoolConsumerServiceImpl implements LowLevelOperateService {

    private RingBuffer<StringEvent> ringBuffer;

    private StringEventProducer producer;

    /**
     * 统计消息总数
     */
    private final AtomicLong eventCount = new AtomicLong();

    @PostConstruct
    private void init() {
        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);

        ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);

        StringWorkHandler[] handlers = new StringWorkHandler[CONSUMER_NUM];

        // 创建多个StringWorkHandler实例,放入一个数组中
        for (int i=0;i < CONSUMER_NUM;i++) {
            handlers[i] = new StringWorkHandler(o -> {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            });
        }

        // 创建WorkerPool实例,将StringWorkHandler实例的数组传进去,代表共同消费者的数量
        WorkerPool<StringEvent> workerPool = new WorkerPool<>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), handlers);

        // 这一句很重要,去掉就会出现重复消费同一个事件的问题
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

        workerPool.start(executorService);

        // 生产者
        producer = new StringEventProducer(ringBuffer);
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}
  1. StringWorkHandler数组传入给WorkerPool后,每个StringWorkHandler实例都放入一个新的WorkProcessor实例,WorkProcessor实现了Runnable接口,在执行workerPool.start时,会将WorkProcessor提交到线程池中;

  2. 和前面的独立消费相比,共同消费最大的特点在于只调用了一次ringBuffer.addGatingSequences方法,也就是说三个消费者共用一个sequence实例;

 	@Autowired
    @Qualifier("workerPoolConsumer")
    LowLevelOperateService workerPoolConsumer;
    
    @Test
    public void testWorkerPoolConsumer() throws InterruptedException {
        log.info("start testWorkerPoolConsumer");
        testLowLevelOperateService(workerPoolConsumer, EVENT_COUNT, EVENT_COUNT);
    }

在这里插入图片描述

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos

标签:disruptor,Disruptor,之三,ringBuffer,import,100,com,public
来源: https://www.cnblogs.com/bolingcavalry/p/15336337.html