其他分享
首页 > 其他分享 > RocketMQ学习随笔-Broker启动

RocketMQ学习随笔-Broker启动

作者:互联网

文章目录

Broker启动

入口

/**
 * Broker entry point: builds the controller from the command line and then starts it.
 *
 * @param args command-line arguments for {@code mqbroker}
 */
public static void main(String[] args) {
    final BrokerController controller = createBrokerController(args);
    start(controller);
}

/**
 * Starts an already-initialized controller and reports the boot result to the log and stdout.
 *
 * <p>Any failure is printed and terminates the JVM with exit code -1.
 *
 * @param controller the controller returned by {@code createBrokerController}
 * @return the same controller once it has started; {@code null} only after {@code System.exit}
 */
public static BrokerController start(BrokerController controller) {
    try {
        controller.start();

        final StringBuilder tip = new StringBuilder("The broker[")
            .append(controller.getBrokerConfig().getBrokerName())
            .append(", ")
            .append(controller.getBrokerAddr())
            .append("] boot success. serializeType=")
            .append(RemotingCommand.getSerializeTypeConfigInThisServer());

        // Mention the name server list only when one is configured.
        final String namesrvAddr = controller.getBrokerConfig().getNamesrvAddr();
        if (namesrvAddr != null) {
            tip.append(" and name server is ").append(namesrvAddr);
        }

        final String message = tip.toString();
        log.info(message);
        System.out.printf("%s%n", message);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

创建BrokerController对象

与Namesrv的启动方式类似,Broker首先调用createBrokerController方法创建BrokerController对象,然后再调用start方法启动:

/**
 * Builds and initializes the {@link BrokerController} from the command-line arguments and an
 * optional properties file ({@code -c}), then registers a JVM shutdown hook that shuts it down.
 *
 * <p>Terminates the JVM via {@code System.exit} when the arguments cannot be parsed or an
 * unexpected error occurs (-1), ROCKETMQ_HOME is not set (-2), a name server address is
 * invalid, a slave's brokerId is not positive, or initialization fails (-3). It also exits with 0
 * after printing the configuration for {@code -p}/{@code -m}.
 *
 * @param args command-line arguments passed to {@code mqbroker}
 * @return the initialized controller; {@code null} is only reachable after {@code System.exit}
 */
public static BrokerController createBrokerController(String[] args) {
    // Advertise this build's remoting protocol version.
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    // Default the Netty socket send buffer to 128 KiB unless a system property already sets it.
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
        NettySystemConfig.socketSndbufSize = 131072;
    }
    // Default the Netty socket receive buffer to 128 KiB unless a system property already sets it.
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
        NettySystemConfig.socketRcvbufSize = 131072;
    }

    try {
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }

        // Broker-level settings.
        final BrokerConfig brokerConfig = new BrokerConfig();
        // Netty server side: accepts connections from producers/consumers.
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // Netty client side: used to talk to the name servers.
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();

        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        // Default listen port; the config file below may override it.
        nettyServerConfig.setListenPort(10911);
        // Message store settings.
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

        // NOTE(review): this check runs on a freshly constructed MessageStoreConfig, before the
        // -c config file is applied, so a brokerRole set in the config file does not affect it.
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            // Slaves use 10 percentage points less of accessMessageInMemoryMaxRatio.
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }

        // -c <file>: load settings from a properties file.
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                configFile = file;
                // try-with-resources so the file handle is released even if loading or
                // property mapping throws.
                try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
                    properties = new Properties();
                    properties.load(in);

                    properties2SystemEnv(properties);
                    // Copy matching keys into the four config objects.
                    MixAll.properties2Object(properties, brokerConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    MixAll.properties2Object(properties, nettyClientConfig);
                    MixAll.properties2Object(properties, messageStoreConfig);

                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                }
            }
        }

        // Command-line options are applied to brokerConfig after the config file.
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
        // ROCKETMQ_HOME is mandatory (it is also used below to locate the logback config).
        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }
        // Validate every ';'-separated name server address; exit with -3 if any fails to parse.
        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (null != namesrvAddr) {
            try {
                String[] addrArray = namesrvAddr.split(";");
                for (String addr : addrArray) {
                    RemotingUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf(
                    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                    namesrvAddr);
                System.exit(-3);
            }
        }
        // Masters always use MixAll.MASTER_ID; a slave must be configured with brokerId > 0.
        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }

                break;
            default:
                break;
        }
        // With a DLedger commit log the brokerId is set to -1 here
        // (presumably assigned later by DLedger — TODO confirm).
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }
        // HA listen port is always the service port + 1; this overwrites any haListenPort value
        // loaded from the config file above.
        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
        // Reset logback and configure it from $ROCKETMQ_HOME/conf/logback_broker.xml.
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
        // -p: print every configuration object to the console logger and exit.
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        } else if (commandLine.hasOption('m')) {
            // -m: same, using the printObjectProperties(..., true) overload
            // (presumably restricted to the important fields — TODO confirm).
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }

        // Log the effective configuration to the broker log.
        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);

        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        // Shutdown hook: shuts the controller down at most once, logging each invocation.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));

        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

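上面这段代码主要可以分为以下几个部分:
1. 设置remoting版本号以及Netty发送/接收缓冲区的默认大小;
2. 解析命令行参数,加载-c指定的配置文件,并填充brokerConfig、nettyServerConfig、nettyClientConfig、messageStoreConfig四个配置对象;
3. 校验ROCKETMQ_HOME和namesrv地址,并根据brokerRole设置brokerId;
4. 初始化日志配置,处理-p/-m打印配置后退出的选项;
5. 创建BrokerController并调用initialize方法初始化;
6. 注册JVM关闭钩子,实现优雅停机。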

调用start方法

资源加载完毕,调用start方法真正启动:

/**
 * Starts the broker's components in a fixed order: message store, remoting servers, file
 * watcher, outbound client, pull-request hold service, client housekeeping, filter server
 * manager. Without a DLedger commit log it then runs the role-specific startup and registers
 * with the name servers once. Finally it schedules periodic re-registration and starts the
 * stats and fast-failure services. Each component is started only if it is non-null.
 *
 * @throws Exception propagated from the message store start-up
 */
public void start() throws Exception {
    // Start the message store, if one was created.
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    // Start the main remoting (Netty) server.
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    // Start the secondary "fast" remoting server (the VIP channel).
    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }
    // Start the file watch service (presumably watching TLS certificate files — TODO confirm).
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
    // Start the outbound client, presumably used for calls to the name servers.
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    // Start the service that holds pending pull requests.
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }
    // Start client housekeeping (presumably periodic checks of connected producers/consumers/filter servers).
    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }
    // Start the filter server manager.
    // NOTE(review): the blog suspects filter servers sit between consumer and broker
    // (Consumer -> FilterServer -> Broker) to take filtering load off consumers; not shown here.
    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }

    // Role-dependent start-up only applies without a DLedger commit log.
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        // Start processors according to the HA role (presumably the transactional-message
        // check service on a master — TODO confirm).
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        // Role-dependent slave synchronization setup (presumably periodic config sync from
        // the master on a slave; commit-log data is replicated by the HA service).
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        // Register with the name servers once at start-up.
        this.registerBrokerAll(true, false, true);
    }
    // Re-register with the name servers periodically: first run after 10 s, then every
    // registerNameServerPeriod ms clamped to the range [10000, 60000]. Errors are logged so
    // the scheduled task keeps running.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    // Start the statistics manager.
    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }
    // Start fast-failure handling (presumably rejecting requests that have waited too long).
    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }


}

DefaultMessageStore#start:读取配置,启动核心线程

/**
 * Starts the message store. It acquires the store lock file, resumes dispatching
 * (reputting) commit-log data to the consume queues from the furthest offset any queue has
 * already recorded, and blocks until dispatching has caught up. Without DLedger it then starts
 * the HA service and applies role-specific schedule handling. Finally it starts the flush,
 * commit-log and stats services and the periodic tasks.
 *
 * @throws RuntimeException if an exclusive lock on the lock file cannot be obtained
 * @throws Exception        propagated from lock-file I/O or the start-up wait (Thread.sleep)
 */
public void start() throws Exception {
    // Try to take an exclusive lock on the first byte of the lock file without blocking.
    // FileChannel.tryLock returns null when another program holds an overlapping lock.
    lock = lockFile.getChannel().tryLock(0, 1, false);
    // No valid exclusive lock: the store is treated as already in use ("MQ already started").
    if (lock == null || lock.isShared() || !lock.isValid()) {
        throw new RuntimeException("Lock failed,MQ already started");
    }
    // Write a marker into the lock file and force it to disk.
    lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
    lockFile.getChannel().force(true);
    {
        // Decide where dispatching should resume. Start from the commit log's minimum
        // offset (-1 when the commit log has no file), then take the largest physical
        // offset already recorded by any ConsumeQueue.
        long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
        // Each value maps an Integer key (presumably queueId) to its ConsumeQueue.
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                    maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                }
            }
        }
        // Never start below 0 ...
        if (maxPhysicalPosInLogicQueue < 0) {
            maxPhysicalPosInLogicQueue = 0;
        }
        // ... nor below the oldest data still present in the commit log.
        if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
            maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
            log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
        }
        log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
                 maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
        // Offset from which the reput service dispatches messages to the ConsumeQueue/index dispatchers.
        this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
        // Start the dispatch (reput) service.
        this.reputMessageService.start();

        // Block start-up, polling once per second, until dispatchBehindBytes() reports
        // that dispatching has caught up (<= 0).
        while (true) {
            if (dispatchBehindBytes() <= 0) {
                break;
            }
            Thread.sleep(1000);
            log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
        }
        this.recoverTopicQueueTable();
    }

    // HA and schedule handling only apply without a DLedger commit log.
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        // Start the high-availability service.
        this.haService.start();
        // Role-dependent handling of the schedule (delayed message) service
        // (presumably started on a master and stopped otherwise — TODO confirm).
        this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
    }
    // Start the service that flushes consume queues.
    this.flushConsumeQueueService.start();
    // Start the commit log's own services.
    this.commitLog.start();
    // Start store statistics.
    this.storeStatsService.start();
    // Create the temp marker file (the blog describes it as ${user.home}/store/abort — TODO confirm path).
    this.createTempFile();
    // Register periodic store tasks.
    this.addScheduleTask();
    this.shutdown = false;
}

CommitLog#getMinOffset

/**
 * Returns the smallest physical offset still covered by the commit log.
 *
 * <p>Uses the start offset of the first mapped file when that file is available. Otherwise it
 * rolls the start offset forward via {@code rollNextFile}.
 *
 * @return the minimum commit-log offset, or -1 when there is no mapped file
 */
public long getMinOffset() {
    final MappedFile first = this.mappedFileQueue.getFirstMappedFile();
    if (first == null) {
        return -1;
    }
    return first.isAvailable()
        ? first.getFileFromOffset()
        : this.rollNextFile(first.getFileFromOffset());
}

reputMessageService.start启动的是ReputMessageService线程,执行其中定义好的run方法,而run方法的核心是调用doReput方法:

从CommitLog读取消息发送至ConsumeQueue

/**
 * Reads messages from the commit log starting at {@code reputFromOffset} and hands each one to
 * {@code doDispatch}. The loop continues while the offset is below the commit log's max offset
 * and until no data is returned or an unrecoverable decoding failure stops it.
 * {@code reputFromOffset} is advanced as messages are consumed.
 */
private void doReput() {
    // If the reput offset is below the commit log's minimum offset (per the log message, the
    // commit log has expired), jump forward to the minimum offset.
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                 this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }

    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

        // With duplication enabled, never dispatch at or beyond the confirm offset.
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }
        // Map the commit-log data starting at the current reput offset.
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                // Resync to the start offset of the returned buffer.
                this.reputFromOffset = result.getStartOffset();

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    // Decode one message (no CRC check, body skipped).
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // Run all registered dispatchers (consume queue, index, ...).
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
                            // On a non-slave broker with long polling enabled, notify the
                            // arriving listener so waiting pull requests can be served.
                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                            }
                            // Advance past the dispatched message.
                            this.reputFromOffset += size;
                            readSize += size;
                            // On a slave, update per-topic put counters and sizes here.
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .addAndGet(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            // BLANK record (end-of-file padding): move to the next file and
                            // finish this buffer.
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {
                        // Decoding failed (possibly a corrupt commit log).
                        if (size > 0) {
                            // Size is known: skip this message.
                            // NOTE(review): readSize is not advanced here, so the inner loop keeps
                            // decoding from the buffer's current position; confirm this is intended.
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            // Size unknown: stop dispatching.
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                          this.reputFromOffset);
                                // Skip the rest of this buffer.
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                // Release the mapped buffer obtained from getData.
                result.release();
            }
        } else {
            // No data at this offset: stop.
            doNext = false;
        }
    }
}

首先看循环结束条件isCommitLogAvailable方法:

/**
 * Reports whether the commit log holds data beyond the current reput offset.
 *
 * @return {@code true} while {@code reputFromOffset} is below the commit log's max offset
 */
private boolean isCommitLogAvailable() {
    final long reputOffset = this.reputFromOffset;
    final long commitLogMaxOffset = DefaultMessageStore.this.commitLog.getMaxOffset();
    return reputOffset < commitLogMaxOffset;
}

获取当前commitLog对象的最大偏移量:

/**
 * Returns the commit log's max offset by delegating to its mapped file queue.
 *
 * @return the max offset reported by the mapped file queue
 */
public long getMaxOffset() {
    return mappedFileQueue.getMaxOffset();
}

/**
 * Returns the end of readable data in this queue: the last file's start offset plus its
 * read position.
 *
 * @return the max readable offset, or 0 when the queue has no file
 */
public long getMaxOffset() {
    final MappedFile tail = getLastMappedFile();
    return tail == null ? 0 : tail.getFileFromOffset() + tail.getReadPosition();
}

CommitLog.getData方法:

/**
 * Maps commit-log data starting at {@code offset}. Only offset 0 falls back to the first file
 * when no file contains the offset.
 *
 * @param offset physical commit-log offset
 * @return the mapped buffer, or {@code null} when nothing is found
 */
public SelectMappedBufferResult getData(final long offset) {
    final boolean fallBackToFirstFile = offset == 0;
    return this.getData(offset, fallBackToFirstFile);
}
/**
 * Maps commit-log data starting at {@code offset}.
 *
 * @param offset                physical commit-log offset
 * @param returnFirstOnNotFound whether to fall back to the first mapped file when no file
 *                              contains {@code offset}
 * @return the mapped buffer from the in-file position {@code offset % mappedFileSizeCommitLog},
 *         or {@code null} when no mapped file is found
 */
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    final MappedFile target = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (target == null) {
        return null;
    }
    // Position of the offset within its file.
    final int positionInFile = (int) (offset % fileSize);
    return target.selectMappedBuffer(positionInFile);
}

MappedFileQueue.findMappedFileByOffset方法:

/**
 * Finds the mapped file whose range {@code [fileFromOffset, fileFromOffset + mappedFileSize)}
 * contains {@code offset}.
 *
 * <p>It first tries the slot computed from the offset. If that misses, it scans every file.
 * An offset outside the queue's overall range is logged.
 *
 * @param offset                physical offset to locate
 * @param returnFirstOnNotFound whether to return the first file when no file matches
 * @return the matching file, the first file (if requested) when none matches, or {@code null}
 *         when the queue is empty, nothing matches, or an exception occurs (logged)
 */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        final MappedFile head = this.getFirstMappedFile();
        final MappedFile tail = this.getLastMappedFile();
        if (head == null || tail == null) {
            return null;
        }

        final boolean withinQueue = offset >= head.getFileFromOffset()
            && offset < tail.getFileFromOffset() + this.mappedFileSize;
        if (!withinQueue) {
            LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                offset,
                head.getFileFromOffset(),
                tail.getFileFromOffset() + this.mappedFileSize,
                this.mappedFileSize,
                this.mappedFiles.size());
        } else {
            // Fast path: derive the list slot from the offset relative to the first file.
            final int slot = (int) ((offset / this.mappedFileSize) - (head.getFileFromOffset() / this.mappedFileSize));
            MappedFile candidate = null;
            try {
                candidate = this.mappedFiles.get(slot);
            } catch (Exception ignored) {
                // e.g. slot out of bounds; fall through to the linear scan below.
            }
            if (candidate != null && offset >= candidate.getFileFromOffset()
                && offset < candidate.getFileFromOffset() + this.mappedFileSize) {
                return candidate;
            }
            // Slow path: the computed slot did not match, so scan every file.
            for (MappedFile each : this.mappedFiles) {
                if (offset >= each.getFileFromOffset()
                    && offset < each.getFileFromOffset() + this.mappedFileSize) {
                    return each;
                }
            }
        }
        return returnFirstOnNotFound ? head : null;
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }

    return null;
}

进入循环,调用CommitLog.checkMessageAndReturnSize方法封装DispatchRequest对象:

/**
 * Decodes one message record starting at the current position of {@code byteBuffer} and
 * wraps the decoded fields in a {@link DispatchRequest}.
 *
 * <p>Results produced by this method:
 * <ul>
 *   <li>BLANK_MAGIC_CODE record: {@code DispatchRequest(0, true)};</li>
 *   <li>unknown magic code, CRC mismatch, or any exception while decoding:
 *       {@code DispatchRequest(-1, false)};</li>
 *   <li>stored total size differs from the size recomputed from the decoded lengths:
 *       {@code DispatchRequest(totalSize, false)};</li>
 *   <li>otherwise a fully populated, successful request.</li>
 * </ul>
 *
 * @param byteBuffer buffer positioned at the start of a record; its position is advanced
 * @param checkCRC   whether to verify the body CRC (only effective when {@code readBody} is true)
 * @param readBody   whether to copy the body into a scratch array; otherwise it is skipped
 * @return the dispatch request describing the record
 */
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
    try {
        // 1 TOTAL SIZE
        int totalSize = byteBuffer.getInt();

        // 2 MAGIC CODE
        int magicCode = byteBuffer.getInt();
        switch (magicCode) {
            case MESSAGE_MAGIC_CODE:
                break;
            case BLANK_MAGIC_CODE:
                return new DispatchRequest(0, true /* success */);
            default:
                log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
                return new DispatchRequest(-1, false /* success */);
        }

        // Scratch array reused for every variable-length field below.
        byte[] bytesContent = new byte[totalSize];

        int bodyCRC = byteBuffer.getInt();

        int queueId = byteBuffer.getInt();

        int flag = byteBuffer.getInt();

        long queueOffset = byteBuffer.getLong();

        long physicOffset = byteBuffer.getLong();

        int sysFlag = byteBuffer.getInt();

        long bornTimeStamp = byteBuffer.getLong();

        // Born host: 4-byte (IPv4) or 16-byte (IPv6) address, then a 4-byte port.
        ByteBuffer byteBuffer1;
        if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
            byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4);
        } else {
            byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4);
        }

        long storeTimestamp = byteBuffer.getLong();

        // Store host: same layout as the born host.
        ByteBuffer byteBuffer2;
        if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
            byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4);
        } else {
            byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4);
        }

        int reconsumeTimes = byteBuffer.getInt();

        long preparedTransactionOffset = byteBuffer.getLong();

        int bodyLen = byteBuffer.getInt();
        if (bodyLen > 0) {
            if (readBody) {
                byteBuffer.get(bytesContent, 0, bodyLen);

                if (checkCRC) {
                    int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
                    if (crc != bodyCRC) {
                        // Arguments follow the placeholder order: stored CRC first, then computed.
                        log.warn("CRC check failed. bodyCRC={}, currentCRC={}", bodyCRC, crc);
                        return new DispatchRequest(-1, false/* success */);
                    }
                }
            } else {
                // Skip the body without copying it.
                byteBuffer.position(byteBuffer.position() + bodyLen);
            }
        }

        // Read as a signed byte: only lengths 0..127 decode as non-negative.
        byte topicLen = byteBuffer.get();
        byteBuffer.get(bytesContent, 0, topicLen);
        String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);

        long tagsCode = 0;
        String keys = "";
        String uniqKey = null;

        // Read as a signed short: only lengths 0..32767 decode as non-negative.
        short propertiesLength = byteBuffer.getShort();
        Map<String, String> propertiesMap = null;
        if (propertiesLength > 0) {
            byteBuffer.get(bytesContent, 0, propertiesLength);
            String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
            propertiesMap = MessageDecoder.string2messageProperties(properties);

            keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);

            uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);

            String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
            if (tags != null && tags.length() > 0) {
                tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
            }

            // Timing message processing: for the schedule topic, tagsCode carries the
            // computed delivery timestamp instead of a tag hash.
            {
                String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
                    int delayLevel = Integer.parseInt(t);

                    // Clamp to the highest configured delay level.
                    if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                    }

                    if (delayLevel > 0) {
                        tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                            storeTimestamp);
                    }
                }
            }
        }

        // Cross-check the stored total size against the size implied by the decoded lengths.
        int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);
        if (totalSize != readLength) {
            doNothingForDeadCode(reconsumeTimes);
            doNothingForDeadCode(flag);
            doNothingForDeadCode(bornTimeStamp);
            doNothingForDeadCode(byteBuffer1);
            doNothingForDeadCode(byteBuffer2);
            log.error(
                "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
                totalSize, readLength, bodyLen, topicLen, propertiesLength);
            return new DispatchRequest(totalSize, false/* success */);
        }

        return new DispatchRequest(
            topic,
            queueId,
            physicOffset,
            totalSize,
            tagsCode,
            storeTimestamp,
            queueOffset,
            keys,
            uniqKey,
            sysFlag,
            preparedTransactionOffset,
            propertiesMap
        );
    } catch (Exception ignored) {
        // Deliberately best-effort: any decoding failure (e.g. a truncated buffer or corrupt
        // length fields) is reported to the caller as an unsuccessful request below.
    }

    return new DispatchRequest(-1, false /* success */);
}

上面这个方法的功能是从buffer中读取一条消息,消息的结构如下:

1totalSize(4Byte)消息大小
2magicCode(4)设置为daa320a7
3bodyCRC(4)当broker重启recover时会校验
4queueId(4)消息对应的consumeQueueId
5flag(4)rocketmq不做处理,只存储后透传
6queueOffset(8)消息在consumeQueue中的偏移量
7physicalOffset(8)消息在commitlog中的偏移量
8sysFlag(4)事务标识,NOT_TYPE/PREPARED_TYPE/COMMIT_TYPE/ROLLBACK_TYPE
9bornTimestamp(8)消息产生端(producer)的时间戳
10bornHost(8,IPv6时为20)消息产生端(producer)地址(address:port)
11storeTimestamp(8)消息在broker存储时间
12storeHostAddress(8,IPv6时为20)消息存储到broker的地址(address:port)
13reconsumeTimes(4)消息重试次数
14preparedTransactionOffset(8)事务消息的物理偏移量
15bodyLength(4)消息长度,最长不超过4MB
16body(body length Bytes)消息体内容
17topicLength(1)主题长度,按有符号byte读取,最长不超过127Byte
18topic(topic length Bytes)主题内容
19propertiesLength(2)消息属性长度,按有符号short读取,最长不超过32767Bytes
20properties(properties length Bytes)消息属性内容

然后调用DefaultMessageStore.doDispatch方法:

/**
 * Hands one decoded commit-log message to every registered dispatcher, in list order.
 *
 * @param req the decoded message to dispatch
 */
public void doDispatch(DispatchRequest req) {
    this.dispatcherList.forEach(dispatcher -> dispatcher.dispatch(req));
}

/**
 * Dispatcher that writes a ConsumeQueue entry for non-transactional and committed
 * transactional messages. Prepared and rolled-back messages are ignored.
 */
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int transactionType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        final boolean needsQueueEntry = transactionType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || transactionType == MessageSysFlag.TRANSACTION_COMMIT_TYPE;
        if (needsQueueEntry) {
            DefaultMessageStore.this.putMessagePositionInfo(request);
        }
    }
}

dispatcherList是在DefaultMessageStore初始化时创建的:

// Dispatchers are invoked in insertion order for every message read from the commit log:
// first the ConsumeQueue builder, then CommitLogDispatcherBuildIndex.
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

先看CommitLogDispatcherBuildConsumeQueue中的实现,当满足TRANSACTION_NOT_TYPE,TRANSACTION_COMMIT_TYPE这两个条件时,调用DefaultMessageStore.putMessagePositionInfo方法:

/**
 * Records the message's position in the ConsumeQueue for its topic and queue id. The queue is
 * looked up via {@code findConsumeQueue}; the blog notes it is created when missing.
 *
 * @param dispatchRequest the decoded message to record
 */
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId())
        .putMessagePositionInfoWrapper(dispatchRequest);
}

调用ConsumeQueue.putMessagePositionInfoWrapper方法:

/**
 * Writes one ConsumeQueue entry for a dispatched message, retrying on failure.
 *
 * <p>Makes up to {@code maxRetries} (30) attempts, but only if the store reported the ConsumeQueue
 * as writable when the method started. Each failed attempt is followed by a 1 second sleep. If no
 * attempt succeeds (or none was made), an error is logged and the store's logics-queue error flag
 * is raised.
 *
 * @param request the dispatched message (CommitLog offset, size, tags code, queue offset, ...)
 */
public void putMessagePositionInfoWrapper(DispatchRequest request) {
    final int maxRetries = 30;
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    for (int i = 0; i < maxRetries && canWrite; i++) {
        long tagsCode = request.getTagsCode();
        // Optional ConsumeQueueExt file (the original note says it is disabled by default):
        // store bitmap / store time / tags code there and keep the returned ext address instead.
        if (isExtWriteEnable()) {
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());

            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            // Use the ext address only when put() returned a valid one; otherwise keep the tags code.
            if (isExtAddr(extAddr)) {
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                         topic, queueId, request.getCommitLogOffset());
            }
        }
        // Append the entry (CommitLog offset, size, tags code) at the requested queue offset.
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                                                     request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        // Entry written (or already present).
        if (result) {
            // On a SLAVE, or with the DLedger CommitLog, also record the store time as the
            // physical-message checkpoint timestamp.
            if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
            }
            // Record the message store time as the logical (ConsumeQueue) checkpoint timestamp.
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        } else {
            // Write failed: log, sleep 1 second, then retry.
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                     + " failed, retry " + i + " times");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt is logged, but the thread's interrupt flag is not restored.
                log.warn("", e);
            }
        }
    }

    // XXX: warn and notify me
    // Retries exhausted (or the queue was not writable): log and mark the logic queue as failed.
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}

ConsumeQueue.putMessagePositionInfo:

/**
 * Appends one ConsumeQueue entry of CQ_STORE_UNIT_SIZE bytes, at logical position
 * {@code cqOffset * CQ_STORE_UNIT_SIZE}. The entry holds an 8-byte CommitLog offset, a 4-byte size
 * and an 8-byte tags code (20 bytes per the original note).
 *
 * @param offset   physical offset of the message in the CommitLog
 * @param size     message size in bytes
 * @param tagsCode tags code, or a ConsumeQueueExt address
 * @param cqOffset index of this entry within the ConsumeQueue
 * @return true if the entry was appended or was detected as already built; false if no mapped
 *         file was obtained or appendMessage reported the entry did not fit
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
                                       final long cqOffset) {
    // Already built: this message ends at or before the highest physical offset indexed so far.
    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }
    // Reset the reusable entry buffer: flip moves position back to 0, then cap it at one entry.
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    // CommitLog offset of the message (8 bytes).
    this.byteBufferIndex.putLong(offset);
    // Message size (4 bytes).
    this.byteBufferIndex.putInt(size);
    // Tags code or ext address (8 bytes).
    this.byteBufferIndex.putLong(tagsCode);
    // Expected byte position of this entry: entry index * bytes per entry.
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    // Mapped file that should receive the entry.
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        // First file of this queue, still empty, but the queue does not start at entry 0:
        // move the min/flushed/committed positions to expectLogicOffset and pad the file up to it.
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                     + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            // Current logical end of the queue: file start offset + bytes written into the file.
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
            // Expected position is behind the current end: the entry was already written.
            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                         expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }
            // Any other mismatch (a gap ahead of the current end) is logged as a possible bug,
            // but the append below still proceeds.
            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        // Remember the highest physical offset indexed, then append the buffered entry.
        this.maxPhysicOffset = offset + size;
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

MappedFile.appendMessage:

/**
 * Appends {@code data} at the current write position through the FileChannel.
 *
 * @param data bytes to append
 * @return {@code true} if the data fits in the remaining space of the file (the write pointer is
 *         advanced by {@code data.length}); {@code false} if it would overflow the file, in which
 *         case nothing is written
 */
public boolean appendMessage(final byte[] data) {
    final int startPos = this.wrotePosition.get();
    if (startPos + data.length > this.fileSize) {
        // Not enough room left in this file.
        return false;
    }
    try {
        this.fileChannel.position(startPos);
        this.fileChannel.write(ByteBuffer.wrap(data));
    } catch (Throwable e) {
        // NOTE(review): the failure is only logged. The pointer below still advances and true is
        // returned, so callers cannot tell that the write failed.
        log.error("Error occurred when append message to mappedFile.", e);
    }
    this.wrotePosition.addAndGet(data.length);
    return true;
}

CommitLogDispatcherBuildIndex.dispatch

/**
 * Dispatcher that adds a message to the IndexFile, the index the original note describes as
 * supporting message lookup by id etc. It does nothing unless message indexing is enabled.
 */
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        if (!DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            return;
        }
        DefaultMessageStore.this.indexService.buildIndex(request);
    }
}

回到DefaultMessageStore.start方法,这里会循环检查dispatchBehindBytes(),直到CommitLog中的消息全部分发完毕:

/**
 * Reports how many CommitLog bytes the reput service has not dispatched yet.
 *
 * @return number of bytes still waiting to be dispatched
 */
public long dispatchBehindBytes() {
    final long pendingBytes = this.reputMessageService.behind();
    return pendingBytes;
}

/**
 * Gap, in bytes, between the CommitLog's max offset and the offset dispatching will resume from.
 *
 * @return CommitLog max offset minus reputFromOffset
 */
public long behind() {
    final long commitLogMaxOffset = DefaultMessageStore.this.commitLog.getMaxOffset();
    return commitLogMaxOffset - this.reputFromOffset;
}

接着调用DefaultMessageStore.recoverTopicQueueTable方法:

// All ConsumeQueues of this store, keyed by topic and then by queue id.
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

/**
 * Rebuilds the CommitLog's "topic-queueId" to max-ConsumeQueue-offset table.
 *
 * <p>While visiting every ConsumeQueue, also corrects each queue's minimum offset against the
 * CommitLog's current minimum offset.
 */
public void recoverTopicQueueTable() {
    final HashMap<String/* topic-queueid */, Long/* offset */> queueOffsets = new HashMap<String, Long>(1024);
    final long commitLogMinOffset = this.commitLog.getMinOffset();
    this.consumeQueueTable.values().forEach(queuesById ->
        queuesById.values().forEach(cq -> {
            queueOffsets.put(cq.getTopic() + "-" + cq.getQueueId(), cq.getMaxOffsetInQueue());
            // Align the queue's minimum offset with the CommitLog's minimum offset.
            cq.correctMinOffset(commitLogMinOffset);
        }));
    this.commitLog.setTopicQueueTable(queueOffsets);
}

然后在非DLedger模式下会启用高可用服务(HAService)。

接着执行flushConsumeQueueService.start()方法,启动ConsumeQueue刷盘线程,其刷盘逻辑在doFlush方法中:

/**
 * Flushes every ConsumeQueue, and after a forced flush also persists the store checkpoint.
 *
 * <p>A flush is forced (least pages = 0) when {@code retryTimes == RETRY_TIMES_OVER}, which the
 * original note says is what shutdown passes, or when flushConsumeQueueThoroughInterval ms have
 * passed since the last thorough flush.
 *
 * @param retryTimes maximum number of flush attempts per ConsumeQueue
 */
private void doFlush(int retryTimes) {
    // Minimum dirty pages required before flushing (original note: default 2).
    int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
    // RETRY_TIMES_OVER is passed on shutdown (per original note).
    if (retryTimes == RETRY_TIMES_OVER) {
        // Force the flush.
        flushConsumeQueueLeastPages = 0;
    }
    long logicsMsgTimestamp = 0;
    // Interval between thorough flushes (original note: default 60 seconds).
    int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
    long currentTimeMillis = System.currentTimeMillis();
    // Thorough interval elapsed since the last thorough flush.
    if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
        // Remember when this thorough flush happened.
        this.lastFlushTimestamp = currentTimeMillis;
        // Force the flush.
        flushConsumeQueueLeastPages = 0;
        // Capture the logical checkpoint timestamp before flushing; it is written back after the
        // flush below (presumably so it only covers data flushed in this pass).
        logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
    }
    ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
    // For every topic...
    for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
        // ...and every ConsumeQueue of that topic...
        for (ConsumeQueue cq : maps.values()) {
            boolean result = false;
            for (int i = 0; i < retryTimes && !result; i++) {
                // ...flush, up to retryTimes attempts until flush() returns true.
                result = cq.flush(flushConsumeQueueLeastPages);
            }
        }
    }
    // Forced flush: also persist the checkpoint.
    if (0 == flushConsumeQueueLeastPages) {
        if (logicsMsgTimestamp > 0) {
            DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
        }
        // Flush the checkpoint file as well.
        DefaultMessageStore.this.getStoreCheckpoint().flush();
    }
}

然后调用ConsumeQueue.flush方法:

/**
 * Flushes this ConsumeQueue's mapped files and, when the extension file is readable, the
 * ConsumeQueueExt file as well.
 *
 * @param flushLeastPages minimum number of dirty pages required (0 flushes any unflushed data)
 * @return the queue flush result, AND-ed with the ext flush result when the ext file is used
 */
public boolean flush(final int flushLeastPages) {
    final boolean queueResult = this.mappedFileQueue.flush(flushLeastPages);
    if (!isExtReadEnable()) {
        return queueResult;
    }
    // Non-short-circuit '&': the ext file is flushed regardless of the first result.
    return queueResult & this.consumeQueueExt.flush(flushLeastPages);
}

MappedFileQueue.flush方法:

/**
 * Flushes the mapped file that contains {@code flushedWhere}, then advances {@code flushedWhere}.
 *
 * @param flushLeastPages minimum number of dirty pages required (0 flushes any unflushed data)
 * @return true if {@code flushedWhere} did not move (nothing new flushed, or no mapped file
 *         found); false if it advanced
 */
public boolean flush(final int flushLeastPages) {
    boolean result = true;
    // Mapped file containing the current flush position. The second argument presumably lets
    // lookup fall back to the first file when flushedWhere is 0; TODO confirm.
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        // Capture the file's store timestamp before flushing.
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages);
        // New global flush position = file start offset + flushed position inside the file.
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        // Publish the store timestamp only after a forced flush.
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}

调用MappedFile.flush:

/**
 * Forces this file's written data to disk when isAbleToFlush(flushLeastPages) allows it.
 *
 * @param flushLeastPages minimum number of dirty pages required (0 flushes any unflushed data)
 * @return the flushed position inside this file after the call
 */
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        // hold()/release() presumably pin the file (reference count) while forcing; TODO confirm.
        if (this.hold()) {
            // Capture the position first; everything up to it is covered by the force below.
            int value = getReadPosition();

            try {
                // Force whichever target received the data: the fileChannel (when a writeBuffer is
                // used or the channel position has moved), otherwise the mappedByteBuffer.
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            // Could not hold the file: advance flushedPosition without forcing.
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

刷盘机制

回到DefaultMessageStore.start()方法,ConsumeQueue刷盘启动后,调用CommitLog.start方法,启动CommitLog:

 /**
  * Starts the CommitLog flush service and, when the transient store pool is enabled, also the
  * commit service that moves data from the off-heap write buffer into the file.
  */
 public void start() {
     this.flushCommitLogService.start();
     final boolean transientPoolEnabled =
         defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable();
     if (transientPoolEnabled) {
         this.commitLogService.start();
     }
 }

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分别是同步刷盘和异步刷盘。

[图:同步刷盘和异步刷盘(原图片转存失败,源文件:img/同步刷盘和异步刷盘.png)]

FlushCommitLogService拥有两个实现类GroupCommitService和FlushRealTimeService,分别对应同步刷盘和异步刷盘。

提交刷盘请求

GroupCommitService中有两个List(requestsWrite和requestsRead):新的刷盘请求写入requestsWrite,刷盘线程处理requestsRead,两者通过swapRequests()交换,从而将提交请求与执行刷盘解耦:

// New flush requests are appended to requestsWrite (see putRequest); the flush thread works on
// requestsRead. The two lists are presumably exchanged by swapRequests(), which is not shown here.
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<>();

/**
 * A single synchronous-flush request.
 *
 * <p>Holds the CommitLog offset the flush must reach. The submitter (e.g. handleDiskFlush) waits
 * on {@link #future()}, which is completed through {@link #wakeupCustomer(PutMessageStatus)}.
 */
public static class GroupCommitRequest {
    /** CommitLog offset that must be flushed for this request to be satisfied. */
    private final long nextOffset;
    /** Completed with the flush outcome. */
    private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
    /** Creation time, from System.currentTimeMillis(). */
    private final long startTimestamp = System.currentTimeMillis();
    /** Request timeout in milliseconds; Long.MAX_VALUE unless given to the two-argument constructor. */
    private long timeoutMillis = Long.MAX_VALUE;

    /**
     * @param nextOffset    CommitLog offset the flush must reach
     * @param timeoutMillis request timeout in milliseconds
     */
    public GroupCommitRequest(long nextOffset, long timeoutMillis) {
        this(nextOffset);
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * @param nextOffset CommitLog offset the flush must reach
     */
    public GroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    /** @return the CommitLog offset the flush must reach */
    public long getNextOffset() {
        return this.nextOffset;
    }

    /**
     * Completes the future with the flush outcome, releasing any thread waiting on it.
     *
     * @param putMessageStatus the flush outcome
     */
    public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
        this.flushOKFuture.complete(putMessageStatus);
    }

    /** @return the future completed by {@link #wakeupCustomer(PutMessageStatus)} */
    public CompletableFuture<PutMessageStatus> future() {
        return this.flushOKFuture;
    }
}

GroupCommitRequest对象在CommitLog.handleDiskFlush方法中被创建,当Broker接收到Producer的消息后会调用此方法:

/**
 * Applies the configured disk-flush policy after a message has been appended to the CommitLog.
 *
 * <p>SYNC_FLUSH: if the message asks to wait for the store ({@code isWaitStoreMsgOK}), a
 * {@link GroupCommitRequest} for offset {@code wroteOffset + wroteBytes} is submitted to the
 * GroupCommitService. This thread then waits up to {@code syncFlushTimeout} ms for it. If the flush
 * does not report {@code PUT_OK} in time (timeout, failure or interrupt), the status in
 * {@code putMessageResult} is changed to {@code FLUSH_DISK_TIMEOUT}. Otherwise the service is only
 * woken up.
 *
 * <p>ASYNC_FLUSH: wakes the flush service, or the commit service when the transient store pool
 * (off-heap write buffer) is enabled.
 *
 * @param result           append result supplying the wrote offset and byte count
 * @param putMessageResult result reported for this message; its status may be overwritten
 * @param messageExt       the appended message
 */
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            // The flush must reach the end of this message: wrote offset + wrote bytes.
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
            PutMessageStatus flushStatus = null;
            try {
                // Block until the flush completes or syncFlushTimeout (ms) elapses.
                flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                                                TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt status so the calling thread can still observe it.
                // flushStatus stays null and is reported as FLUSH_DISK_TIMEOUT below.
                Thread.currentThread().interrupt();
            } catch (ExecutionException | TimeoutException e) {
                // Deliberately not rethrown: flushStatus stays null and is reported as
                // FLUSH_DISK_TIMEOUT below.
            }
            if (flushStatus != PutMessageStatus.PUT_OK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                          + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            // The caller does not wait for the flush; just wake the flush thread.
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // Transient store pool disabled: wake the flush service.
            flushCommitLogService.wakeup();
        } else {
            // Transient store pool enabled: data must first be committed from the off-heap write
            // buffer to the FileChannel, so wake the commit service.
            commitLogService.wakeup();
        }
    }
}

调用GroupCommitService.putRequest方法:

/**
 * Enqueues a synchronous-flush request and wakes the GroupCommitService thread.
 *
 * @param request the request to enqueue
 */
public synchronized void putRequest(final GroupCommitRequest request) {
    // Lock the current write list while appending to it.
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    // Wake the flush thread so it does not wait out its full timeout.
    this.wakeup();
}


/**
 * Wakes the service thread unless a wake-up is already pending.
 *
 * <p>Only the caller that wins the hasNotified CAS (false to true) counts waitPoint down; every
 * other concurrent caller returns without doing anything.
 */
public void wakeup() {
    if (!hasNotified.compareAndSet(false, true)) {
        return;
    }
    // The original note shows waitPoint declared as new CountDownLatch2(1).
    waitPoint.countDown(); // notify
}

同步刷盘

[图:同步刷盘流程(原图片转存失败,源文件:img/同步刷盘流程.png)]

在返回写成功状态时,消息已经被写入磁盘。具体流程是:消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成;刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

GroupCommitService.run():

/**
 * Main loop of the synchronous-flush (GroupCommitService) thread.
 *
 * <p>While running, each iteration calls waitForRunning(10) and then doCommit(). waitForRunning
 * presumably returns early once wakeup() counts waitPoint down; TODO confirm. After a stop request
 * the thread sleeps 10 ms so in-flight requests can arrive, swaps the request lists while holding
 * this service's monitor (the same monitor putRequest() holds), and runs doCommit() one last time.
 */
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // Wait up to 10 ms unless woken earlier.
            this.waitForRunning(10);
            // Process pending flush requests (doCommit is not shown in this excerpt).
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }

    synchronized (this) {
        this.swapRequests();
    }

    this.doCommit();

    CommitLog.log.info(this.getServiceName() + " service end");
}

异步刷盘

[图:异步刷盘流程(原图片转存失败,源文件:img/异步刷盘流程.png)]

开启transientStorePoolEnable后异步刷盘步骤:

  1. 将消息直接追加到writeBuffer(从TransientStorePool借用的堆外ByteBuffer)
  2. CommitRealTimeService线程默认每隔200ms将writeBuffer中新追加的内容提交(commit)到FileChannel中
  3. commit时将[committedPosition, wrotePosition)区间的数据写入FileChannel,随后committedPosition指针后移到wrotePosition
  4. commit有数据提交时,唤醒刷盘线程
  5. FlushRealTimeService线程默认每500ms将FileChannel中新提交的内容刷写(force)到磁盘

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

  1. 未开启堆外内存FlushRealTimeService.run:

    /**
     * Main loop of the asynchronous flush service (FlushRealTimeService).
     *
     * <p>Each iteration waits flushIntervalCommitLog ms, then flushes the CommitLog if at least
     * flushCommitLogLeastPages pages are dirty. The wait is a plain sleep when flushCommitLogTimed
     * is set; otherwise it is waitForRunning(interval), which wakeup() presumably cuts short.
     * Once every flushCommitLogThoroughInterval ms the page threshold drops to 0, so any dirty data
     * is flushed. After a stop request, a full flush is retried up to RETRY_TIMES_OVER times.
     */
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            // Whether to use a plain timed sleep instead of an interruptible wait.
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
    
            // Wait interval between flushes (original note: default 500 ms).
            int interval =  CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            // Minimum dirty pages per flush (original note: default 4).
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
            // Interval between thorough flushes (original note: default 10 s).
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    
            // Whether to print flush progress in this iteration.
            boolean printFlushProgress = false;
    
            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            // Thorough interval elapsed: drop the page threshold to 0 so any dirty data is flushed;
            // progress is printed on every 10th such thorough flush.
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }
    
            try {
                if (flushCommitLogTimed) {
                    // Plain sleep for the whole interval.
                    Thread.sleep(interval);
                } else {
                    // Wait up to the interval, returning early if woken.
                    this.waitForRunning(interval);
                }
    
                if (printFlushProgress) {
                    this.printFlushProgress();
                }
    
                long begin = System.currentTimeMillis();
                // Flush the CommitLog's mapped files.
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    // Record the physical-message checkpoint timestamp.
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }
    
        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
    
        this.printFlushProgress();
    
        CommitLog.log.info(this.getServiceName() + " service end");
    }
    
    

    与同步刷盘不同的是,异步刷盘会传入页数:

    /**
     * Flushes the mapped file that contains flushedWhere, then advances flushedWhere.
     *
     * @param flushLeastPages minimum number of dirty pages required (0 flushes any unflushed data)
     * @return true if flushedWhere did not move (nothing new flushed, or no mapped file found)
     */
    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        // Mapped file holding the current flush position (second argument: presumably fall back
        // to the first file when flushedWhere is 0; TODO confirm).
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            // Global flush position = file start offset + flushed position inside the file.
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            // The store timestamp is published only after a forced flush.
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
    
        return result;
    }
    
    

    MappedFile.flush方法中:

    // Excerpt: only the isAbleToFlush gate is shown; the rest of the body is elided.
    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            ...
        }
    
    

    调用isAbleToFlush方法,判断是否进行刷盘:

    /**
     * Decides whether this file has enough unflushed data to be worth flushing.
     *
     * @param flushLeastPages minimum number of dirty OS pages (OS_PAGE_SIZE bytes each; the
     *                        original note gives 4 KB). A value of 0 or less means any unflushed
     *                        byte is enough; the synchronous flush passes 0.
     * @return true if the file is full, or if the dirty-data threshold is met
     */
    private boolean isAbleToFlush(final int flushLeastPages) {
        final int flushedPos = this.flushedPosition.get();
        final int readablePos = getReadPosition();
        if (this.isFull()) {
            // A full file is always flushed, whatever the threshold.
            return true;
        }
        return flushLeastPages > 0
            // Count whole pages between the flushed and readable positions.
            ? ((readablePos / OS_PAGE_SIZE) - (flushedPos / OS_PAGE_SIZE)) >= flushLeastPages
            : readablePos > flushedPos;
    }
    
    
  2. 开启堆外内存CommitRealTimeService.run

    /**
     * Main loop of the commit service (CommitRealTimeService), which commits data from the
     * off-heap write buffer into the CommitLog's FileChannel.
     *
     * <p>Each iteration commits when at least commitCommitLogLeastPages pages are pending. If
     * commitCommitLogThoroughInterval ms have passed since the last commit, the threshold is
     * dropped to 0. When the commit reports that data was committed, the flush service is woken.
     * The loop then waits up to commitIntervalCommitLog ms. After a stop request, a full commit is
     * retried up to RETRY_TIMES_OVER times.
     */
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            // Wait interval between commits (original note: default 200 ms).
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
            // Minimum pages per commit.
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
            // Maximum interval between two real commits (original note: default 200 ms).
            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
    
            // Too long since the last commit: ignore commitDataLeastPages and commit whatever is pending.
            long begin = System.currentTimeMillis();
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }
    
            try {
                // Commit pending write-buffer data into the FileChannel (see MappedFile.commit0).
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    flushCommitLogService.wakeup();
                }
    
                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                // Wait up to the interval, returning early if woken.
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }
    
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }
    
    

提交操作(commit)

调用MappedFile.commit方法:

/**
 * Commits data from the off-heap writeBuffer into this file's FileChannel when enough is pending.
 *
 * <p>When the whole file has been committed, the writeBuffer is returned to the
 * transientStorePool and released.
 *
 * @param commitLeastPages minimum number of pages required before committing (0 commits anything pending)
 * @return the committed position inside this file; the write position when no writeBuffer is used
 */
public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        // writeBuffer is only non-null when the off-heap transient store pool is in use (per the
        // original note); without it there is nothing to commit.
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    
    // Commit only when the page threshold (or a full file) allows it.
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    // The whole file is committed: return the writeBuffer to the pool and drop the reference.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}

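MappedFile.isAbleToCommit(注意:下面贴出的代码实际是isAbleToFlush;isAbleToCommit的结构与之相同,只是比较的是committedPosition与wrotePosition,而不是flushedPosition与getReadPosition())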

/**
 * Decides whether this file has enough unflushed data to be worth flushing.
 *
 * @param flushLeastPages minimum number of dirty OS pages (OS_PAGE_SIZE bytes each); a value of 0
 *                        or less means any unflushed byte is enough
 * @return true if the file is full, or if the dirty-data threshold is met
 */
private boolean isAbleToFlush(final int flushLeastPages) {
    // Position up to which data has been flushed.
    final int flushedPos = this.flushedPosition.get();
    // Position up to which data is valid.
    final int readablePos = getReadPosition();
    if (this.isFull()) {
        return true;
    }
    if (flushLeastPages <= 0) {
        return readablePos > flushedPos;
    }
    final int dirtyPages = (readablePos / OS_PAGE_SIZE) - (flushedPos / OS_PAGE_SIZE);
    return dirtyPages >= flushLeastPages;
}

MappedFile.commit0:

//将writerBuffer中的数据写入fileChannel中 更新指针的位置
/**
 * Writes the uncommitted bytes of writeBuffer, i.e. [committedPosition, wrotePosition), into the
 * FileChannel at the same position, then advances committedPosition to wrotePosition.
 *
 * <p>The page threshold is enforced by the caller (commit() checks isAbleToCommit(commitLeastPages)
 * before calling this). This method therefore commits whenever at least one byte is uncommitted.
 *
 * @param commitLeastPages not used here; kept for signature compatibility. It is a page count, so
 *                         it must not be compared with a byte difference.
 */
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    // Compare bytes with bytes. The previous "> commitLeastPages" compared a byte delta with a page
    // count, which could leave a small tail of uncommitted bytes behind.
    if (writePos - lastCommittedPosition > 0) {
        try {
            // slice() is used so that changing position/limit below does not disturb
            // writeBuffer's own position and limit.
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

主从复制HA

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。

(1)同步复制

同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态;

在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。

(2)异步复制

异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。

在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;

(3)配置

同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。

Master

HAService#Start 服务启动:

/**
 * Starts the HA service.
 *
 * <p>Binds the HA listen port and starts accepting slave connections, then starts the
 * GroupTransferService and the HAClient.
 *
 * @throws Exception if binding the listen socket fails
 */
public void start() throws Exception {
    final AcceptSocketService acceptor = this.acceptSocketService;
    // Open the ServerSocketChannel, bind the port and register OP_ACCEPT.
    acceptor.beginAccept();
    // Start the thread that accepts slave connections.
    acceptor.start();
    // Only checks synchronous-replication progress and notifies waiters (per the original note).
    this.groupTransferService.start();
    // HAClient: the connector that actively connects to a master (per the original note).
    this.haClient.start();
}

HAService$AcceptSocketService#beginAccept:使用Java NIO初始化服务端(打开ServerSocketChannel、绑定端口并注册OP_ACCEPT事件),这里并没有使用Netty

/**
 * Prepares the master's HA listener with plain Java NIO.
 *
 * <p>Opens a ServerSocketChannel with SO_REUSEADDR and binds it to socketAddressListen. It then
 * switches the channel to non-blocking mode and registers it for OP_ACCEPT on a newly opened selector.
 *
 * @throws Exception if opening, binding or registration fails
 */
public void beginAccept() throws Exception {
    final ServerSocketChannel listener = ServerSocketChannel.open();
    this.serverSocketChannel = listener;
    this.selector = RemotingUtil.openSelector();

    listener.socket().setReuseAddress(true);
    listener.socket().bind(this.socketAddressListen);
    listener.configureBlocking(false);
    listener.register(this.selector, SelectionKey.OP_ACCEPT);
}

HAService$AcceptSocketService#run 启动服务端监听线程

/**
 * Accept loop of the master's HA listener.
 *
 * <p>Selects with a 1 second timeout. For each OP_ACCEPT-ready key it accepts the slave's
 * SocketChannel, wraps it in an HAConnection, starts it and registers it with HAService. If
 * creating or starting the connection fails, the socket is closed. Exceptions are logged, and the
 * loop continues until the service is stopped.
 */
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            // Keys that became ready.
            Set<SelectionKey> selected = this.selector.selectedKeys();

            if (selected != null) {
                for (SelectionKey k : selected) {
                    // New incoming connection.
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        // Accept the slave's connection.
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                        if (sc != null) {
                            HAService.log.info("HAService receive new connection, "
                                               + sc.socket().getRemoteSocketAddress());

                            try {
                                // Wrap the channel in an HA connection and start its threads.
                                HAConnection conn = new HAConnection(HAService.this, sc);
                                conn.start();
                                HAService.this.addConnection(conn);
                            } catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    } else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }

                // Clear handled keys so they are not processed again on the next select.
                selected.clear();
            }
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

HAConnection#start 高可用连接启动

 /**
  * Starts both threads of this HA connection: the reader and the writer.
  */
 public void start() {
     // Reader: consumes the slave's acknowledged CommitLog offsets.
     this.readSocketService.start();
     // Writer: sends data to the slave.
     this.writeSocketService.start();
 }

HAConnection$ReadSocketService#run:

/**
 * Reader thread of one master-side HA connection.
 *
 * <p>Loops: select with a 1 second timeout, then process the slave's acknowledgements via
 * processReadEvent(). The loop stops when processing fails, an exception occurs, or nothing has
 * been read from the slave for longer than haHousekeepingInterval. On exit the whole connection is
 * torn down: this reader and the writer are stopped, the connection is removed from HAService and
 * the connection count decremented, the selection key is cancelled, and the selector and socket
 * channel are closed.
 */
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            // Read and handle whatever the slave sent.
            boolean ok = this.processReadEvent();
            // Read failed (e.g. connection closed): leave the loop and clean up.
            if (!ok) {
                HAConnection.log.error("processReadEvent error");
                break;
            }

            // Time since the slave was last heard from.
            long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
            // Silent for too long: treat the connection as expired.
            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                break;
            }
        } catch (Exception e) {
            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    // Tear down the whole connection and release its resources.
    this.makeStop();

    writeSocketService.makeStop();

    haService.removeConnection(HAConnection.this);

    HAConnection.this.haService.getConnectionCount().decrementAndGet();

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}

读请求处理

HAConnection$ReadSocketService#processReadEvent 处理读事件:

/**
 * Reads acknowledgement data sent by the slave: a stream of 8-byte CommitLog offsets.
 *
 * <p>Keeps reading while byteBufferRead has free space. Whenever at least 8 bytes have arrived
 * since processPosition, only the most recent complete 8-byte value is used:
 * <ul>
 *   <li>it becomes slaveAckOffset;</li>
 *   <li>the first such value on this connection also becomes slaveRequestOffset;</li>
 *   <li>HAService is notified so it can re-check replication progress.</li>
 * </ul>
 *
 * @return false if the read returned a negative count (end of stream) or an IOException occurred;
 *         true otherwise, including after three consecutive zero-byte reads
 */
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;

    // Buffer completely filled: flip it (position back to 0) and restart processing from 0.
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPosition = 0;
    }

    // Keep reading while the buffer still has room.
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // Read from the slave's SocketChannel into the read buffer.
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                // Remember when the slave was last heard from (used by the timeout check in run()).
                this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // At least 8 bytes (one offset) have arrived since the last processed position.
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    // End of the last complete 8-byte value; older values in this batch are superseded.
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    // The slave's acknowledged CommitLog offset: the 8 bytes ending at pos.
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    // Everything up to pos is now processed.
                    this.processPosition = pos;

                    // Latest CommitLog offset acknowledged by the slave.
                    HAConnection.this.slaveAckOffset = readOffset;
                    // First acknowledgement on this connection (slaveRequestOffset starts negative).
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        // Record the offset the slave requested first.
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                    }
                    // Report the slave's progress so HAService's GroupTransferService can decide
                    // whether to release requests waiting for synchronous replication.
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                // Nothing available: stop this round after three consecutive empty reads.
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                // Negative count: end of stream, the slave closed the connection.
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}

HAService#notifyTransferSome: 更新Slave已确认的最大同步偏移量,并唤醒GroupTransferService

 /**
  * Raises {@code push2SlaveMaxOffset} to {@code offset} when that is larger than the current
  * value, and wakes the GroupTransferService only if this call performed the raise.
  *
  * @param offset CommitLog offset acknowledged by a slave
  */
 public void notifyTransferSome(final long offset) {
     long observed = this.push2SlaveMaxOffset.get();
     while (offset > observed) {
         if (this.push2SlaveMaxOffset.compareAndSet(observed, offset)) {
             // We advanced the high-water mark: let waiting requests re-check.
             this.groupTransferService.notifyTransferSome();
             return;
         }
         // Another thread changed the value first; re-read and retry while still larger.
         observed = this.push2SlaveMaxOffset.get();
     }
 }

Master写请求处理(向Slave推送CommitLog数据)

HAConnection$WriteSocketService#run:

/**
 * Main loop of the master-side write service. It streams CommitLog data, or header-only
 * heartbeats, to the slave until the service stops or an exception occurs. It then releases
 * the pending buffer, stops the read service, unregisters the connection and closes the
 * selector and socket.
 */
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            // slaveRequestOffset == -1: no offset report from the slave yet, nothing to send.
            if (-1 == HAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }

            // nextTransferFromWhere == -1: the transfer start point is not decided yet.
            if (-1 == this.nextTransferFromWhere) {
                // The slave reported offset 0, i.e. it has no CommitLog data.
                if (0 == HAConnection.this.slaveRequestOffset) {
                    // Current maximum physical offset of the master's CommitLog.
                    long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                    
                    // Round down to a multiple of the CommitLog file size (start of the last
                    // CommitLog file); data before that is not sent to this slave.
                    masterOffset =
                        masterOffset
                        - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                           .getMappedFileSizeCommitLog());

                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }

                    // Start transferring from the beginning of the last CommitLog file.
                    this.nextTransferFromWhere = masterOffset;
                } else {
                    // Resume from the offset the slave reported.
                    this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                }

                log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                         + "], and slave request " + HAConnection.this.slaveRequestOffset);
            }

            // The previous transfer was written out completely.
            if (this.lastWriteOver) {

                // Time since the last successful write to the slave.
                long interval =
                    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

                // Idle longer than the heartbeat interval: send a header-only heartbeat.
                if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaSendHeartbeatInterval()) {

                    // Build Header
                    // Heartbeat header: next offset (8 bytes) + body size 0 (4 bytes).
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    this.byteBufferHeader.putInt(0);
                    // Switch to read mode so transferData() can drain it.
                    this.byteBufferHeader.flip();

                    // Send the heartbeat; if it could not be fully written, retry next round.
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
            } else {
                // The previous transfer is incomplete: finish it before sending anything new.
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }
            // Fetch CommitLog data starting at nextTransferFromWhere.
            SelectMappedBufferResult selectResult =
                HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            
            if (selectResult != null) {
                // Size of the available CommitLog data.
                int size = selectResult.getSize();
                // Cap a single transfer at haTransferBatchSize.
                if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }

                // Start offset of this transfer.
                long thisOffset = this.nextTransferFromWhere;
                // Start offset of the next transfer.
                this.nextTransferFromWhere += size;

                // Restrict the buffer to the (possibly capped) number of bytes being sent.
                selectResult.getByteBuffer().limit(size);
                this.selectMappedBufferResult = selectResult;

                // Build Header: start offset (8 bytes) + body size (4 bytes).
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(thisOffset);
                this.byteBufferHeader.putInt(size);
                this.byteBufferHeader.flip();
                // Send header and body to the slave.
                this.lastWriteOver = this.transferData();
            } else {
                // No new data: wait up to 100 ms or until woken (handleHA calls wakeupAll()).
                HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {

            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    // Service is stopping: release resources and tear down the connection.
    HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

    if (this.selectMappedBufferResult != null) {
        this.selectMappedBufferResult.release();
    }

    this.makeStop();

    readSocketService.makeStop();

    haService.removeConnection(HAConnection.this);

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}

HAConnection$WriteSocketService#transferData 向slave传输数据:

/**
 * Writes the pending header ({@code byteBufferHeader}) and, if one is pending, the CommitLog
 * body ({@code selectMappedBufferResult}) to the slave's socket. The body buffer is released
 * once it has been fully written.
 *
 * @return {@code true} if everything pending was written completely; {@code false} if the
 *         socket stopped accepting data (3 consecutive zero-byte writes) and the caller must
 *         call again later to finish
 * @throws Exception if a write returns a negative count; IOExceptions from the channel
 *         propagate as well
 */
private boolean transferData() throws Exception {
    int writeSizeZeroTimes = 0;
    // Write Header
    // Write the header to the SocketChannel until it is fully sent or the socket stalls.
    while (this.byteBufferHeader.hasRemaining()) {
        int writeSize = this.socketChannel.write(this.byteBufferHeader);
        if (writeSize > 0) {
            writeSizeZeroTimes = 0;
            // Record the time of the last successful write.
            this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        } else if (writeSize == 0) {
            // Give up after three consecutive zero-byte writes.
            if (++writeSizeZeroTimes >= 3) {
                break;
            }
        } else {
            throw new Exception("ha master write header error < 0");
        }
    }

    // Header-only transfer (e.g. heartbeat): done once the header is fully written.
    if (null == this.selectMappedBufferResult) {
        return !this.byteBufferHeader.hasRemaining();
    }

    writeSizeZeroTimes = 0;

    // Write Body -- only after the header has been fully written.
    if (!this.byteBufferHeader.hasRemaining()) {
        // Write until the CommitLog slice is fully sent or the socket stalls.
        while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
            int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            } else if (writeSize == 0) {
                // Give up after three consecutive zero-byte writes.
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                throw new Exception("ha master write body error < 0");
            }
        }
    }

    // Success only if both header and body have been fully written.
    boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

    // Body fully written: release the mapped buffer and clear the pending slot.
    if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
        this.selectMappedBufferResult.release();
        this.selectMappedBufferResult = null;
    }

    return result;
}

Slave

HAService$HAClient#run 客户端启动

/**
 * Slave-side main loop. It keeps a connection to the master and reports the local CommitLog
 * max offset, which also acts as a heartbeat. It consumes data pushed by the master and
 * closes the connection when writes to the master have been idle too long. Exceptions are
 * logged and followed by a back-off of up to 5 s, instead of being swallowed silently.
 */
public void run() {
    while (!this.isStopped()) {
        try {
            if (this.connectMaster()) {
                // isTimeToReportOffset(): presumably the periodic report interval has elapsed
                // (5 s by default per the original notes) -- send the offset as a heartbeat.
                if (this.isTimeToReportOffset()) {
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }
                // Wait up to 1 s for the socket to become readable.
                this.selector.select(1000);
                // Consume CommitLog data pushed by the master.
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }
                // Report progress if the local CommitLog grew; on failure the connection was
                // already closed by reportSlaveMaxOffsetPlus(), so restart the loop.
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }
                // Nothing written to the master for longer than the housekeeping interval:
                // treat the connection as dead and drop it.
                long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress.get()
                        + "] expired, " + interval);
                    this.closeMaster();
                }
            } else {
                // Not connected: back off for up to 5 s before retrying.
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            // Keep the loop alive, but record the failure so connection problems are diagnosable.
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }
}

HAService$HAClient#connectMaster 连接Master

/**
 * Ensures there is a channel to the master. If a channel already exists it is kept as is.
 * Otherwise a connection to the current master address is attempted and registered for
 * OP_READ. The reported offset and last-write timestamp are then reset.
 *
 * @return whether a channel to the master is now available
 * @throws ClosedChannelException if registering the new channel with the selector fails
 */
private boolean connectMaster() throws ClosedChannelException {
    // Already connected: nothing to do.
    if (this.socketChannel != null) {
        return true;
    }

    final String masterAddr = this.masterAddress.get();
    if (masterAddr != null) {
        final SocketAddress masterSocketAddr = RemotingUtil.string2SocketAddress(masterAddr);
        if (masterSocketAddr != null) {
            this.socketChannel = RemotingUtil.connect(masterSocketAddr);
            if (this.socketChannel != null) {
                // Only read readiness is of interest; writes are done directly.
                this.socketChannel.register(this.selector, SelectionKey.OP_READ);
            }
        }
    }

    // Start reporting from the slave's current CommitLog end.
    this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    this.lastWriteTimestamp = System.currentTimeMillis();
    return this.socketChannel != null;
}

Slave写请求处理(向Master上报同步偏移)

HAService$HAClient#reportSlaveMaxOffset 上报当前CommitLog最大偏移量:

/**
 * Sends {@code maxOffset} to the master as an 8-byte value. This serves both as the
 * progress report and as the heartbeat.
 *
 * @param maxOffset the slave's CommitLog offset to report
 * @return {@code true} if all 8 bytes were written within three write attempts;
 *         {@code false} on IOException or if bytes remain unsent
 */
private boolean reportSlaveMaxOffset(final long maxOffset) {
    // Expose exactly 8 bytes and store the offset at index 0 with an absolute put, which
    // leaves the position at 0 -- the buffer is immediately ready to be drained.
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(0, maxOffset);

    int attempt = 0;
    while (attempt < 3 && this.reportOffset.hasRemaining()) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                      + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
        attempt++;
    }

    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    // Fully sent only if nothing is left in the buffer.
    return !this.reportOffset.hasRemaining();
}

Slave读请求处理(接收Master推送的CommitLog数据)

HAService$HAClient#processReadEvent

/**
 * Reads data pushed by the master into {@code byteBufferRead} and dispatches every complete
 * batch it contains to the local CommitLog. Every failure path is logged (matching the
 * master-side reader) before returning.
 *
 * @return {@code true} to keep the connection; {@code false} if dispatching failed, the
 *         master closed the connection, or an IOException occurred (the caller then closes
 *         the connection)
 */
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    // Keep reading while the read buffer has free space.
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // Read from the SocketChannel into the read buffer.
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                // Append any complete batches to the local CommitLog.
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                // Nothing available; give up after three consecutive empty reads.
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                // Negative read count: the master closed the connection.
                log.warn("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            log.warn("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }
    return true;
}

HAService$HAClient#dispatchReadRequest 分发读请求:

 private boolean dispatchReadRequest() {
     //消息头大小 12个字节
     final int msgHeaderSize = 8 + 4; // phyoffset + size
     //起始偏移量
     int readSocketPos = this.byteBufferRead.position();

     while (true) {
          dispatchPosition初始为0,可理解为当前消息包的在读缓冲的起始偏移,因为多条消息包是累加到读缓冲上的,而不是读完一条清一条
         // 上条消息处理后,读缓冲有新的数据进来
         int diff = this.byteBufferRead.position() - this.dispatchPosition;
         // 包含了一个完整的Header
         if (diff >= msgHeaderSize) {
              // 从读缓冲的指定位置读取8字节的Master传输的CommitLog片段的起始物理偏移
             long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
             //获取消息体大小
             int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
		// Slave当前CommitLog的最大物理偏移
             long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

             if (slavePhyOffset != 0) {
                 // 增量同步,如果Slave的CommitLog最大物理偏移和本次Master同步的CommitLog片段起始偏移不一致
                 if (slavePhyOffset != masterPhyOffset) {
                     log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                               + slavePhyOffset + " MASTER: " + masterPhyOffset);
                     //发生异常 关闭连接
                     return false;
                 }
             }

              // 包含了一个完整的消息头和消息体
             if (diff >= (msgHeaderSize + bodySize)) {
                 //初始化数组 用于存储消息体
                 byte[] bodyData = new byte[bodySize];
                 // 设置读索引
                 this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                 
                 //获取消息体
                 this.byteBufferRead.get(bodyData);

                 // 把Master同步过来的CommitLog添加到Slave的CommitLog
                 HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                 // 恢复读缓冲的写索引
                 this.byteBufferRead.position(readSocketPos);
                 
                 // 更新处理偏移
                 this.dispatchPosition += msgHeaderSize + bodySize;

                 // 如果当前Slave的CommitLog最大物理偏移大于上次上报的偏移,说明本次同步成功,继续向Master上报
                 if (!reportSlaveMaxOffsetPlus()) {
                     //发送失败 关闭连接
                     return false;
                 }

                 continue;
             }
         }

         // 读缓冲写满了
         if (!this.byteBufferRead.hasRemaining()) {
             //重新分配空间
             this.reallocateByteBuffer();
         }

         break;
     }

     return true;
 }

HAService$HAClient#reportSlaveMaxOffsetPlus 同步成功后向Master上报:

/**
 * Reports the slave's CommitLog end offset to the master, but only when it has grown past
 * the last reported value. Closes the connection if the report fails.
 *
 * @return {@code true} if nothing needed reporting or the report was sent; {@code false}
 *         if sending failed (the connection has then been closed)
 */
private boolean reportSlaveMaxOffsetPlus() {
    final long maxPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    // No new data since the last report: nothing to send.
    if (maxPhyOffset <= this.currentReportedOffset) {
        return true;
    }

    this.currentReportedOffset = maxPhyOffset;
    final boolean reported = this.reportSlaveMaxOffset(maxPhyOffset);
    if (!reported) {
        this.closeMaster();
    }
    return reported;
}

主从同步复制

CommitLog#putMessage 消息接收

/**
 * Excerpt of the message-store path: appends a message to the CommitLog, then applies the
 * disk-flush and HA policies. The append step is omitted in this excerpt, so {@code result}
 * and {@code putMessageResult} are produced by code not shown here.
 *
 * @param msg the message to store
 * @return the store result, whose status may have been changed by the flush/HA handling
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Append to the CommitLog MappedFile -- omitted in this excerpt.
    // Apply the disk-flush policy.
    handleDiskFlush(result, putMessageResult, msg);
    // Apply HA: on a SYNC_MASTER this waits for a slave acknowledgement (or a timeout);
    // otherwise nothing is done here and replication is left to the HAService threads.
    handleHA(result, putMessageResult, msg);
    return putMessageResult;
}

CommitLog#handleHA:同步复制(SYNC_MASTER)模式下的高可用处理

/**
 * Applies the synchronous-replication policy after a message has been appended.
 * It only acts on a SYNC_MASTER broker, and only for messages with waitStoreMsgOK set.
 * It waits up to syncFlushTimeout ms for a slave to acknowledge the message's end offset.
 * On failure it downgrades the status to FLUSH_SLAVE_TIMEOUT, or to SLAVE_NOT_AVAILABLE
 * when no usable slave exists. An interrupt during the wait is reported as a timeout, and
 * the thread's interrupt flag is restored rather than lost.
 *
 * @param result           append result providing the written offset and byte count
 * @param putMessageResult result returned to the producer; its status may be changed here
 * @param messageExt       the stored message
 */
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Only synchronous-replication masters wait for slaves.
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        // The producer asked to wait for the store result.
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait: isSlaveOK presumably requires a connected slave whose lag
            // is within a limit (256MB per the original notes) -- verify in HAService#isSlaveOK.
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                // Target: the end offset of this message in the CommitLog.
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                // Queue it for HAService's GroupTransferService.
                service.putRequest(request);
                // Wake the HAConnection write services so they push the new data to slaves.
                service.getWaitNotifyObject().wakeupAll();
                PutMessageStatus replicaStatus = null;
                try {
                    // Block until GroupTransferService completes the request or the timeout elapses.
                    replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                                                         TimeUnit.MILLISECONDS);
                } catch (InterruptedException ignored) {
                    // Restore the interrupt flag so code further up can still observe it;
                    // replicaStatus stays null and is reported as a slave timeout below.
                    Thread.currentThread().interrupt();
                } catch (ExecutionException | TimeoutException ignored) {
                    // Intentionally ignored: replicaStatus stays null and is reported below.
                }
                // Replication not confirmed: tell the producer the slave flush timed out.
                if (replicaStatus != PutMessageStatus.PUT_OK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                              + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }

}

HAService$GroupTransferService#putRequest 提交同步复制请求

 /**
  * Queues a replication-wait request for GroupTransferService.
  * Producers only append to {@code requestsWrite} under its lock. The service thread works
  * from a separate read list, which it clears after processing. Presumably the two lists are
  * swapped between rounds (swapRequests is not shown in this excerpt). The same write/read
  * double-list scheme is used by CommitLog's GroupCommitService, per the original notes.
  *
  * @param request the replication-wait request to enqueue
  */
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
    // Lock the write list, since the service thread may swap it with the read list.
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    // Signal only if no wake-up is already pending (hasNotified flips false -> true).
    final boolean wakeupNeeded = hasNotified.compareAndSet(false, true);
    if (wakeupNeeded) {
        waitPoint.countDown(); // notify
    }
}

HAService$GroupTransferService#run

 /**
  * Service loop of GroupTransferService. Each round waits up to 10 ms (or until woken) and
  * then resolves pending replication-wait requests. Exceptions are logged and the loop keeps
  * running, instead of failures being discarded silently.
  */
 public void run() {
     while (!this.isStopped()) {
         try {
             // Per the original notes: returns immediately if a wake-up is pending, otherwise
             // waits up to 10 ms; in both cases it resets the flag and swaps the request lists.
             this.waitForRunning(10);
             // Resolve the requests now in the read list.
             this.doWaitTransfer();
         } catch (Exception e) {
             // Keep the service alive, but do not lose the failure.
             log.warn(this.getServiceName() + " service has exception. ", e);
         }
     }
 }

HAService$GroupTransferService#doWaitTransfer:

/**
 * Resolves every request in the read list. A request succeeds once the highest offset a
 * slave has acknowledged ({@code push2SlaveMaxOffset}) reaches the request's next offset.
 * Each request is re-checked up to 5 times, waiting up to 1 s before each re-check (or
 * until woken by notifyTransferSome). The request's waiter is then woken with the outcome,
 * and the read list is cleared.
 */
private void doWaitTransfer() {
    synchronized (this.requestsRead) {
        if (this.requestsRead.isEmpty()) {
            return;
        }

        for (CommitLog.GroupCommitRequest pending : this.requestsRead) {
            // Acknowledged offset has reached the end of this request's data.
            boolean replicated = HAService.this.push2SlaveMaxOffset.get() >= pending.getNextOffset();
            int retries = 0;
            while (!replicated && retries < 5) {
                // Sleep up to 1 s, or until a slave ack wakes us, then check again.
                this.notifyTransferObject.waitForRunning(1000);
                replicated = HAService.this.push2SlaveMaxOffset.get() >= pending.getNextOffset();
                retries++;
            }
            // ... (further code omitted in this excerpt)
            // Wake the broker thread blocked on this request with the outcome.
            pending.wakeupCustomer(replicated);
        }

        // All requests handled: empty the read list.
        this.requestsRead.clear();
    }
}

HAConnection$ReadSocketService#processReadEvent

/**
 * Abbreviated excerpt of the master-side reader: reads the slave's 8-byte offset reports and
 * publishes the acknowledged offset to HAService. Parts are omitted, including the
 * computation of {@code pos}, so this excerpt does not compile on its own.
 *
 * @return {@code false} if the slave closed the connection or an IOException occurred
 */
private boolean processReadEvent() {
    // ... (omitted in this excerpt)
    // Keep reading while the read buffer has free space.
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // Read from the SocketChannel into the read buffer.
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                // ... (omitted in this excerpt)
                // At least one complete 8-byte offset report is unprocessed.
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    // ... (omitted: computes pos, the end of the newest complete report)
                    // Read the slave's 8-byte CommitLog offset.
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    // ... (omitted in this excerpt)
                    HAConnection.this.slaveAckOffset = readOffset;
                    // ... (omitted in this excerpt)
                    // Publish the ack so threads waiting for synchronous replication can re-check.
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                // ... (omitted: empty-read retry handling)
            } else {
                return false;
            }
        } catch (IOException e) {
            return false;
        }
    }
    return true;
}

HAService#notifyTransferSome

/**
 * Advances {@code push2SlaveMaxOffset} to {@code offset} if it is larger, and wakes the
 * GroupTransferService only when this call performed the advance.
 *
 * @param offset CommitLog offset acknowledged by a slave
 */
public void notifyTransferSome(final long offset) {
    while (true) {
        final long current = this.push2SlaveMaxOffset.get();
        // Not an advance: nothing to publish.
        if (offset <= current) {
            return;
        }
        // Retry from a fresh read if another thread updated the value concurrently.
        if (this.push2SlaveMaxOffset.compareAndSet(current, offset)) {
            this.groupTransferService.notifyTransferSome();
            return;
        }
    }
}

HAService$GroupTransferService#notifyTransferSome

  /**
   * Signals {@code notifyTransferObject}, the object GroupTransferService waits on between
   * re-checks of pending replication requests, so that it re-checks immediately.
   */
  public void notifyTransferSome() {
      notifyTransferObject.wakeup();
  }

标签:null,log,Broker,final,new,CommitLog,随笔,public,RocketMQ
来源: https://blog.csdn.net/xxy_hl/article/details/118255903