rocketmq源码解析发送消息处理器①
作者:互联网
说在前面
发送消息处理器
源码解析
进入这个方法,org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
// 消费者发送消息返回=》
return this.consumerSendMsgBack(ctx, request);
default:
// 解析请求消息头,队序列化做了些优化,消息头中字段过多,字段过长并发情况下会对序列化效率产生影响=》
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
// 构建消息上下文=》
mqtraceContext = buildMsgContext(ctx, requestHeader);
// 发送消息之前执行钩子方法=》
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {
// 批量消息发送=》
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
// 发送消息=》
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
// 执行发送消息之后的钩子方法=》
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
进入这个方法,消费者发送消息返回,org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
ConsumeMessageContext context = new ConsumeMessageContext();
context.setConsumerGroup(requestHeader.getGroup());
context.setTopic(requestHeader.getOriginTopic());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
// 执行消费消息的钩子方法
this.executeConsumeMessageHookAfter(context);
}
// 获取消费组的订阅配置信息=》
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
// 无写权限
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return response;
}
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
// 重试topic=%RETRY%+消费组
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
// 在发送消息返回后创建topic配置信息=》
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
// 无写权限
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return response;
}
// 按offset查询消息=》
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return response;
}
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
int delayLevel = requestHeader.getDelayLevel();
// 最大重试次数,默认16次
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
// 重试16次后
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
// 创建%DLQ%+消费组 topic
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
// 创建topic配置信息在发送消息返回=》
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
} else {
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
// 存储消息=》
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(putMessageResult.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
}
进入这个方法,执行消费消息的钩子方法,org.apache.rocketmq.broker.processor.SendMessageProcessor#executeConsumeMessageHookAfter
public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {
if (hasConsumeMessageHook()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
// 客户可以实现自己的消费消息后的钩子方法
hook.consumeMessageAfter(context);
} catch (Throwable e) {
// Ignore
}
}
}
}
往上返回到这个方法,获取消费组的订阅配置信息,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager#findSubscriptionGroupConfig前面介绍过了。
往上返回到这个方法,在发送消息返回后创建topic配置信息,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod
public TopicConfig createTopicInSendMessageBackMethod(
final String topic,
final int clientDefaultTopicQueueNums,
final int perm,
final int topicSysFlag) {
// 获取topic配置信息
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
boolean createNew = false;
try {
if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
log.info("create new topic {}", topicConfig);
// 存储重试的tpic配置信息
this.topicConfigTable.put(topic, topicConfig);
createNew = true;
// 修饰数据的版本号
this.dataVersion.nextVersion();
// 持久化=》
this.persist();
} finally {
this.lockTopicConfigTable.unlock();
}
}
} catch (InterruptedException e) {
log.error("createTopicInSendMessageBackMethod exception", e);
}
// 如果topic配置信息是重新创建的,注册到broker集群中=》
if (createNew) {
this.brokerController.registerBrokerAll(false, true,true);
}
return topicConfig;
}
进入这个方法,如果topic配置信息是重新创建的,注册到broker集群中,org.apache.rocketmq.broker.BrokerController#registerBrokerAll前面介绍过了。
往上返回到这个方法,按offset查询消息,org.apache.rocketmq.store.DefaultMessageStore#lookMessageByOffset(long)前面介绍过了。
往上返回到这个方法,创建topic配置信息在发送消息返回,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod前面介绍过了。
往上返回到这个方法,存储消息,org.apache.rocketmq.store.DefaultMessageStore#putMessage前面介绍过了。
往上返回到这个方法,解析请求消息头,队序列化做了些优化,消息头中字段过多,字段过长并发情况下会对序列化效率产生影响,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#parseRequestHeader
protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
throws RemotingCommandException {
SendMessageRequestHeaderV2 requestHeaderV2 = null;
SendMessageRequestHeader requestHeader = null;
switch (request.getCode()) {
case RequestCode.SEND_BATCH_MESSAGE:
case RequestCode.SEND_MESSAGE_V2:
requestHeaderV2 =
(SendMessageRequestHeaderV2) request
.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
case RequestCode.SEND_MESSAGE:
if (null == requestHeaderV2) {
requestHeader =
(SendMessageRequestHeader) request
.decodeCommandCustomHeader(SendMessageRequestHeader.class);
} else {
requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
}
default:
break;
}
return requestHeader;
}
返回到这个方法,发送消息之前执行钩子方法,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#executeSendMessageHookBefore
public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
SendMessageContext context) {
if (hasSendMessageHook()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
// 解析消息头,对序列化的优化
final SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (null != requestHeader) {
context.setProducerGroup(requestHeader.getProducerGroup());
context.setTopic(requestHeader.getTopic());
context.setBodyLength(request.getBody().length);
context.setMsgProps(requestHeader.getProperties());
context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
context.setBrokerAddr(this.brokerController.getBrokerAddr());
context.setQueueId(requestHeader.getQueueId());
}
// 执行发送消息之前的钩子方法
hook.sendMessageBefore(context);
if (requestHeader != null) {
requestHeader.setProperties(context.getMsgProps());
}
} catch (Throwable e) {
// Ignore
}
}
}
}
返回到这个方法,批量消息发送,org.apache.rocketmq.broker.processor.SendMessageProcessor#sendBatchMessage
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("Receive SendMessage request command {}", request);
// 开始接受发送请求的时间,默认是立即发送
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
// 消息检查=》
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}
int queueIdInt = requestHeader.getQueueId();
// 查询topic配置
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
// topic长度大于127,非法
if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("message topic length too long " + requestHeader.getTopic().length());
return response;
}
// 如果是重试topic,非法
if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("batch request does not support retry group " + requestHeader.getTopic());
return response;
}
MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setTopic(requestHeader.getTopic());
messageExtBatch.setQueueId(queueIdInt);
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
messageExtBatch.setSysFlag(sysFlag);
messageExtBatch.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
messageExtBatch.setBody(request.getBody());
messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
messageExtBatch.setBornHost(ctx.channel().remoteAddress());
messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
// 存储批量消息=》
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
// 解析存储结果=》
return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
}
进入这个方法,消息检查,org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending message is forbidden");
return response;
}
// 不是自动创建的topic不能自动发送消息=》
if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}
// 查询topic配置
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
// 根据是不是重试topic设置topicSysFlag
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
} else {
topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
}
}
log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
// 创建topic配置在发送消息之后=》
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
// 如果是重试topic
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
// 在发送消息之后创建topic配置=》
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
topicSysFlag);
}
}
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) {
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
queueIdInt,
topicConfig.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
return response;
}
进入这个方法,创建topic配置在发送消息之后,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageMethod前面介绍过了。
接下篇。
说在最后
本次解析仅代表个人观点,仅供参考。
钉钉技术群
标签:topicConfig,request,topic,requestHeader,源码,处理器,return,response,rocketmq 来源: https://blog.csdn.net/qq_23283355/article/details/100075488