RocketMQ刷盘机制
作者:互联网
概览
RocketMQ的存储读写是基于JDK NIO的内存映射机制的,消息存储时首先将消息追加到内存中。在根据不同的刷盘策略在不同的时间进行刷盘
。如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force()方法,同步等待刷盘结果,进行刷盘结果返回。如果是异步刷盘,
在消息追加到内存后立刻,不等待刷盘结果立刻返回存储成功结果给消息发送端。RocketMQ使用一个单独的线程按照一个设定的频率执行刷盘操作。
通过在broker配置文件中配置flushDiskType来设定刷盘方式,ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘)。默认为异步刷盘。
本次以Commitlog文件刷盘机制为例来讲解刷盘机制。Consumequeue、IndexFile刷盘原理和Commitlog一直。索引文件的刷盘机制并不是采取定时刷盘机制,
而是每更新一次索引文件就会将上一次的改动刷写到磁盘。
刷盘服务是将commitlog、consumequeue两者中的MappedFile文件中的MappedByteBuffer或者FileChannel中的内存中的数据,刷写到磁盘。
还有将IndexFile中的MappedByteBuffer(this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer())中内存的数据刷写到磁盘。
刷盘服务的入口
刷盘服务的入口是CommitLog类对象,FlushCommitLogService是刷盘服务对象,如果是同步刷盘它被赋值为GroupCommitService,
如果是异步刷盘它被赋值为FlushRealTimeService;还有一个FlushCommitLogService的commitLogService对象,这个是将 TransientStorePoll 中的直接内存ByteBuffer,
写到FileChannel映射的磁盘文件中的服务。
// 异步、同步刷盘服务初始化
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 同步刷盘服务为 GroupCommitService
this.flushCommitLogService = new GroupCommitService();
} else {
// 异步刷盘服务为 FlushRealTimeService
this.flushCommitLogService = new FlushRealTimeService();
}
// 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交到内存映射 MappedByteBuffer 中
this.commitLogService = new CommitRealTimeService();
刷盘方法调用入口
putMessage()方法,将消息写入内存的方式不同,调用的刷盘方式也不同。如果是asyncPutMessage()异步将消息写入内存,submitFlushRequest()方法是刷盘入口。
如果是putMessage()同步将消息写入内存,handleDiskFlush()方法是刷盘入口。handleDiskFlush()和submitFlushRequest()都包含有同步刷盘和异步刷盘的方法。
// 异步的方式存放消息
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 异步存储消息,提交刷盘请求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
// 根据刷盘结果副本结果,返回存放消息的结果
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
// 同步方式存放消息
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// handle 硬盘刷新
handleDiskFlush(result, putMessageResult, msg);
// handle 高可用
handleHA(result, putMessageResult, msg);
// 返回存储消息的结果
return putMessageResult;
}
同步刷盘
一条消息调用一次刷盘服务,等待刷盘结果返回,然后再将结果返回;才能处理下一条刷盘消息。以handleDiskFlush()方法来介绍同步刷盘和异步刷盘,
这里是区分刷盘方式的分水岭。
/**
* 一条消息进行刷盘
* @param result 扩展到内存ByteBuffer的结果
* @param putMessageResult 放入ByteBuffer这个过程的结果
* @param messageExt 存放的消息
*/
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush 同步
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
// 是否等待服务器将这一条消息存储完毕再返回(等待刷盘完成),还是直接处理其他写队列requestsWrite里面的请求
if (messageExt.isWaitStoreMsgOK()) {
//刷盘请求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
//放入写请求队列
service.putRequest(request);
// 同步等待获取刷盘结果
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
try {
// 5秒超市等待刷盘结果
flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
// 刷盘失败,更新存放消息结果超时
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
// 唤醒处理刷盘请求写磁盘线程,处理刷盘请求线程和提交刷盘请求之前的协调,通过CountDownLatch(1)操作,通过控制hasNotified状态来实现写队列和读队列的交换
service.wakeup();
}
}
// 异步
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
同步刷盘会创造一个刷盘请求,然后将请求放入处理写刷盘请求的requestsWrite队列,请求里面封装了CompletableFuture对象用来记录刷盘结果,
利用CompletableFuturee的get方法同步等待获取结果。flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),TimeUnit.MILLISECONDS);
flushStatus为刷盘结果,默认等待5秒超时。
GroupCommitService为一个线程,用来定时处理requestsWrite队列里面的写刷盘请求,进行刷盘;它的requestsWrite和requestsRead队列进行了读写分离,
写GroupCommitRequest请求到requestsWrite队列,读GroupCommitRequest请求从requestsRead读取,读取请求今夕写盘操作。这两个队列,形成了化零为整,
将一个个请求,划分为一批,处理一批的GroupCommitRequest请求,然后requestsWrite和requestsRead队列进行交换,requestsRead作为写队列,
requestsWrite作为读队列,实现读写分离。从中使用CountDownLatch2来实现处理刷盘请求线程和提交刷盘请求之前的协调,通过控制hasNotified状态来实现写队列和读队列的交换。
// 同步刷盘服务
class GroupCommitService extends FlushCommitLogService {
// 两个队列,读写请求分离
// 刷盘服务写入请求队列
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
// 刷盘服务读取请求队列
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
// 将请求同步写入requestsWrite
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
// 唤醒刷盘线程处理请求
this.wakeup();
}
// 写队列和读队列交换
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
private void doCommit() {
// 上锁读请求队列
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
// 每一个请求进行刷盘
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
// 一个落盘请求,处理两次,第一次为false,进行刷盘,一次刷盘的数据是多个offset,并不是只有当前这个offset的值,这个offset的值进行了刷盘,
这个请求的第二次刷盘,这个offset已经已经落盘了,
// flushWhere这个值在flush方法已经更新变大,所以flushOK=true,跳出for循环,通知flushOKFuture已经完成。
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
// 是否已经刷过,false未刷,true已刷
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
// false 刷盘
if (!flushOK) {
//0代码立刻刷盘,不管缓存中消息有多少
CommitLog.this.mappedFileQueue.flush(0);
}
}
// flushOK:true,返回ok,已经刷过盘了,不用再刷盘;false:刷盘中,返回超时
// 唤醒等待刷盘结果的线程
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
// 更新checkpoint的刷盘commitlog的最后刷盘时间,但是只写写到了checkpoint的内存ByteBuffer,并没有刷盘
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
// 清空队列
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
// 因为个别的消息不是同步刷盘的,所以它回到这里进行处理
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
// 线程是否停止
while (!this.isStopped()) {
try {
// 设置hasNotified为false,未被通知,然后交换写对队列和读队列,重置waitPoint为(1),休息200ms,出事化为10ms,finally设置hasNotified为未被通知,
交换写对队列和读队列
this.waitForRunning(10);
// 进行刷盘服务处理,一次处理一批请求,单个请求返回给等待刷盘服务结果的线程
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// 处理非正常停机,sleep10ms,交换写请求队列和读请求队列,等待数据处理
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
// 进行请求处理
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
@Override
protected void onWaitEnd() {
// 写队列和读队列交换
this.swapRequests();
}
@Override
public String getServiceName() {
return GroupCommitService.class.getSimpleName();
}
// 5 分钟
@Override
public long getJointime() {
return 1000 * 60 * 5;
}
}
处理刷盘请求线程和提交刷盘请求之前的协调
# org.apache.rocketmq.common.ServiceThread
// 唤醒处理刷盘请求写磁盘线程,处理刷盘请求线程和提交刷盘请求之前的协调,通过控制hasNotified状态来实现写队列和读队列的交换
public void wakeup() {
// hasNotified默认值是false,未被唤醒,这个操作之后唤醒了,处理刷盘请求
if (hasNotified.compareAndSet(false, true)) {
// waitPoint默认是1,然后其他线程处理
waitPoint.countDown(); // notify
}
}
/**
* 设置hasNotified为false,未被通知,然后交换写对队列和读队列,重置waitPoint为(1),休息200ms,finally设置hasNotified为未被通知,交换写对队列和读队列
* @param interval 200ms
*/
protected void waitForRunning(long interval) {
// compareAndSet(except,update);如果真实值value==except,设置value值为update,返回true;如果真实值value !=except,真实值不变,返回false;
// 如果hasNotified真实值为true,那么设置真实值为false,返回true;hasNotified真实值为false,那就返回false,真实值不变
// 如果已经通知了,那就状态变为未通知,如果是同步刷盘任务,交换写请求队列和读请求队列
if (hasNotified.compareAndSet(true, false)) {
// 同步刷盘:写队列和读队列交换
this.onWaitEnd();
return;
}
// 重置countDownLatch对象,等待接受刷盘请求的线程写入请求到requestsRead,写完后,waitPoint.countDown,唤醒处理刷盘请求的线程,开始刷盘
//entry to wait
waitPoint.reset();
try {
// 等待interval毫秒
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
// 设置是否通知为false
hasNotified.set(false);
this.onWaitEnd();
}
}
// 等待这个方法的步骤完成。比如:同步刷盘:写队列和读队列交换
protected void onWaitEnd() {
}
异步刷盘
异步刷盘根据是否开启TransientStorePool暂存池,来区分是否有commit操作。开启TransientStorePool会将writerBuffer中的数据commit到FileChannel中(fileChannel.write(writerBuffer))
然后再将FileChannel中的数据通过flush操作(fileChannel.force())到磁盘中;
如果为开启TransientStorePool,就不会有commit操作,直接flush(MappedByteBuffer.force())到磁盘中。
// 异步刷盘
// Asynchronous flush
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
//执行flush操作
flushCommitLogService.wakeup();
} else {
//执行commit操作,然后唤醒执行flush操作
commitLogService.wakeup();
}
CommitRealTimeService
定时将 transientStorePool 中的直接内存 ByteBuffer,提交到FileChannel中,然后唤醒刷盘操作。
// 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交到FileChannel中
class CommitRealTimeService extends FlushCommitLogService {
private long lastCommitTimestamp = 0;
@Override
public String getServiceName() {
return CommitRealTimeService.class.getSimpleName();
}
@Override
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
// 刷盘线程是否停止
while (!this.isStopped()) {
// writerBuffer写数据到FileChannel时间间隔200ms
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
// writerBuffer写数据到FileChannel页数大小4
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
// writerBuffer写数据到FileChannel跨度时间间隔200ms
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
// 开始时间
long begin = System.currentTimeMillis();
// 触发commit机制有两种方式:1.commit时间超过了两次commit时间间隔,然后只要有数据就进行提交 2.commit数据页数大于默认设置的4页
// 本次commit时间>上次commit时间+两次commit时间间隔,则进行commit,不用关心commit页数的大小,设置commitDataLeastPages=0
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
this.lastCommitTimestamp = begin;
commitDataLeastPages = 0;
}
try {
// result=false,表示提交了数据,多与上次提交的位置;表示此次有数据提交;result=true,表示没有新的数据被提交
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
// result = false means some data committed.表示此次有数据提交,然后进行刷盘
if (!result) {
this.lastCommitTimestamp = end; // result = false means some data committed.
//now wake up flush thread.
// 唤起刷盘线程,进行刷盘
flushCommitLogService.wakeup();
}
if (end - begin > 500) {
log.info("Commit data to file costs {} ms", end - begin);
}
// 暂停200ms,再运行
this.waitForRunning(interval);
} catch (Throwable e) {
CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
}
}
boolean result = false;
// 正常关机,循环10次,进行10次的有数据就提交的操作
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.commit(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
CommitLog.log.info(this.getServiceName() + " service end");
}
}
FlushRealTimeService
异步刷盘服务
class FlushRealTimeService extends FlushCommitLogService {
private long lastFlushTimestamp = 0;
// 刷盘次数
private long printTimes = 0;
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
// 默认值为false,表示await方法等待,如果为true,表示使用Thread.sleep方法等待
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
// 刷盘任务时间间隔,多久刷一次盘500ms
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
// 一次刷写任务至少包含页数,如果待刷写数据不足,小于该参数配置的值,将忽略本次刷写任务,默认4页
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
// 两次真实刷写任务最大跨度,默认10s
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
// 打印记录日志标志
boolean printFlushProgress = false;
// Print flush progress
long currentTimeMillis = System.currentTimeMillis();
// 触发刷盘机制有两种方式:1.刷盘时间超过了两次刷盘时间间隔,然后只要有数据就进行提交 2.commit数据页数大于默认设置的4页
// 本次刷盘时间>上次刷盘时间+两次刷盘时间间隔,则进行刷盘,不用关心刷盘页数的大小,设置commitDataLeastPages=0
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
// 每间隔10次记录一次刷盘日志
printFlushProgress = (printTimes++ % 10) == 0;
}
try {
// 刷盘之前,进行线程sleep
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
// 打印记录日志
if (printFlushProgress) {
this.printFlushProgress();
}
// 刷盘开始时间
long begin = System.currentTimeMillis();
// 刷盘
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
// 更新checkpoint最后刷盘时间
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
// while循环结束,正常关机,保证所有的数据刷写到磁盘
// Normal shutdown, to ensure that all the flush before exit
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
// 打印日志
this.printFlushProgress();
CommitLog.log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return FlushRealTimeService.class.getSimpleName();
}
private void printFlushProgress() {
// CommitLog.log.info("how much disk fall behind memory, "
// + CommitLog.this.mappedFileQueue.howMuchFallBehind());
}
@Override
public long getJointime() {
return 1000 * 60 * 5;
}
}
刷盘是否开启TransientStorePool的区别
这里讲一下刷盘是否开启TransientStorePool的区别。
image.png不开启TransientStorePool:
MappedByteBuffer是直接内存,它暂时存储了message消息,MappedFile.mapp()方法做好MappedByteBuffer对象直接内存和落盘文件的映射关系,
然后flush()方法执行MappedByteBuffer.force():强制将ByteBuffer中的任何内容的改变写入到磁盘文件。
开启TransientStorePool:
MappedFile的writerBuffer为直接开辟的内存,然后MappedFile的初始化操作,做好FileChannel和磁盘文件的映射,commit()方法实质是执行fileChannel.write(writerBuffer),
将writerBuffer的数据写入到FileChannel映射的磁盘文件,flush操作执行FileChannel.force():将映射文件中的数据强制刷新到磁盘。
TransientStorePool的作用
TransientStorePool 相当于在内存层面做了读写分离,写走内存磁盘,读走pagecache,同时最大程度消除了page cache的锁竞争,降低了毛刺。它还使用了锁机制,
避免直接内存被交换到swap分区。
作者:93张先生
链接:https://www.jianshu.com/p/6ef2f03c0ff6
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
标签:false,请求,队列,result,机制,CommitLog,RocketMQ,刷盘 来源: https://www.cnblogs.com/zxy-come-on/p/15975266.html