编程语言
首页 > 编程语言> > java – LMAX Disruptor – 什么决定批量大小?

java – LMAX Disruptor – 什么决定批量大小?

作者:互联网

我最近一直在学习LMAX Disruptor并且正在做一些实验.令我困惑的一件事是EventHandler的onEvent处理程序方法的endOfBatch参数.请考虑以下代码.首先,我称为Test1和Test1Worker的虚拟消息和消费者类:

public class Test1 {

}

public class Test1Worker implements EventHandler<Test1>{
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
        try{
            Thread.sleep(500);
        }
        catch(Exception e){
            e.printStackTrace();
        }
        System.out.println("Received message with sequence " + sequence + ". "
                + "EndOfBatch = " + endOfBatch);
    }
}

请注意,我已经延迟了500毫秒,以替代一些现实世界的工作.我也在控制台中打印了序列号

然后我的驱动程序类(作为生产者)称为DisruptorTest:

public class DisruptorTest {

    private static Disruptor<Test1> bus1;

    private static ExecutorService test1Workers;

    public static void main(String[] args){             
        test1Workers = Executors.newFixedThreadPool(1);

        bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);           
        bus1.handleEventsWith(new Test1Worker());
        RingBuffer<Test1> buf1 = bus1.start();

        for (int i = 0; i < 10; i++){
            long a = System.currentTimeMillis();
            long next = buf1.next();
            long b = System.currentTimeMillis();
            System.out.println("Delay for claiming slot " + i + " is "+ (b - a));
            try {
                Test1 message = buf1.get(next);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                buf1.publish(next);
            }
        }
    }

    public static class Test1Factory implements EventFactory<Test1> {
        public Test1 newInstance() {
            return new Test1();
        }

    }   
}

在这里,在初始化所需的东西之后,我向RingBuffer(缓冲区大小8)提供10条消息,并试图监视一些事情 – 生产者声称RingBuffer中的下一个插槽的延迟以及带有序列号的消息在消费者方面,以及特定序列是否被视为批次结束.

现在,有趣的是,处理每条消息涉及500毫秒的延迟,这就是我得到的输出:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Delay for claiming slot 2 is 0
Delay for claiming slot 3 is 0
Delay for claiming slot 4 is 0
Delay for claiming slot 5 is 0
Delay for claiming slot 6 is 0
Delay for claiming slot 7 is 0
Received message with sequence 0. EndOfBatch = true
Delay for claiming slot 8 is 505
Received message with sequence 1. EndOfBatch = false
Received message with sequence 2. EndOfBatch = false
Received message with sequence 3. EndOfBatch = false
Received message with sequence 4. EndOfBatch = false
Received message with sequence 5. EndOfBatch = false
Received message with sequence 6. EndOfBatch = false
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 9 is 3519
Received message with sequence 8. EndOfBatch = true
Received message with sequence 9. EndOfBatch = true

但是,如果我删除500毫秒的等待时间,这就是我得到的:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Received message with sequence 0. EndOfBatch = true
Received message with sequence 1. EndOfBatch = true
Delay for claiming slot 2 is 0
Received message with sequence 2. EndOfBatch = true
Delay for claiming slot 3 is 0
Received message with sequence 3. EndOfBatch = true
Delay for claiming slot 4 is 0
Received message with sequence 4. EndOfBatch = true
Delay for claiming slot 5 is 0
Received message with sequence 5. EndOfBatch = true
Delay for claiming slot 6 is 0
Received message with sequence 6. EndOfBatch = true
Delay for claiming slot 7 is 0
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 8 is 1
Delay for claiming slot 9 is 0
Received message with sequence 8. EndOfBatch = false
Received message with sequence 9. EndOfBatch = true  

因此,看起来某个消息是否被认为是批处理结束(即,批处理的大小)是否受到消费者的消息处理延迟的影响.可能我在这里很傻,但它应该是怎么回事?这背后的原因是什么?一般来说,什么决定了批量?提前致谢.如果我的问题中的任何内容都不清楚,请告诉我.

解决方法:

批量大小仅由可用元素的数量决定.因此,如果当前有更多可用元素,那么它将包含在批处理中.例如,如果Disruptor调用您的代码并且队列中只有一个元素,那么您将使用endOfBatch = true进行一次调用.如果队列中有8个元素,那么它将收集所有8个元素并在一个批处理中发送它们.

您可以在下面的代码中看到队列中的“可用”条目被提取,并且可能比“下一个”项目多得多.例如,您当前是5,等待插槽6,然后3个事件到达,可用将是8,并且您将批量接收多个呼叫(对于6,7,8).

https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java#L124

final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}

关于元素9的500ms暂停,请注意Disruptor是使用环形缓冲区构建的,并且缓冲区中指定的插槽数为8(请参阅此处的第二个参数):

bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);  

如果并非所有消费者都使用了元素,并且环形缓冲区处于容量状态(所有8个元素已满),则生产者将被阻止向缓冲区发布新事件.您可以尝试增加缓冲区大小,例如200万个对象,或确保您的消费者比生产者更快,因此队列不会填满(移除已经证明的睡眠).

标签:java,multithreading,producer-consumer,disruptor-pattern,lmax
来源: https://codeday.me/bug/20190611/1221145.html