其他分享
首页 > 其他分享> > RocketMQ刷盘机制

RocketMQ刷盘机制

作者:互联网

概览

RocketMQ的存储读写是基于JDK NIO的内存映射机制的,消息存储时首先将消息追加到内存中。在根据不同的刷盘策略在不同的时间进行刷盘

。如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force()方法,同步等待刷盘结果,进行刷盘结果返回。如果是异步刷盘,

在消息追加到内存后立刻,不等待刷盘结果立刻返回存储成功结果给消息发送端。RocketMQ使用一个单独的线程按照一个设定的频率执行刷盘操作。

通过在broker配置文件中配置flushDiskType来设定刷盘方式,ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘)。默认为异步刷盘。

本次以Commitlog文件刷盘机制为例来讲解刷盘机制。Consumequeue、IndexFile刷盘原理和Commitlog一直。索引文件的刷盘机制并不是采取定时刷盘机制,

而是每更新一次索引文件就会将上一次的改动刷写到磁盘。

刷盘服务是将commitlog、consumequeue两者中的MappedFile文件中的MappedByteBuffer或者FileChannel中的内存中的数据,刷写到磁盘。

还有将IndexFile中的MappedByteBuffer(this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer())中内存的数据刷写到磁盘。

刷盘服务的入口

刷盘服务的入口是CommitLog类对象,FlushCommitLogService是刷盘服务对象,如果是同步刷盘它被赋值为GroupCommitService,

如果是异步刷盘它被赋值为FlushRealTimeService;还有一个FlushCommitLogService的commitLogService对象,这个是将 TransientStorePoll 中的直接内存ByteBuffer,

写到FileChannel映射的磁盘文件中的服务。

// 异步、同步刷盘服务初始化
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    // 同步刷盘服务为 GroupCommitService
    this.flushCommitLogService = new GroupCommitService();
} else {
    // 异步刷盘服务为 FlushRealTimeService
    this.flushCommitLogService = new FlushRealTimeService();
}

// 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交到内存映射 MappedByteBuffer 中
this.commitLogService = new CommitRealTimeService();
刷盘方法调用入口

putMessage()方法,将消息写入内存的方式不同,调用的刷盘方式也不同。如果是asyncPutMessage()异步将消息写入内存,submitFlushRequest()方法是刷盘入口。

如果是putMessage()同步将消息写入内存,handleDiskFlush()方法是刷盘入口。handleDiskFlush()和submitFlushRequest()都包含有同步刷盘和异步刷盘的方法。

// 异步的方式存放消息
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {

    // 异步存储消息,提交刷盘请求
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
    // 根据刷盘结果副本结果,返回存放消息的结果
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}
// 同步方式存放消息
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

    // handle 硬盘刷新
    handleDiskFlush(result, putMessageResult, msg);
    // handle 高可用
    handleHA(result, putMessageResult, msg);
    // 返回存储消息的结果
    return putMessageResult;
}

同步刷盘

一条消息调用一次刷盘服务,等待刷盘结果返回,然后再将结果返回;才能处理下一条刷盘消息。以handleDiskFlush()方法来介绍同步刷盘和异步刷盘,

这里是区分刷盘方式的分水岭。

/**
 * 一条消息进行刷盘
 * @param result 扩展到内存ByteBuffer的结果
 * @param putMessageResult 放入ByteBuffer这个过程的结果
 * @param messageExt 存放的消息
 */
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush 同步
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        // 是否等待服务器将这一条消息存储完毕再返回(等待刷盘完成),还是直接处理其他写队列requestsWrite里面的请求
        if (messageExt.isWaitStoreMsgOK()) {
            //刷盘请求
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            //放入写请求队列
            service.putRequest(request);
            // 同步等待获取刷盘结果
            CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
            PutMessageStatus flushStatus = null;
            try {
                // 5秒超市等待刷盘结果
                flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                        TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                //flushOK=false;
            }
            // 刷盘失败,更新存放消息结果超时
            if (flushStatus != PutMessageStatus.PUT_OK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            // 唤醒处理刷盘请求写磁盘线程,处理刷盘请求线程和提交刷盘请求之前的协调,通过CountDownLatch(1)操作,通过控制hasNotified状态来实现写队列和读队列的交换
            service.wakeup();
        }
    }
    // 异步
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

同步刷盘会创造一个刷盘请求,然后将请求放入处理写刷盘请求的requestsWrite队列,请求里面封装了CompletableFuture对象用来记录刷盘结果,

利用CompletableFuturee的get方法同步等待获取结果。flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),TimeUnit.MILLISECONDS);

flushStatus为刷盘结果,默认等待5秒超时。

GroupCommitService为一个线程,用来定时处理requestsWrite队列里面的写刷盘请求,进行刷盘;它的requestsWrite和requestsRead队列进行了读写分离,

写GroupCommitRequest请求到requestsWrite队列,读GroupCommitRequest请求从requestsRead读取,读取请求今夕写盘操作。这两个队列,形成了化零为整,

将一个个请求,划分为一批,处理一批的GroupCommitRequest请求,然后requestsWrite和requestsRead队列进行交换,requestsRead作为写队列,

requestsWrite作为读队列,实现读写分离。从中使用CountDownLatch2来实现处理刷盘请求线程和提交刷盘请求之前的协调,通过控制hasNotified状态来实现写队列和读队列的交换。

// 同步刷盘服务
class GroupCommitService extends FlushCommitLogService {
    // 两个队列,读写请求分离
    // 刷盘服务写入请求队列
    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    // 刷盘服务读取请求队列
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
    // 将请求同步写入requestsWrite
    public synchronized void putRequest(final GroupCommitRequest request) {
        synchronized (this.requestsWrite) {
            this.requestsWrite.add(request);
        }
        // 唤醒刷盘线程处理请求
        this.wakeup();
    }
    // 写队列和读队列交换
    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }

    private void doCommit() {
        // 上锁读请求队列
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                // 每一个请求进行刷盘
                for (GroupCommitRequest req : this.requestsRead) {
                    // There may be a message in the next file, so a maximum of
                    // two times the flush
                    // 一个落盘请求,处理两次,第一次为false,进行刷盘,一次刷盘的数据是多个offset,并不是只有当前这个offset的值,这个offset的值进行了刷盘,
这个请求的第二次刷盘,这个offset已经已经落盘了, // flushWhere这个值在flush方法已经更新变大,所以flushOK=true,跳出for循环,通知flushOKFuture已经完成。 boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { // 是否已经刷过,false未刷,true已刷 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); // false 刷盘 if (!flushOK) { //0代码立刻刷盘,不管缓存中消息有多少 CommitLog.this.mappedFileQueue.flush(0); } } // flushOK:true,返回ok,已经刷过盘了,不用再刷盘;false:刷盘中,返回超时 // 唤醒等待刷盘结果的线程 req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } // 更新checkpoint的刷盘commitlog的最后刷盘时间,但是只写写到了checkpoint的内存ByteBuffer,并没有刷盘 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } // 清空队列 this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process // 因为个别的消息不是同步刷盘的,所以它回到这里进行处理 CommitLog.this.mappedFileQueue.flush(0); } } } public void run() { CommitLog.log.info(this.getServiceName() + " service started"); // 线程是否停止 while (!this.isStopped()) { try { // 设置hasNotified为false,未被通知,然后交换写对队列和读队列,重置waitPoint为(1),休息200ms,出事化为10ms,finally设置hasNotified为未被通知,
交换写对队列和读队列 this.waitForRunning(10); // 进行刷盘服务处理,一次处理一批请求,单个请求返回给等待刷盘服务结果的线程 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // 处理非正常停机,sleep10ms,交换写请求队列和读请求队列,等待数据处理 // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { this.swapRequests(); } // 进行请求处理 this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); } @Override protected void onWaitEnd() { // 写队列和读队列交换 this.swapRequests(); } @Override public String getServiceName() { return GroupCommitService.class.getSimpleName(); } // 5 分钟 @Override public long getJointime() { return 1000 * 60 * 5; } }
处理刷盘请求线程和提交刷盘请求之前的协调
# org.apache.rocketmq.common.ServiceThread
// 唤醒处理刷盘请求写磁盘线程,处理刷盘请求线程和提交刷盘请求之前的协调,通过控制hasNotified状态来实现写队列和读队列的交换
public void wakeup() {
    // hasNotified默认值是false,未被唤醒,这个操作之后唤醒了,处理刷盘请求
    if (hasNotified.compareAndSet(false, true)) {
        // waitPoint默认是1,然后其他线程处理
        waitPoint.countDown(); // notify
    }
}

/**
 * 设置hasNotified为false,未被通知,然后交换写对队列和读队列,重置waitPoint为(1),休息200ms,finally设置hasNotified为未被通知,交换写对队列和读队列
 * @param interval 200ms
 */
protected void waitForRunning(long interval) {
    // compareAndSet(except,update);如果真实值value==except,设置value值为update,返回true;如果真实值value !=except,真实值不变,返回false;
    // 如果hasNotified真实值为true,那么设置真实值为false,返回true;hasNotified真实值为false,那就返回false,真实值不变
    // 如果已经通知了,那就状态变为未通知,如果是同步刷盘任务,交换写请求队列和读请求队列
    if (hasNotified.compareAndSet(true, false)) {
        // 同步刷盘:写队列和读队列交换
        this.onWaitEnd();
        return;
    }
    // 重置countDownLatch对象,等待接受刷盘请求的线程写入请求到requestsRead,写完后,waitPoint.countDown,唤醒处理刷盘请求的线程,开始刷盘
    //entry to wait
    waitPoint.reset();

    try {
        // 等待interval毫秒
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        // 设置是否通知为false
        hasNotified.set(false);
        this.onWaitEnd();
    }
}
// 等待这个方法的步骤完成。比如:同步刷盘:写队列和读队列交换
protected void onWaitEnd() {
}

异步刷盘

异步刷盘根据是否开启TransientStorePool暂存池,来区分是否有commit操作。开启TransientStorePool会将writerBuffer中的数据commit到FileChannel中(fileChannel.write(writerBuffer))

然后再将FileChannel中的数据通过flush操作(fileChannel.force())到磁盘中;
如果为开启TransientStorePool,就不会有commit操作,直接flush(MappedByteBuffer.force())到磁盘中。

// 异步刷盘
// Asynchronous flush
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    //执行flush操作
    flushCommitLogService.wakeup();
} else {
    //执行commit操作,然后唤醒执行flush操作
    commitLogService.wakeup();
}
CommitRealTimeService

定时将 transientStorePool 中的直接内存 ByteBuffer,提交到FileChannel中,然后唤醒刷盘操作。

// 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交到FileChannel中
class CommitRealTimeService extends FlushCommitLogService {

    private long lastCommitTimestamp = 0;

    @Override
    public String getServiceName() {
        return CommitRealTimeService.class.getSimpleName();
    }

    @Override
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        // 刷盘线程是否停止
        while (!this.isStopped()) {
            // writerBuffer写数据到FileChannel时间间隔200ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
            // writerBuffer写数据到FileChannel页数大小4
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

            // writerBuffer写数据到FileChannel跨度时间间隔200ms
            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
            // 开始时间
            long begin = System.currentTimeMillis();
            // 触发commit机制有两种方式:1.commit时间超过了两次commit时间间隔,然后只要有数据就进行提交 2.commit数据页数大于默认设置的4页
            // 本次commit时间>上次commit时间+两次commit时间间隔,则进行commit,不用关心commit页数的大小,设置commitDataLeastPages=0
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }

            try {
                // result=false,表示提交了数据,多与上次提交的位置;表示此次有数据提交;result=true,表示没有新的数据被提交
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                // result = false means some data committed.表示此次有数据提交,然后进行刷盘
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    // 唤起刷盘线程,进行刷盘
                    flushCommitLogService.wakeup();
                }

                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                // 暂停200ms,再运行
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }

        boolean result = false;
        // 正常关机,循环10次,进行10次的有数据就提交的操作
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }
}
FlushRealTimeService

异步刷盘服务

class FlushRealTimeService extends FlushCommitLogService {
    private long lastFlushTimestamp = 0;
    // 刷盘次数
    private long printTimes = 0;

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            // 默认值为false,表示await方法等待,如果为true,表示使用Thread.sleep方法等待
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
            // 刷盘任务时间间隔,多久刷一次盘500ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            // 一次刷写任务至少包含页数,如果待刷写数据不足,小于该参数配置的值,将忽略本次刷写任务,默认4页
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
            // 两次真实刷写任务最大跨度,默认10s
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

            // 打印记录日志标志
            boolean printFlushProgress = false;

            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            // 触发刷盘机制有两种方式:1.刷盘时间超过了两次刷盘时间间隔,然后只要有数据就进行提交 2.commit数据页数大于默认设置的4页
            // 本次刷盘时间>上次刷盘时间+两次刷盘时间间隔,则进行刷盘,不用关心刷盘页数的大小,设置commitDataLeastPages=0
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                // 每间隔10次记录一次刷盘日志
                printFlushProgress = (printTimes++ % 10) == 0;
            }

            try {
                // 刷盘之前,进行线程sleep
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }
                // 打印记录日志
                if (printFlushProgress) {
                    this.printFlushProgress();
                }
                // 刷盘开始时间
                long begin = System.currentTimeMillis();
                // 刷盘
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                // 更新checkpoint最后刷盘时间
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }
        // while循环结束,正常关机,保证所有的数据刷写到磁盘
        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        // 打印日志
        this.printFlushProgress();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return FlushRealTimeService.class.getSimpleName();
    }

    private void printFlushProgress() {
        // CommitLog.log.info("how much disk fall behind memory, "
        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
    }

    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}   

刷盘是否开启TransientStorePool的区别

这里讲一下刷盘是否开启TransientStorePool的区别。

  image.png
不开启TransientStorePool:

MappedByteBuffer是直接内存,它暂时存储了message消息,MappedFile.mapp()方法做好MappedByteBuffer对象直接内存和落盘文件的映射关系,

然后flush()方法执行MappedByteBuffer.force():强制将ByteBuffer中的任何内容的改变写入到磁盘文件。

开启TransientStorePool:

MappedFile的writerBuffer为直接开辟的内存,然后MappedFile的初始化操作,做好FileChannel和磁盘文件的映射,commit()方法实质是执行fileChannel.write(writerBuffer),

将writerBuffer的数据写入到FileChannel映射的磁盘文件,flush操作执行FileChannel.force():将映射文件中的数据强制刷新到磁盘。

TransientStorePool的作用

TransientStorePool 相当于在内存层面做了读写分离,写走内存磁盘,读走pagecache,同时最大程度消除了page cache的锁竞争,降低了毛刺。它还使用了锁机制,

避免直接内存被交换到swap分区。



作者:93张先生
链接:https://www.jianshu.com/p/6ef2f03c0ff6
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

标签:false,请求,队列,result,机制,CommitLog,RocketMQ,刷盘
来源: https://www.cnblogs.com/zxy-come-on/p/15975266.html