rocketmq源码解析消息拉取处理器①
作者:互联网
说在前面
消息拉取处理器
源码解析
进入这个方法,org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand)
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
return this.processRequest(ctx.channel(), request, true);
}
进入这个方法,org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
// 请求编号
response.setOpaque(request.getOpaque());
log.debug("receive PullMessage request command, {}", request);
// 没有读取权限
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}
// 找到消费组的订阅信息=》
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
return response;
}
// 如果消费组订阅信息不允许消费
if (!subscriptionGroupConfig.isConsumeEnable()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
return response;
}
final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
// 查找topic配置信息
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
}
// 没有topic配置信息读的权限
if (!PermName.isReadable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
return response;
}
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
if (hasSubscriptionFlag) {
try {
// 构建订阅信息=》
subscriptionData = FilterAPI.build(
requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
);
// 如果订阅类型是tasg,构建过滤消费的数据=》
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getExpressionType(), requestHeader.getSubVersion()
);
assert consumerFilterData != null;
}
} catch (Exception e) {
log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
return response;
}
} else {
// 获取消费组信息
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo) {
log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}
// 订阅信息中不是广播消费是集群消费,消费组信息的消息模式是广播消费
if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
&& consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
return response;
}
// 从消费组信息中找到订阅数据
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}
// 订阅数据中版本号小于请求数据中版本号,订阅数据不是最新的
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
return response;
}
// 订阅信息中表达式类型不是tag类型
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
// 按topic过滤的数据中获取要过滤的消费数据=》
consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
requestHeader.getConsumerGroup());
if (consumerFilterData == null) {
response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
return response;
}
// 消费过滤数据中的版本号小于请求的版本号,过滤数据不是最新的
if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
response.setRemark("the consumer's consumer filter data not latest");
return response;
}
}
}
// 消息过滤模式不是tag,broker配置中不允许过滤
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
&& !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
return response;
}
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
// 获取消息=》
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:
if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
break;
}
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
// consume too slow ,redirect to another machine 消费太慢直接转向另一台机器
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
// consume ok
else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
}
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
switch (getMessageResult.getStatus()) {
case FOUND:
response.setCode(ResponseCode.SUCCESS);
break;
case MESSAGE_WAS_REMOVING:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case NO_MATCHED_LOGIC_QUEUE:
case NO_MESSAGE_IN_QUEUE:
if (0 != requestHeader.getQueueOffset()) {
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getConsumerGroup()
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
}
break;
case NO_MATCHED_MESSAGE:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case OFFSET_FOUND_NULL:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_OVERFLOW_BADLY:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
assert false;
break;
}
if (this.hasConsumeMessageHook()) {
ConsumeMessageContext context = new ConsumeMessageContext();
context.setConsumerGroup(requestHeader.getConsumerGroup());
context.setTopic(requestHeader.getTopic());
context.setQueueId(requestHeader.getQueueId());
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
switch (response.getCode()) {
case ResponseCode.SUCCESS:
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
context.setCommercialRcvTimes(incValue);
context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
context.setCommercialOwner(owner);
break;
case ResponseCode.PULL_NOT_FOUND:
if (!brokerAllowSuspend) {
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(owner);
}
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
case ResponseCode.PULL_OFFSET_MOVED:
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(owner);
break;
default:
assert false;
break;
}
// 执行消息消费的钩子方法在消费消息之前=》
this.executeConsumeMessageHookBefore(context);
}
switch (response.getCode()) {
case ResponseCode.SUCCESS:
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
(int) (this.brokerController.getMessageStore().now() - beginTimeMills));
response.setBody(r);
} else {
try {
FileRegion fileRegion =
new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
getMessageResult.release();
if (!future.isSuccess()) {
log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
log.error("transfer many message by pagecache exception", e);
getMessageResult.release();
}
response = null;
}
break;
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
// 不是长轮询拉取消息获取短轮询的超时时间
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
// 构建拉取消息请求
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
// 暂停消息拉取服务=》
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
|| this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
OffsetMovedEvent event = new OffsetMovedEvent();
event.setConsumerGroup(requestHeader.getConsumerGroup());
event.setMessageQueue(mq);
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(getMessageResult.getNextBeginOffset());
this.generateOffsetMovedEvent(event);
log.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
}
break;
default:
assert false;
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
// 提交offset=》
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
return response;
}
进入这个方法,找到消费组的订阅信息,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager#findSubscriptionGroupConfig
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
// 从缓存中获取组的订阅信息
SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
if (null == subscriptionGroupConfig) {
// 自动创建消费组或者是系统自用的消费组
if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(group);
SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
if (null == preConfig) {
log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
}
// 更新数据的版本号
this.dataVersion.nextVersion();
// 持久化=》
this.persist();
}
}
return subscriptionGroupConfig;
}
进入这个方法,构建订阅信息,org.apache.rocketmq.common.filter.FilterAPI#build
public static SubscriptionData build(final String topic, final String subString,
final String type) throws Exception {
if (ExpressionType.TAG.equals(type) || type == null) {
// =》
return buildSubscriptionData(null, topic, subString);
}
if (subString == null || subString.length() < 1) {
throw new IllegalArgumentException("Expression can't be null! " + type);
}
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
subscriptionData.setExpressionType(type);
return subscriptionData;
}
进入这个方法,org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
// 订阅所有的topic
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
// 解析tags,多个tag用|分开
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
// 解析订阅信息中的tag
subscriptionData.getTagsSet().add(trimString);
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
往上返回到这个方法,如果订阅类型是tag,构建过滤消费的数据,org.apache.rocketmq.broker.filter.ConsumerFilterManager#build
public static ConsumerFilterData build(final String topic, final String consumerGroup,
final String expression, final String type,
final long clientVersion) {
if (ExpressionType.isTagType(type)) {
return null;
}
ConsumerFilterData consumerFilterData = new ConsumerFilterData();
consumerFilterData.setTopic(topic);
consumerFilterData.setConsumerGroup(consumerGroup);
consumerFilterData.setBornTime(System.currentTimeMillis());
consumerFilterData.setDeadTime(0);
consumerFilterData.setExpression(expression);
consumerFilterData.setExpressionType(type);
consumerFilterData.setClientVersion(clientVersion);
try {
consumerFilterData.setCompiledExpression(
FilterFactory.INSTANCE.get(type).compile(expression)
);
} catch (Throwable e) {
log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());
return null;
}
return consumerFilterData;
}
往上返回到这个方法,按topic过滤的数据中获取要过滤的消费数据,org.apache.rocketmq.broker.filter.ConsumerFilterManager#get(java.lang.String, java.lang.String)
public ConsumerFilterData get(final String topic, final String consumerGroup) {
if (!this.filterDataByTopic.containsKey(topic)) {
return null;
}
if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {
return null;
}
return this.filterDataByTopic.get(topic).getGroupFilterData().get(consumerGroup);
}
往上返回到这个方法,获取消息,org.apache.rocketmq.store.DefaultMessageStore#getMessage
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
// 消息存储服务关闭了
if (this.shutdown) {
log.warn("message store has shutdown, so getMessage is forbidden");
return null;
}
// 消息存储不能读取
if (!this.runningFlags.isReadable()) {
log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
return null;
}
long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
// 从commitLog中获取最大的offset=》
final long maxOffsetPy = this.commitLog.getMaxOffset();
// 找到指定topic、queueId的消息队列=》
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
// 找到最小的offset
minOffset = consumeQueue.getMinOffsetInQueue();
// 找到最大的offset=》
maxOffset = consumeQueue.getMaxOffsetInQueue();
// 消息队列中没有消息
if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
// offset小于最小的offset了
} else if (offset < minOffset) {
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
// offset太大,大于最大的offset了
} else if (offset == maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
// offset太大,超过最大的offset了
} else if (offset > maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
} else {
// 获取消费队列的buffer=》
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
// 检查磁盘
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
isInDisk)) {
break;
}
boolean extRet = false, isTagsCodeLegal = true;
if (consumeQueue.isExtAddr(tagsCode)) {
// 扩展消息存储文件=》
extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) {
tagsCode = cqExtUnit.getTagsCode();
} else {
// can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
tagsCode, offsetPy, sizePy, topic, group);
isTagsCodeLegal = false;
}
}
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
// 从commitLog中找到offset和大小的buffer=》
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
// 消息删除
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
// 根据offset找到下个映射文件=》
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
if (diskFallRecorded) {
long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
}
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+ maxOffset + ", but access logic queue failed.");
}
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
if (GetMessageStatus.FOUND == status) {
this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
} else {
this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
}
long eclipseTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}
进入这个方法,从commitLog中获取最大的offset,org.apache.rocketmq.store.MappedFileQueue#getMaxOffset
public long getMaxOffset() {
// 获取存储映射文件队列中索引位置最大的映射文件=》
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
// 映射文件的起始offset+映射文件的可读取的索引位置
return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
}
// 如果队列中没有存储映射文件直接返回0
return 0;
}
进入这个方法,获取存储映射文件队列中索引位置最大的映射文件,org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
往上返回到这个方法,找到指定topic、queueId的消息队列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
// 找到topic的所有消息队列
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
// 按queue id查找消费者队列
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
// 消费者队列存储地址 user.home/store/consumequeue
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
// 每个文件存储默认30W
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;
}
往上返回到这个方法,找到最大的offset,org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueue
public long getMaxOffsetInQueue() {
// =》
return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
}
进入这个方法,org.apache.rocketmq.store.MappedFileQueue#getMaxOffset上面介绍过了。
往上返回到这个方法,获取消费队列的buffer,org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
int mappedFileSize = this.mappedFileSize;
// 获取最小的物理offset
long offset = startIndex * CQ_STORE_UNIT_SIZE;
if (offset >= this.getMinLogicOffset()) {
// 根据offset查询映射文件 =》
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
return result;
}
}
return null;
}
进入这个方法,根据offset查询映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long)
public MappedFile findMappedFileByOffset(final long offset) {
// =》
return findMappedFileByOffset(offset, false);
}
进入这个方法,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
// 获取队列中第一个映射文件
MappedFile firstMappedFile = this.getFirstMappedFile();
// 获取队列中最后一个映射文件
MappedFile lastMappedFile = this.getLastMappedFile();
if (firstMappedFile != null && lastMappedFile != null) {
// 如果offset不在索引文件的offset范围内
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize,
this.mappedFiles.size());
} else {
// 找到映射文件在队列中的索引位置
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
// 获取索引文件
targetFile = this.mappedFiles.get(index);
} catch (Exception ignored) {
}
// offset在目标文件的起始offset和结束offset范围内
if (targetFile != null && offset >= targetFile.getFileFromOffset()
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
return targetFile;
}
// 如果按索引在队列中找不到映射文件就遍历队列查找映射文件
for (MappedFile tmpMappedFile : this.mappedFiles) {
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
return tmpMappedFile;
}
}
}
// 如果offset=0获取队列中第一个映射文件,个人感觉这个逻辑是否放在前面判断更为合理,还是放在这里另有深意
if (returnFirstOnNotFound) {
return firstMappedFile;
}
}
} catch (Exception e) {
log.error("findMappedFileByOffset Exception", e);
}
return null;
}
往上返回到这个方法,从commitLog中找到offset和大小的buffer,org.apache.rocketmq.store.CommitLog#getMessage
public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
// 根据offset找到映射文件 =》
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
return mappedFile.selectMappedBuffer(pos, size);
}
return null;
}
接下篇。
说在最后
本次解析仅代表个人观点,仅供参考。
钉钉技术群
标签:return,拉取,topic,requestHeader,源码,offset,null,response,rocketmq 来源: https://blog.csdn.net/qq_23283355/article/details/100075415