consumeQueue 和 indexFile 文件
作者:互联网
broker 把消息写入 commitLog 后,还需要把消息的索引写入 consumeQueue 文件 和 indexFile 文件
// org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService @Override public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { Thread.sleep(1); this.doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } DefaultMessageStore.log.info(this.getServiceName() + " service end"); } private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } // 读取 commitLog 文件 SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); if (result != null) { try { this.reputFromOffset = result.getStartOffset(); for (int readSize = 0; readSize < result.getSize() && doNext; ) { // 解析消息,生成 DispatchRequest 对象 DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { // 把 DispatchRequest 分发出去 DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } this.reputFromOffset += size; readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) .addAndGet(dispatchRequest.getMsgSize()); } } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); readSize = result.getSize(); } } else if (!dispatchRequest.isSuccess()) { if (size > 0) { log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); this.reputFromOffset += size; } else { doNext = false; if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", this.reputFromOffset); this.reputFromOffset += result.getSize() - readSize; } } } } } finally { result.release(); } } else { doNext = false; } } }
consumeQueue 文件,一个 entry 20 字节,8 + 4 + 8,8 字节 commitLog offset,4 字节消息 size,8 字节 tag hashcode,当消息为延时消息时,tag 字段存放超时时间戳
// org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean) // Timing message processing { String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) { int delayLevel = Integer.parseInt(t); if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel(); } if (delayLevel > 0) { tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp); } } }
处理 DispatchRequest 的逻辑
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: DefaultMessageStore.this.putMessagePositionInfo(request); break; case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; } } } class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) { DefaultMessageStore.this.indexService.buildIndex(request); } } }
broker 为消息的 UNIQ_KEY 和 key 字段建立索引,索引保存在 indexFile 文件中
public void buildIndex(DispatchRequest req) { IndexFile indexFile = retryGetAndCreateIndexFile(); if (indexFile != null) { long endPhyOffset = indexFile.getEndPhyOffset(); DispatchRequest msg = req; String topic = msg.getTopic(); String keys = msg.getKeys(); if (msg.getCommitLogOffset() < endPhyOffset) { return; } final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: break; case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: return; } if (req.getUniqKey() != null) { indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey())); if (indexFile == null) { log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey()); return; } } if (keys != null && keys.length() > 0) { String[] keyset = keys.split(MessageConst.KEY_SEPARATOR); for (int i = 0; i < keyset.length; i++) { String key = keyset[i]; if (key.length() > 0) { indexFile = putKey(indexFile, msg, buildKey(topic, key)); if (indexFile == null) { log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey()); return; } } } } } else { log.error("build index error, stop building index"); } }
标签:文件,DefaultMessageStore,indexFile,reputFromOffset,MessageSysFlag,consumeQueue,dis 来源: https://www.cnblogs.com/allenwas3/p/11993971.html