rocketmq源码解析之NamesrvController启动②创建mqclient①
作者:互联网
说在前面
接上次
源码解析
返回到这个方法org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#start
@Override
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST://服务只启动,不创建
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
// 创建mqclient对象 =》
this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
// 注册管理服务处理器=》
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
+ "] has created already, specifed another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
// 启动mqclient =》
mqClientInstance.start();
log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
进入这个方法org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)创建mqclient对象
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
// 从本地缓存中获取client对象,简单的一般会concurrentHashMap当本地缓存,性能很高
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
进入到这个方法org.apache.rocketmq.client.impl.factory.MQClientInstance#registerAdminExt注册管理服务
public boolean registerAdminExt(final String group, final MQAdminExtInner admin) {
if (null == group || null == admin) {
return false;
}
MQAdminExtInner prev = this.adminExtTable.putIfAbsent(group, admin);
if (prev != null) {
log.warn("the admin group[{}] exist already.", group);
return false;
}
return true;
}
进入这个方法启动mqclient,org.apache.rocketmq.client.impl.factory.MQClientInstance#start
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST://仅创建不启动
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server 如果启动的时候命令行没有指定name server的地址,就去获取
if (null == this.clientConfig.getNamesrvAddr()) {
// 监测连接是否可用
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel 启动请求响应的channel =》
this.mQClientAPIImpl.start();
// Start various schedule tasks 启动调度任务
this.startScheduledTask();
// Start pull service 启动pull服务
this.pullMessageService.start();
// Start rebalance service 启动负载均衡服务
this.rebalanceService.start();
// Start push service 启动push服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
启动mqclient,进入这个方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#start
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
// 设置请求、响应消息大小值默认是65535
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
// 添加事件组
defaultEventExecutorGroup,
// 注册netty编码器 =》
new NettyEncoder(),
// 注册netty解码器=》
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
// netty连接管理handler=》
new NettyConnectManageHandler(),
// 注册netty client handler=》
new NettyClientHandler());
}
});
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 扫描废弃的请求=》
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
netty编码器
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
// 对消息头进行编码,rocketmq只对消息头进行了编码
ByteBuffer header = remotingCommand.encodeHeader();
// 往buf中写消息头
out.writeBytes(header);
// 获取消息体
byte[] body = remotingCommand.getBody();
if (body != null) {
// 写消息体
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
// 出现异常关闭channel
RemotingUtil.closeChannel(ctx.channel());
}
}
}
对消息头进行编码,进入这个方法org.apache.rocketmq.remoting.protocol.RemotingCommand#encodeHeader(int)
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size 消息头长度
int length = 4;
// 2> header data length
byte[] headerData;
// 消息头数据编码
headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
length += bodyLength;
// 分配缓冲区
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
result.flip();
return result;
}
进入这个方法org.apache.rocketmq.remoting.protocol.RemotingCommand#headerEncode
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
// mq代理编码
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
// json编码
return RemotingSerializable.encode(this);
}
}
进入这个方法mq协议编码org.apache.rocketmq.remoting.protocol.RocketMQSerializable#rocketMQProtocolEncode
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
// String remark
byte[] remarkBytes = null;
int remarkLen = 0;
if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
remarkLen = remarkBytes.length;
}
// HashMap<String, String> extFields
byte[] extFieldsBytes = null;
int extLen = 0;
if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
// map形式的参数序列化
extFieldsBytes = mapSerialize(cmd.getExtFields());
extLen = extFieldsBytes.length;
}
// 计算总长
int totalLen = calTotalLen(remarkLen, extLen);
// 分配缓冲区
ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
// int code(~32767)
headerBuffer.putShort((short) cmd.getCode());
// LanguageCode language
headerBuffer.put(cmd.getLanguage().getCode());
// int version(~32767)
headerBuffer.putShort((short) cmd.getVersion());
// int opaque
headerBuffer.putInt(cmd.getOpaque());
// int flag
headerBuffer.putInt(cmd.getFlag());
// String remark
if (remarkBytes != null) {
headerBuffer.putInt(remarkBytes.length);
headerBuffer.put(remarkBytes);
} else {
headerBuffer.putInt(0);
}
// HashMap<String, String> extFields;
if (extFieldsBytes != null) {
headerBuffer.putInt(extFieldsBytes.length);
headerBuffer.put(extFieldsBytes);
} else {
headerBuffer.putInt(0);
}
return headerBuffer.array();
}
创建netty解码器
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int FRAME_MAX_LENGTH =
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
// 这里通过netty LengthFieldBasedFrameDecoder解决粘包问题,可以方便在消息头中定义一些规则,根据一定的规则进行解码,比如总消息长度、编解码方式
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
}
进入这个方法,消息解码org.apache.rocketmq.remoting.protocol.RemotingCommand#decode(java.nio.ByteBuffer)
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
// 源消息头长度
int oriHeaderLen = byteBuffer.getInt();
// 消息头长度
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
// 根据消息头中传入的序列化类型解码 =》
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
消息头解码,进入这个方法org.apache.rocketmq.remoting.protocol.RemotingCommand#headerDecode
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON: //header json形式解码
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ: //mq代理反序列化
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
mq协议解码,进入这个方法org.apache.rocketmq.remoting.protocol.RocketMQSerializable#rocketMQProtocolDecode
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
RemotingCommand cmd = new RemotingCommand();
// 把消息头byte数组包装成缓冲区
ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
// int code(~32767)
cmd.setCode(headerBuffer.getShort());
// LanguageCode language
cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
// int version(~32767)
cmd.setVersion(headerBuffer.getShort());
// int opaque
cmd.setOpaque(headerBuffer.getInt());
// int flag
cmd.setFlag(headerBuffer.getInt());
// String remark
int remarkLength = headerBuffer.getInt();
if (remarkLength > 0) {
byte[] remarkContent = new byte[remarkLength];
headerBuffer.get(remarkContent);
cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
}
// HashMap<String, String> extFields
int extFieldsLength = headerBuffer.getInt();
if (extFieldsLength > 0) {
byte[] extFieldsBytes = new byte[extFieldsLength];
headerBuffer.get(extFieldsBytes);
// map形式数据反序列化
cmd.setExtFields(mapDeserialize(extFieldsBytes));
}
return cmd;
}
未完待续。
说在最后
本次解析仅代表个人观点,仅供参考。
技术交流群
标签:mqclient,int,cmd,length,源码,headerBuffer,new,NamesrvController,null 来源: https://blog.csdn.net/xunzhaotianhe/article/details/99676069