其他分享
首页 > 其他分享> > RocketMQ存储之MappedFileQueue

RocketMQ存储之MappedFileQueue

作者:互联网

文章目录

一、概述

上一篇我们分析了MappedFile的实现细节,MappedFile实现了文件内存映射的功能。本篇我们分析MappedFileQueue的实现。

MappedFileQueue的作用是:将多个MappedFile按顺序组织起来,并且提供MappedFile的“增删查”操作等作用。由于MappedFileQueue的实现逻辑并不复杂,本篇只分析一部分源码实现。

二、实现细节

首先来看一下MappedFileQueue的创建和加载:

    public MappedFileQueue(final String storePath, int mappedFileSize,
        AllocateMappedFileService allocateMappedFileService) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;
        this.allocateMappedFileService = allocateMappedFileService;
    }

    public boolean load() {
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            // 按顺序加载到队列中
            Arrays.sort(files);
            for (File file : files) {

                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, please check it manually");
                    return false;
                }

                try {
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }

        return true;
    }

实现过程很简单:将指定文件夹下的所有文件都加载映射为MappedFile,然后添加到队列中。

根据时间获取MappedFile:

    /**
     * 获取第一个更新时间大于timestamp的MappedFile,没有的话就返回最新的MappedFile
     * @param timestamp
     * @return
     */
    public MappedFile getMappedFileByTime(final long timestamp) {
    	// mappedFiles队列的副本
        Object[] mfs = this.copyMappedFiles(0);

        if (null == mfs)
            return null;

        for (int i = 0; i < mfs.length; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
                return mappedFile;
            }
        }

        return (MappedFile) mfs[mfs.length - 1];
    }

根据更新时间获取MappedFile的实现很简单,通过遍历mappedFiles队列查找第一个修改时间大于timestamp的MappedFile对象并返回,如果没有,就返回队列中的最后一个对象。

删除大于offset的数据:

    /**
     *
     * 删除大于offset的记录
     * @param offset
     */
    public void truncateDirtyFiles(long offset) {
        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();

        for (MappedFile file : this.mappedFiles) {
            // 当前file的最大offset
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            if (fileTailOffset > offset) {
                // 当前文件最大的offset大于offset,说明有需要删除的记录
                if (offset >= file.getFileFromOffset()) {
                    // 当前文件的最小offset小于等于offset,说明当前文件有部分记录需要删除

                    // 将MappedFile的三个指针都设置到offset对应的位置,使大于offset的数据可以被覆盖
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    // 当前文件的最小offset大于offset,说明当前文件所有记录都需要删除
                    file.destroy(1000);
                    willRemoveFiles.add(file);
                }
            }
        }

        this.deleteExpiredFile(willRemoveFiles);
    }

删除大于offset的记录的处理过程也很简单:遍历所有的mappedFiles,检查每一个mappedFile的最大指针是否大于offset,如果大于说明该文件中包含需要删除的数据。然后判断mappedFile的最小指针是否大于offset,如果是,则说明该文件中的所有数据都是待删除数据。则整个文件执行销毁逻辑。如果不是,则说明部分数据需要删除,则把当前mappedFile的三个指针(上篇MappedFile分析过)设置为offset,使大于offset的空间可以被覆盖。

获取队列中最新的MappedFile:

    /**
     * 获取最后一个MappedFile,如果needCreate是true,则保证最后一个MappedFile不是空,且没有被写满
     * @param startOffset 整个MappedFileQueue是空时,如果需要创建第一个MappedFile,这个MappedFile的起始offset
     * @param needCreate 是否需要创建新的MappedFile
     * @return
     */
     public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        long createOffset = -1;
        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast == null) {
            // 如果当前MappedFileQueue是空的,则要创建的文件的起始offset为不大于startOffset的最大能被mappedFileSize整除的数
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        if (mappedFileLast != null && mappedFileLast.isFull()) {

            // 如果当前MappedFileQueue不是空的,且最新的MappedFile被写满了
            // 则下一个要创建的文件的起始offset为当前最后一个MappedFile的起始offset+mappedFileSize
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }

        if (createOffset != -1 && needCreate) {
            // 待创建的文件路径
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            // 下一个待创建的文件路径(有可能会提前预创建一个文件)
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;

            // 创建文件
            if (this.allocateMappedFileService != null) {
               // 通过allocateMappedFileService创建
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    // 直接new创建
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }

            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }

这个方法的主要作用就是获取队列中最新的MappedFile,并且参数列表中提供了一个needCreate的选项,控制当队列中没有MappedFile或者最新的MappedFile已经被写满时,是否需要创建新的MappedFile
这个过程中有一个有意思的逻辑,就是预创建MappedFile。可以看到,当allocateMappedFileService不是空的时候,就会使用allocateMappedFileService来提交创建MappedFile的任务,具体过程如下:

    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        int canSubmitRequests = 2;
        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
                // 内存暂存池中可用的buffer个数
                canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
            }
        }

        // 将nextFilePath需要创建的文件封装成AllocateRequest创建请求,添加到请求列表
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

        if (nextPutOK) {
            if (canSubmitRequests <= 0) {
                // 内存暂存池中没有可用buffer对象,分配失败
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextFilePath);
                return null;
            }
            // 内存暂存池中没有可用buffer对象,将需要创建的文件请求,添加到请求队列
            boolean offerOK = this.requestQueue.offer(nextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }

            // 内存暂存池中可用buffer对象减去1
            canSubmitRequests--;
        }

        //预分配nextNextFilePath
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        // 将创建预分配的任务并添加到任务队列
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }

        if (hasException) {
            log.warn(this.getServiceName() + " service has exception. so return null");
            return null;
        }

        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
                // 等待文件创建线程执行完nextFilePath文件创建
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                if (!waitOK) {
                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                    return null;
                } else {
                    this.requestTable.remove(nextFilePath);
                    return result.getMappedFile();
                }
            } else {
                log.error("find preallocate mmap failed, this never happen");
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }

        return null;
    }

我们可以看到,MappedFile的暂存缓冲来自于messageStore提供的transientStorePool。当transientStorePool中没有可用的缓冲buffer,则无法提交创建MappedFile的任务。
在提交创建请求前,需要先校验是否已经提交了该路径的MappedFile创建任务,如果已经有,则不重复添加。随后还会添加一个预分配的任务,将一下一个要创建的MappedFile也作为任务提交。然后等待nextFilePath的MappedFile创建完成。

MappedFile创建任务的执行也很简单:

    private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
            req = this.requestQueue.take();
            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
            if (null == expectedRequest) {
                log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize());
                return true;
            }
            if (expectedRequest != req) {
                log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
                return true;
            }

            if (req.getMappedFile() == null) {
                long beginTime = System.currentTimeMillis();

                MappedFile mappedFile;
                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    try {
                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    } catch (RuntimeException e) {
                        log.warn("Use default implementation.");
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    }
                } else {
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                }

                long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
                if (elapsedTime > 10) {
                    int queueSize = this.requestQueue.size();
                    log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
                        + " " + req.getFilePath() + " " + req.getFileSize());
                }

                // pre write mappedFile
                // 文件预热
                if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                    .getMappedFileSizeCommitLog()
                    &&
                    this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                }

                req.setMappedFile(mappedFile);
                this.hasException = false;
                isSuccess = true;
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
            this.hasException = true;
            return false;
        } catch (IOException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.hasException = true;
            if (null != req) {
                requestQueue.offer(req);
                try {
                    Thread.sleep(1);
                } catch (InterruptedException ignored) {
                }
            }
        } finally {
            if (req != null && isSuccess)
                req.getCountDownLatch().countDown();
        }
        return true;
    }

从任务线程首先任务队列中取出任务,然后执行MappedFile的创建过程:如果开启了暂存缓冲池的功能,就通过:ServiceLoader.load(MappedFile.class)方式创建MappedFile对象,否则就使用构造方法创建MappedFile,不管使用哪种方法创建,都会调用init方法来初始化MappedFile。如果开启了文件预热功能的话,还会调用MappedFile的warmMappedFile方法进行文件预热(文件预热的功能在上篇已经分析)。

根据数据位置重置数据:

    public boolean resetOffset(long offset) {
        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast != null) {
            long lastOffset = mappedFileLast.getFileFromOffset() +
                mappedFileLast.getWrotePosition();
            long diff = lastOffset - offset;
            // 可以重置的前提是,当前写入指针提前与想要重置到的指针offset的偏移量不超过两个文件的额大小
            final int maxDiff = this.mappedFileSize * 2;
            if (diff > maxDiff)
                return false;
        }

        ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();

        while (iterator.hasPrevious()) {
            mappedFileLast = iterator.previous();

            // 从队列后往前遍历,如果当前mappedFile的起始指针大于offset,则需要丢弃该文件。
            // 否则第一次遇到mappedFile的起始指针不大于offset,就将当前文件的三个指针设置为offset,然后跳出循环
            // 即丢弃offset以后的所有数据
            if (offset >= mappedFileLast.getFileFromOffset()) {
                int where = (int) (offset % mappedFileLast.getFileSize());
                mappedFileLast.setFlushedPosition(where);
                mappedFileLast.setWrotePosition(where);
                mappedFileLast.setCommittedPosition(where);
                break;
            } else {
                iterator.remove();
            }
        }
        return true;
    }

重置数据的过程也很简单,但是重置数据有一个限制,是只能重置当前写指针之前2*mappedFileSize以内的数据。

数据从暂存缓冲中提交到文件(commit):

    /**
     * 提交数据
     * @param commitLeastPages 最少提交的页数
     * @return 是否数据全部提交完成
     */
    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
            // offset是在mappedFile里的位置
            int offset = mappedFile.commit(commitLeastPages);
            // where是在mappedFileQueue中的绝对位置
            long where = mappedFile.getFileFromOffset() + offset;
            // 本次提交之后的位置等于提交前的位置,说明全部数据都提交了,没有待提交的数据
            result = where == this.committedWhere;
            this.committedWhere = where;
        }

        return result;
    }

数据提交的逻辑也很简单:根据已提交位置找到第一个还未提交的MappedFile,然后提交该MappedFile,最后更新MappedFileQueue的committedWhere指针。
数据的flush和commit逻辑类似,这里不再分析。

总结

经过分析可知,MappedFileQueue的作用是:将一个文件目录下的MappedFile组织成一个根据文件名排序的文件队列。并且可以控制队列中的文件的创建、文件内容的提交、刷盘、文件的过期剔除。

标签:存储,null,return,mappedFile,req,offset,MappedFileQueue,RocketMQ,MappedFile
来源: https://blog.csdn.net/feijianke666/article/details/117233329