RocketMQ存储之MappedFileQueue
作者:互联网
文章目录
一、概述
上一篇我们分析了MappedFile的实现细节,MappedFile实现了文件内存映射的功能。本篇我们分析MappedFileQueue的实现。
MappedFileQueue的作用是:将多个MappedFile按顺序组织起来,并且提供MappedFile的“增删查”操作等作用。由于MappedFileQueue的实现逻辑并不复杂,本篇只分析一部分源码实现。
二、实现细节
首先来看一下MappedFileQueue的创建和加载:
public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.allocateMappedFileService = allocateMappedFileService;
}
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
// 按顺序加载到队列中
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
实现过程很简单:将指定文件夹下的所有文件都加载映射为MappedFile,然后添加到队列中。
根据时间获取MappedFile:
/**
* 获取第一个更新时间大于timestamp的MappedFile,没有的话就返回最新的MappedFile
* @param timestamp
* @return
*/
public MappedFile getMappedFileByTime(final long timestamp) {
// mappedFiles队列的副本
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return null;
for (int i = 0; i < mfs.length; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
return mappedFile;
}
}
return (MappedFile) mfs[mfs.length - 1];
}
根据更新时间获取MappedFile的实现很简单,通过遍历mappedFiles队列查找第一个修改时间大于timestamp的MappedFile对象并返回,如果没有,就返回队列中的最后一个对象。
删除大于offset的数据:
/**
*
* 删除大于offset的记录
* @param offset
*/
public void truncateDirtyFiles(long offset) {
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
for (MappedFile file : this.mappedFiles) {
// 当前file的最大offset
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset > offset) {
// 当前文件最大的offset大于offset,说明有需要删除的记录
if (offset >= file.getFileFromOffset()) {
// 当前文件的最小offset小于等于offset,说明当前文件有部分记录需要删除
// 将MappedFile的三个指针都设置到offset对应的位置,使大于offset的数据可以被覆盖
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
// 当前文件的最小offset大于offset,说明当前文件所有记录都需要删除
file.destroy(1000);
willRemoveFiles.add(file);
}
}
}
this.deleteExpiredFile(willRemoveFiles);
}
删除大于offset的记录的处理过程也很简单:遍历所有的mappedFiles,检查每一个mappedFile的最大指针是否大于offset,如果大于说明该文件中包含需要删除的数据。然后判断mappedFile的最小指针是否大于offset,如果是,则说明该文件中的所有数据都是待删除数据。则整个文件执行销毁逻辑。如果不是,则说明部分数据需要删除,则把当前mappedFile的三个指针(上篇MappedFile分析过)设置为offset,使大于offset的空间可以被覆盖。
获取队列中最新的MappedFile:
/**
* 获取最后一个MappedFile,如果needCreate是true,则保证最后一个MappedFile不是空,且没有被写满
* @param startOffset 整个MappedFileQueue是空时,如果需要创建第一个MappedFile,这个MappedFile的起始offset
* @param needCreate 是否需要创建新的MappedFile
* @return
*/
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {
// 如果当前MappedFileQueue是空的,则要创建的文件的起始offset为不大于startOffset的最大能被mappedFileSize整除的数
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) {
// 如果当前MappedFileQueue不是空的,且最新的MappedFile被写满了
// 则下一个要创建的文件的起始offset为当前最后一个MappedFile的起始offset+mappedFileSize
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {
// 待创建的文件路径
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
// 下一个待创建的文件路径(有可能会提前预创建一个文件)
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
// 创建文件
if (this.allocateMappedFileService != null) {
// 通过allocateMappedFileService创建
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
// 直接new创建
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
这个方法的主要作用就是获取队列中最新的MappedFile,并且参数列表中提供了一个needCreate的选项,控制当队列中没有MappedFile或者最新的MappedFile已经被写满时,是否需要创建新的MappedFile。
这个过程中有一个有意思的逻辑,就是预创建MappedFile。可以看到,当allocateMappedFileService不是空的时候,就会使用allocateMappedFileService来提交创建MappedFile的任务,具体过程如下:
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
// 内存暂存池中可用的buffer个数
canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
}
}
// 将nextFilePath需要创建的文件封装成AllocateRequest创建请求,添加到请求列表
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
if (nextPutOK) {
if (canSubmitRequests <= 0) {
// 内存暂存池中没有可用buffer对象,分配失败
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextFilePath);
return null;
}
// 内存暂存池中没有可用buffer对象,将需要创建的文件请求,添加到请求队列
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
// 内存暂存池中可用buffer对象减去1
canSubmitRequests--;
}
//预分配nextNextFilePath
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
// 将创建预分配的任务并添加到任务队列
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}
if (hasException) {
log.warn(this.getServiceName() + " service has exception. so return null");
return null;
}
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
// 等待文件创建线程执行完nextFilePath文件创建
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}
我们可以看到,MappedFile的暂存缓冲来自于messageStore提供的transientStorePool。当transientStorePool中没有可用的缓冲buffer,则无法提交创建MappedFile的任务。
在提交创建请求前,需要先校验是否已经提交了该路径的MappedFile创建任务,如果已经有,则不重复添加。随后还会添加一个预分配的任务,将一下一个要创建的MappedFile也作为任务提交。然后等待nextFilePath的MappedFile创建完成。
而MappedFile创建任务的执行也很简单:
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
if (elapsedTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}
// pre write mappedFile
// 文件预热
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMappedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
if (null != req) {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
req.getCountDownLatch().countDown();
}
return true;
}
从任务线程首先任务队列中取出任务,然后执行MappedFile的创建过程:如果开启了暂存缓冲池的功能,就通过:ServiceLoader.load(MappedFile.class)方式创建MappedFile对象,否则就使用构造方法创建MappedFile,不管使用哪种方法创建,都会调用init方法来初始化MappedFile。如果开启了文件预热功能的话,还会调用MappedFile的warmMappedFile方法进行文件预热(文件预热的功能在上篇已经分析)。
根据数据位置重置数据:
public boolean resetOffset(long offset) {
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast != null) {
long lastOffset = mappedFileLast.getFileFromOffset() +
mappedFileLast.getWrotePosition();
long diff = lastOffset - offset;
// 可以重置的前提是,当前写入指针提前与想要重置到的指针offset的偏移量不超过两个文件的额大小
final int maxDiff = this.mappedFileSize * 2;
if (diff > maxDiff)
return false;
}
ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();
while (iterator.hasPrevious()) {
mappedFileLast = iterator.previous();
// 从队列后往前遍历,如果当前mappedFile的起始指针大于offset,则需要丢弃该文件。
// 否则第一次遇到mappedFile的起始指针不大于offset,就将当前文件的三个指针设置为offset,然后跳出循环
// 即丢弃offset以后的所有数据
if (offset >= mappedFileLast.getFileFromOffset()) {
int where = (int) (offset % mappedFileLast.getFileSize());
mappedFileLast.setFlushedPosition(where);
mappedFileLast.setWrotePosition(where);
mappedFileLast.setCommittedPosition(where);
break;
} else {
iterator.remove();
}
}
return true;
}
重置数据的过程也很简单,但是重置数据有一个限制,是只能重置当前写指针之前2*mappedFileSize以内的数据。
数据从暂存缓冲中提交到文件(commit):
/**
* 提交数据
* @param commitLeastPages 最少提交的页数
* @return 是否数据全部提交完成
*/
public boolean commit(final int commitLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
if (mappedFile != null) {
// offset是在mappedFile里的位置
int offset = mappedFile.commit(commitLeastPages);
// where是在mappedFileQueue中的绝对位置
long where = mappedFile.getFileFromOffset() + offset;
// 本次提交之后的位置等于提交前的位置,说明全部数据都提交了,没有待提交的数据
result = where == this.committedWhere;
this.committedWhere = where;
}
return result;
}
数据提交的逻辑也很简单:根据已提交位置找到第一个还未提交的MappedFile,然后提交该MappedFile,最后更新MappedFileQueue的committedWhere指针。
数据的flush和commit逻辑类似,这里不再分析。
总结
经过分析可知,MappedFileQueue的作用是:将一个文件目录下的MappedFile组织成一个根据文件名排序的文件队列。并且可以控制队列中的文件的创建、文件内容的提交、刷盘、文件的过期剔除。
标签:存储,null,return,mappedFile,req,offset,MappedFileQueue,RocketMQ,MappedFile 来源: https://blog.csdn.net/feijianke666/article/details/117233329