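sofa-bolt Source Code Reading (2): Client Startup
Author: Internet
A sofa-bolt client reaches a server in two steps: it first initializes itself, then it establishes a connection. Typical code looks like this: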
// 1. create a rpc client
RpcClient client = new RpcClient();
// 2. add processor for connect and close event if you need
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
// 3. do init
client.startup();
// 4. invoke
RequestBody req = new RequestBody(2, "hello world sync");
try {
String res = (String) client.invokeSync(addr, req, 3000);
System.out.println("invoke sync result = [" + res + "]");
} catch (RemotingException e) {
String errMsg = "RemotingException caught in oneway!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
logger.error("interrupted!");
}
// 5. close
client.shutdown();
RpcClient.startup performs the initialization, which covers connection management (ConnectionManager), connection monitoring (DefaultConnectionMonitor), and reconnection (ReconnectManager).
2.1 Connection Management
ConnectionManager manages connections. Each url maps to a poolKey, each poolKey gets its own ConnectionPool, and a ConnectionPool maintains multiple connections. As shown in the figure above, when a connection is needed, ConnectionManager picks one from the ConnectionPool according to the connection selection strategy, ConnectionSelectStrategy.
Initializing the connection manager
com.alipay.remoting.rpc.RpcClient#startup
ConnectionSelectStrategy connectionSelectStrategy = option(BoltGenericOption.CONNECTION_SELECT_STRATEGY);
if (connectionSelectStrategy == null) {
    connectionSelectStrategy = new RandomSelectStrategy(switches());
}
this.connectionManager = new DefaultClientConnectionManager(connectionSelectStrategy,
    new RpcConnectionFactory(userProcessors, this), connectionEventHandler,
    connectionEventListener, switches());
this.connectionManager.setAddressParser(this.addressParser);
this.connectionManager.startup();
Connection selection strategy: ConnectionSelectStrategy
When a Connection is requested, ConnectionPool calls ConnectionSelectStrategy to choose one.
com.alipay.remoting.ConnectionPool#get
public Connection get() {
    // update the last access time
    markAccess();
    if (null != connections) {
        List<Connection> snapshot = new ArrayList<Connection>(connections);
        if (snapshot.size() > 0) {
            // pick a Connection according to the selection strategy
            return strategy.select(snapshot);
        } else {
            return null;
        }
    } else {
        return null;
    }
}
Take RandomSelectStrategy as an example: it uses a random number generator to pick a healthy connection from the list passed in, retrying at most MAX_TIMES times and returning null if no healthy connection turns up.
com.alipay.remoting.RandomSelectStrategy#select
public Connection select(List<Connection> connections) {
    // randomly pick a healthy connection from the list
    Connection result = randomGet(connections);
    return result;
}
com.alipay.remoting.RandomSelectStrategy#randomGet
private Connection randomGet(List<Connection> connections) {
    if (null == connections || connections.isEmpty()) {
        return null;
    }
    int size = connections.size();
    int tries = 0;
    Connection result = null;
    while ((result == null || !result.isFine()) && tries++ < MAX_TIMES) {
        result = connections.get(this.random.nextInt(size));
    }
    if (result != null && !result.isFine()) {
        result = null;
    }
    return result;
}
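To make the bounded random pick easier to experiment with, here is a minimal standalone sketch of the same idea. SketchConn, RandomSelectSketch, and the value 5 for MAX_TIMES are assumptions made for illustration; they are not sofa-bolt classes or settings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/** Hypothetical stand-in for a connection; only the health flag matters here. */
class SketchConn {
    final String name;
    final boolean fine;

    SketchConn(String name, boolean fine) {
        this.name = name;
        this.fine = fine;
    }
}

class RandomSelectSketch {
    /** Assumed retry bound; the article does not show the real constant's value. */
    static final int MAX_TIMES = 5;
    private final Random random = new Random();

    /** Picks a random healthy connection, giving up after MAX_TIMES draws. */
    SketchConn select(List<SketchConn> connections) {
        if (connections == null || connections.isEmpty()) {
            return null;
        }
        SketchConn result = null;
        int tries = 0;
        while ((result == null || !result.fine) && tries++ < MAX_TIMES) {
            result = connections.get(random.nextInt(connections.size()));
        }
        // The last draw may still be unhealthy, so filter it out.
        return (result != null && result.fine) ? result : null;
    }

    public static void main(String[] args) {
        RandomSelectSketch strategy = new RandomSelectSketch();
        List<SketchConn> pool = Arrays.asList(
            new SketchConn("c1", true), new SketchConn("c2", false), new SketchConn("c3", true));
        SketchConn picked = strategy.select(pool);
        System.out.println(picked == null ? "no healthy connection found" : "picked " + picked.name);
    }
}
```

Because the number of draws is bounded, the sketch can return null even when a healthy connection exists in the list; callers have to treat null as "try again or fail".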
Address parser: RpcAddressParser
RpcAddressParser parses a url string into a Url object.
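As an illustration of what such a parser does, the sketch below splits an address of the assumed form ip:port?key=value&key=value into its parts. AddressParseSketch, ParsedUrl, and the property names timeout and num are hypothetical; this is not RpcAddressParser's implementation, and the real parser's property keys are not shown in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class AddressParseSketch {
    /** Hypothetical result type standing in for a parsed url. */
    static final class ParsedUrl {
        final String ip;
        final int port;
        final Map<String, String> props;

        ParsedUrl(String ip, int port, Map<String, String> props) {
            this.ip = ip;
            this.port = port;
            this.props = props;
        }
    }

    /** Parses "ip:port" with an optional "?k1=v1&k2=v2" suffix (assumed format). */
    static ParsedUrl parse(String address) {
        String hostPort = address;
        Map<String, String> props = new LinkedHashMap<>();
        int q = address.indexOf('?');
        if (q >= 0) {
            hostPort = address.substring(0, q);
            for (String pair : address.substring(q + 1).split("&")) {
                if (pair.isEmpty()) {
                    continue;
                }
                int eq = pair.indexOf('=');
                if (eq <= 0) {
                    throw new IllegalArgumentException("Bad property: " + pair);
                }
                props.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        int colon = hostPort.lastIndexOf(':');
        if (colon <= 0) {
            throw new IllegalArgumentException("Expected ip:port but got: " + hostPort);
        }
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        return new ParsedUrl(hostPort.substring(0, colon), port, props);
    }

    public static void main(String[] args) {
        ParsedUrl url = parse("127.0.0.1:12200?timeout=1000&num=2");
        System.out.println(url.ip + " " + url.port + " " + url.props);
    }
}
```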
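Connection factory: RpcConnectionFactory
Creates the Connection objects that fill a ConnectionPool.
Event handler: ConnectionEventHandler
Implements ChannelHandler and is registered in the netty pipeline. When a connection-open or connection-close event occurs, it forwards the event to ConnectionEventListener.
Event listener: ConnectionEventListener
Works together with ConnectionEventHandler and carries out the actual business handling when an event occurs.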
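The handler-to-listener hand-off described above can be pictured as a small dispatch table keyed by event type. The sketch below is only an assumption about the general shape; EventListenerSketch, EventType, and Processor are illustrative names, and the class is not thread-safe.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class EventListenerSketch {
    enum EventType { CONNECT, CLOSE }

    /** Hypothetical callback invoked with the remote address of the connection. */
    interface Processor {
        void onEvent(String remoteAddress);
    }

    private final Map<EventType, List<Processor>> processors = new EnumMap<>(EventType.class);

    /** Registers a processor for one event type. */
    void addProcessor(EventType type, Processor processor) {
        processors.computeIfAbsent(type, k -> new ArrayList<>()).add(processor);
    }

    /** Called by the pipeline-side handler; fans the event out to the registered processors. */
    void onEvent(EventType type, String remoteAddress) {
        List<Processor> list = processors.get(type);
        if (list == null) {
            return;
        }
        for (Processor p : list) {
            p.onEvent(remoteAddress);
        }
    }

    public static void main(String[] args) {
        EventListenerSketch listener = new EventListenerSketch();
        listener.addProcessor(EventType.CONNECT, addr -> System.out.println("connected to " + addr));
        listener.addProcessor(EventType.CLOSE, addr -> System.out.println("closed " + addr));
        listener.onEvent(EventType.CONNECT, "127.0.0.1:12200");
        listener.onEvent(EventType.CLOSE, "127.0.0.1:12200");
    }
}
```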
Starting the connection manager
com.alipay.remoting.DefaultClientConnectionManager#startup
@Override
public void startup() throws LifeCycleException {
    // mark the state as started
    super.startup();
    this.connectionEventHandler.setConnectionManager(this);
    this.connectionEventHandler.setConnectionEventListener(connectionEventListener);
    this.connectionFactory.init(connectionEventHandler);
}
The startup method performs these steps in order:
- mark the state as started
- initialize the event handler
- initialize the connection factory
connectionFactory.init configures netty for the client.
com.alipay.remoting.connection.AbstractConnectionFactory#init
public void init(final ConnectionEventHandler connectionEventHandler) {
    bootstrap = new Bootstrap();
    bootstrap.group(workerGroup).channel(NettyEventLoopUtil.getClientSocketChannelClass())
        .option(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
        .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
        .option(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());
    // init netty write buffer water mark
    initWriteBufferWaterMark();
    // init byte buf allocator
    if (ConfigManager.netty_buffer_pooled()) {
        this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    } else {
        this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
    }
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast("decoder", codec.newDecoder());
            pipeline.addLast("encoder", codec.newEncoder());
            boolean idleSwitch = ConfigManager.tcp_idle_switch();
            if (idleSwitch) {
                pipeline.addLast("idleStateHandler", new IdleStateHandler(ConfigManager.tcp_idle(),
                    ConfigManager.tcp_idle(), 0, TimeUnit.MILLISECONDS));
                pipeline.addLast("heartbeatHandler", heartbeatHandler);
            }
            pipeline.addLast("connectionEventHandler", connectionEventHandler);
            pipeline.addLast("handler", handler);
        }
    });
}
As you can see, the client's netty pipeline is almost identical to the server's. The small difference is that the client has to send heartbeats while the server has to detect them.
// client
pipeline.addLast("heartbeatHandler", heartbeatHandler);
// server
pipeline.addLast("serverIdleHandler", serverIdleHandler);
2.2 Connection Monitoring
if (switches().isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
if (monitorStrategy == null) {
connectionMonitor = new DefaultConnectionMonitor(new ScheduledDisconnectStrategy(),
this.connectionManager);
} else {
connectionMonitor = new DefaultConnectionMonitor(monitorStrategy,
this.connectionManager);
}
connectionMonitor.startup();
logger.warn("Switch on connection monitor");
}
DefaultConnectionMonitor monitors the connections. Its startup method starts a ScheduledThreadPoolExecutor that periodically calls ConnectionMonitorStrategy.monitor on the connection pools.
@Override
public void startup() throws LifeCycleException {
super.startup();
/* initial delay to execute schedule task, unit: ms */
long initialDelay = ConfigManager.conn_monitor_initial_delay();
/* period of schedule task, unit: ms*/
long period = ConfigManager.conn_monitor_period();
this.executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(
"ConnectionMonitorThread", true), new ThreadPoolExecutor.AbortPolicy());
this.executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools = connectionManager
.getConnPools();
strategy.monitor(connPools);
} catch (Exception e) {
logger.warn("MonitorTask error", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
}
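For example, ScheduledDisconnectStrategy watches the number of connections in each pool. When the number of in-service connections exceeds the configured threshold, it randomly picks one, marks it as out of service, and closes it once its pending invocations have finished.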
public void monitor(Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools) {
try {
if (connPools == null || connPools.size() == 0) {
return;
}
for (Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry : connPools
.entrySet()) {
String poolKey = entry.getKey();
ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(), logger);
List<Connection> serviceOnConnections = new ArrayList<Connection>();
List<Connection> serviceOffConnections = new ArrayList<Connection>();
for (Connection connection : pool.getAll()) {
if (isConnectionOn(connection)) {
serviceOnConnections.add(connection);
} else {
serviceOffConnections.add(connection);
}
}
if (serviceOnConnections.size() > connectionThreshold) {
Connection freshSelectConnect = serviceOnConnections.get(random
.nextInt(serviceOnConnections.size()));
freshSelectConnect.setAttribute(Configs.CONN_SERVICE_STATUS,
Configs.CONN_SERVICE_STATUS_OFF);
serviceOffConnections.add(freshSelectConnect);
} else {
if (logger.isInfoEnabled()) {
logger.info("serviceOnConnections({}) size[{}], CONNECTION_THRESHOLD[{}].",
poolKey, serviceOnConnections.size(), connectionThreshold);
}
}
for (Connection offConn : serviceOffConnections) {
if (offConn.isInvokeFutureMapFinish()) {
if (offConn.isFine()) {
offConn.close();
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Address={} won't close at this schedule turn",
RemotingUtil.parseRemoteAddress(offConn.getChannel()));
}
}
}
}
} catch (Exception e) {
logger.error("ScheduledDisconnectStrategy monitor error", e);
}
}
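The per-pool logic of the strategy above can be condensed into a small standalone sketch. DisconnectSketch, Conn, and monitorOnce are hypothetical names, and pendingInvocations is an assumed stand-in for the isInvokeFutureMapFinish check; unlike the listing, the sketch skips connections it has already closed because nothing else removes them from its list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class DisconnectSketch {
    /** Hypothetical connection model: a service flag, a closed flag, and in-flight calls. */
    static final class Conn {
        boolean serviceOn = true;
        boolean closed = false;
        int pendingInvocations;

        Conn(int pendingInvocations) {
            this.pendingInvocations = pendingInvocations;
        }
    }

    /** Runs one monitoring pass over a pool and returns how many connections it closed. */
    static int monitorOnce(List<Conn> pool, int threshold, Random random) {
        List<Conn> on = new ArrayList<>();
        List<Conn> off = new ArrayList<>();
        for (Conn c : pool) {
            if (c.closed) {
                continue; // already closed in an earlier pass
            }
            (c.serviceOn ? on : off).add(c);
        }
        if (on.size() > threshold) {
            // Too many in-service connections: retire one at random.
            Conn picked = on.get(random.nextInt(on.size()));
            picked.serviceOn = false;
            off.add(picked);
        }
        int closedNow = 0;
        for (Conn c : off) {
            // Only close a retired connection once nothing is in flight on it.
            if (c.pendingInvocations == 0) {
                c.closed = true;
                closedNow++;
            }
        }
        return closedNow;
    }

    public static void main(String[] args) {
        List<Conn> pool = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            pool.add(new Conn(0));
        }
        System.out.println("closed in this pass: " + monitorOnce(pool, 2, new Random()));
    }
}
```

A retired connection with in-flight calls stays open and is re-examined on the next pass, which mirrors the "won't close at this schedule turn" branch in the listing.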
2.3 Reconnection
if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
reconnectManager = new ReconnectManager(connectionManager);
reconnectManager.startup();
connectionEventHandler.setReconnector(reconnectManager);
logger.warn("Switch on reconnect manager");
}
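Unlike monitoring, reconnection starts a Thread in its startup method. This thread acts as a consumer that processes the ReconnectTask objects added to the queue.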
com.alipay.remoting.ReconnectManager#startup
@Override
public void startup() throws LifeCycleException {
super.startup();
this.healConnectionThreads = new Thread(new HealConnectionRunner());
this.healConnectionThreads.start();
}
com.alipay.remoting.ReconnectManager.HealConnectionRunner#run
@Override
public void run() {
while (isStarted()) {
long start = -1;
ReconnectTask task = null;
try {
if (this.lastConnectTime < HEAL_CONNECTION_INTERVAL) {
Thread.sleep(HEAL_CONNECTION_INTERVAL);
}
try {
task = ReconnectManager.this.tasks.take();
} catch (InterruptedException e) {
// ignore
}
if (task == null) {
continue;
}
start = System.currentTimeMillis();
if (!canceled.contains(task.url)) {
task.run();
} else {
logger.warn("Invalid reconnect request task {}, cancel list size {}",
task.url, canceled.size());
}
this.lastConnectTime = System.currentTimeMillis() - start;
} catch (Exception e) {
if (start != -1) {
this.lastConnectTime = System.currentTimeMillis() - start;
}
if (task != null) {
logger.warn("reconnect target: {} failed.", task.url, e);
tasks.add(task);
}
}
}
}
Creating a ReconnectTask
A task is created in two cases: after a connection is dropped unexpectedly, and after a reconnect attempt fails.
1. The connection is dropped unexpectedly
com.alipay.remoting.ConnectionEventHandler#channelInactive
// add reconnect task
if (this.globalSwitch != null && this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
    Connection conn = (Connection) attr.get();
    if (reconnectManager != null) {
        reconnectManager.reconnect(conn.getUrl());
    }
}
2. A reconnect attempt fails: the catch block of HealConnectionRunner#run shown earlier puts the failed task back into the queue with tasks.add(task).
Canceling a ReconnectTask
When the client explicitly closes a connection, reconnection is canceled by adding the url to ReconnectManager#canceled.
com.alipay.remoting.rpc.RpcClient#closeConnection
@Override
public void closeConnection(Url url) {
    if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
        reconnectManager.disableReconnect(url);
    }
    this.connectionManager.remove(url.getUniqueKey());
}
com.alipay.remoting.ReconnectManager#disableReconnect
@Override
public void disableReconnect(Url url) {
    canceled.add(url);
}
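The queue-plus-cancel-list arrangement can be tried out in isolation with the sketch below. ReconnectSketch and its members are hypothetical; a url string stands in for a ReconnectTask, and recording the url in healed stands in for the actual reconnect. The sketch polls with a timeout instead of calling take() so the worker can notice shutdown, which is a deliberate deviation from the listing.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ReconnectSketch {
    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private final Set<String> canceled = ConcurrentHashMap.newKeySet();
    /** Urls that were "reconnected"; a real implementation would open a connection instead. */
    final List<String> healed = new CopyOnWriteArrayList<>();
    private volatile boolean started = true;

    /** Producer side: queue a reconnect request for a url. */
    void reconnect(String url) {
        tasks.add(url);
    }

    /** Marks a url so that queued or future reconnect requests for it are skipped. */
    void disableReconnect(String url) {
        canceled.add(url);
    }

    /** Consumer step: handles at most one queued task; returns false if none arrived in time. */
    boolean handleOne(long waitMillis) {
        String url;
        try {
            url = tasks.poll(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (url == null) {
            return false;
        }
        if (!canceled.contains(url)) {
            healed.add(url);
        }
        return true;
    }

    /** Starts the single consumer thread. */
    Thread startWorker() {
        Thread worker = new Thread(() -> {
            while (started) {
                handleOne(100);
            }
        }, "heal-connection-sketch");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    void shutdown() {
        started = false;
    }

    public static void main(String[] args) throws InterruptedException {
        ReconnectSketch manager = new ReconnectSketch();
        manager.startWorker();
        manager.disableReconnect("10.0.0.2:12200");
        manager.reconnect("10.0.0.1:12200");
        manager.reconnect("10.0.0.2:12200");
        Thread.sleep(300);
        manager.shutdown();
        System.out.println("healed = " + manager.healed);
    }
}
```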
2.4 Establishing a Connection
Back to the client startup code:
RequestBody req = new RequestBody(2, "hello world sync");
try {
String res = (String) client.invokeSync(addr, req, 3000);
System.out.println("invoke sync result = [" + res + "]");
} catch (RemotingException e) {
String errMsg = "RemotingException caught in oneway!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
logger.error("interrupted!");
}
The client calls invokeSync to send a request to the server.
com.alipay.remoting.rpc.RpcClient#invokeSync
@Override
public Object invokeSync(final String address, final Object request,
final InvokeContext invokeContext, final int timeoutMillis) throws RemotingException,InterruptedException {
return this.rpcRemoting.invokeSync(address, request, invokeContext, timeoutMillis);
}
com.alipay.remoting.rpc.RpcRemoting#invokeSync
public Object invokeSync(final String addr, final Object request,
final InvokeContext invokeContext, final int timeoutMillis)
throws RemotingException,InterruptedException {
// parse the address into a Url object via addressParser
Url url = this.addressParser.parse(addr);
return this.invokeSync(url, request, invokeContext, timeoutMillis);
}
com.alipay.remoting.rpc.RpcClientRemoting#invokeSync
@Override
public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) throws RemotingException,InterruptedException {
// 1. obtain a connection, creating it if necessary
final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
// 2. check the connection
this.connectionManager.check(conn);
// 3. trigger the invocation
return this.invokeSync(conn, request, invokeContext, timeoutMillis);
}
Creating the connection object
com.alipay.remoting.rpc.RpcClientRemoting#getConnectionAndInitInvokeContext
protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext)
        throws RemotingException, InterruptedException {
    long start = System.currentTimeMillis();
    Connection conn;
    try {
        // ask the ConnectionManager for a Connection, creating one if absent
        conn = this.connectionManager.getAndCreateIfAbsent(url);
    } finally {
        if (null != invokeContext) {
            invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME,
                (System.currentTimeMillis() - start));
        }
    }
    return conn;
}
com.alipay.remoting.DefaultConnectionManager#getAndCreateIfAbsent
@Override
public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {
    // 1. create the connection pool; note that the poolKey is the Url's uniqueKey
    ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(),
        new ConnectionPoolCall(url));
    if (null != pool) {
        // 2. get a connection
        return pool.get();
    } else {
        logger.error("[NOTIFYME] bug detected! pool here must not be null!");
        return null;
    }
}
For how a connection is picked from the pool, see the connection selection strategy section above.
Creating the connection pool
com.alipay.remoting.DefaultConnectionManager#getConnectionPoolAndCreateIfAbsent
private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey,
        Callable<ConnectionPool> callable) throws RemotingException, InterruptedException {
    RunStateRecordedFutureTask<ConnectionPool> initialTask;
    ConnectionPool pool = null;
    int retry = Constants.DEFAULT_RETRY_TIMES;
    int timesOfResultNull = 0;
    int timesOfInterrupt = 0;
    for (int i = 0; (i < retry) && (pool == null); ++i) {
        initialTask = this.connTasks.get(poolKey);
        if (null == initialTask) {
            RunStateRecordedFutureTask<ConnectionPool> newTask = new RunStateRecordedFutureTask<ConnectionPool>(
                callable);
            initialTask = this.connTasks.putIfAbsent(poolKey, newTask);
            if (null == initialTask) {
                initialTask = newTask;
                // run the task; this actually calls ConnectionPoolCall.call()
                initialTask.run();
            }
        }
        try {
            pool = initialTask.get();
            ...
        } catch (ExecutionException e) {
            ...
        }
    }
    return pool;
}
com.alipay.remoting.DefaultConnectionManager.ConnectionPoolCall#call
@Override
public ConnectionPool call() throws Exception {
    final ConnectionPool pool = new ConnectionPool(connectionSelectStrategy);
    if (whetherInitConnection) {
        try {
            // initialize the connection pool
            doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
        } catch (Exception e) {
            pool.removeAllAndTryClose();
            throw e;
        }
    }
    return pool;
}
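The get/putIfAbsent/run sequence in getConnectionPoolAndCreateIfAbsent is a common way to make sure that concurrent callers build a value for a key only once. The sketch below shows that pattern with a plain java.util.concurrent.FutureTask; PoolCacheSketch is a hypothetical name, and the retry loop of the real method is left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class PoolCacheSketch<V> {
    private final ConcurrentHashMap<String, FutureTask<V>> tasks = new ConcurrentHashMap<>();

    /** Returns the value for key, running factory at most once per key across threads. */
    V getOrCreate(String key, Callable<V> factory) {
        FutureTask<V> task = tasks.get(key);
        if (task == null) {
            FutureTask<V> newTask = new FutureTask<>(factory);
            task = tasks.putIfAbsent(key, newTask);
            if (task == null) {
                // This caller won the race, so it runs the factory on its own thread.
                task = newTask;
                task.run();
            }
        }
        try {
            // Losers of the race block here until the winner's task completes.
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            tasks.remove(key, task); // drop the failed task so a later call can retry
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        PoolCacheSketch<String> cache = new PoolCacheSketch<>();
        String first = cache.getOrCreate("127.0.0.1:12200", () -> "pool-" + created.incrementAndGet());
        String second = cache.getOrCreate("127.0.0.1:12200", () -> "pool-" + created.incrementAndGet());
        System.out.println(first + " " + second + " created=" + created.get());
    }
}
```

Storing the task rather than the finished value is what lets a second caller wait for a pool that is still being built instead of building its own.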
doCreate creates the connections in the pool. With warmup enabled it creates all of them synchronously; otherwise it creates syncCreateNumWhenNotWarmup connections synchronously and the rest asynchronously.
private void doCreate(final Url url, final ConnectionPool pool, final String taskName,
        final int syncCreateNumWhenNotWarmup) throws RemotingException {
    final int actualNum = pool.size();
    final int expectNum = url.getConnNum();
    if (actualNum >= expectNum) {
        return;
    }
    if (logger.isDebugEnabled()) {
        logger.debug("actual num {}, expect num {}, task name {}", actualNum, expectNum, taskName);
    }
    if (url.isConnWarmup()) {
        for (int i = actualNum; i < expectNum; ++i) {
            Connection connection = create(url);
            pool.add(connection);
        }
    } else {
        if (syncCreateNumWhenNotWarmup < 0 || syncCreateNumWhenNotWarmup > url.getConnNum()) {
            throw new IllegalArgumentException(
                "sync create number when not warmup should be [0," + url.getConnNum() + "]");
        }
        // create connections synchronously
        if (syncCreateNumWhenNotWarmup > 0) {
            for (int i = 0; i < syncCreateNumWhenNotWarmup; ++i) {
                Connection connection = create(url);
                pool.add(connection);
            }
            if (syncCreateNumWhenNotWarmup >= url.getConnNum()) {
                return;
            }
        }
        pool.markAsyncCreationStart();// mark the start of async
        try {
            // create the remaining connections asynchronously
            this.asyncCreateConnectionExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = pool.size(); i < url.getConnNum(); ++i) {
                            Connection conn = null;
                            try {
                                conn = create(url);
                            } catch (RemotingException e) {
                            }
                            pool.add(conn);
                        }
                    } finally {
                        pool.markAsyncCreationDone();// mark the end of async
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            pool.markAsyncCreationDone();// mark the end of async when reject
            throw e;
        }
    } // end of NOT warm up
}
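The split between synchronous and asynchronous creation is easier to see with the logging and bookkeeping removed. The following sketch assumes a generic pool modeled as a List and a Supplier as the connection factory; FillPoolSketch and fill are hypothetical names, and error handling for failed creations is omitted.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class FillPoolSketch {
    /**
     * Fills pool up to expectNum entries. With warmup everything is created on the
     * calling thread; otherwise syncNum entries are created now and the rest are
     * handed to asyncExecutor.
     */
    static <C> void fill(List<C> pool, int expectNum, boolean warmup, int syncNum,
                         Supplier<C> factory, Executor asyncExecutor) {
        if (pool.size() >= expectNum) {
            return;
        }
        if (warmup) {
            while (pool.size() < expectNum) {
                pool.add(factory.get());
            }
            return;
        }
        if (syncNum < 0 || syncNum > expectNum) {
            throw new IllegalArgumentException("syncNum should be in [0," + expectNum + "]");
        }
        for (int i = 0; i < syncNum; i++) {
            pool.add(factory.get());
        }
        if (syncNum >= expectNum) {
            return;
        }
        // The caller returns immediately; the remainder is created in the background.
        asyncExecutor.execute(() -> {
            while (pool.size() < expectNum) {
                pool.add(factory.get());
            }
        });
    }

    public static void main(String[] args) {
        List<Integer> pool = new CopyOnWriteArrayList<>();
        AtomicInteger seq = new AtomicInteger();
        Executor direct = Runnable::run; // runs the "async" part inline, for demonstration only
        fill(pool, 3, false, 1, seq::incrementAndGet, direct);
        System.out.println("pool = " + pool);
    }
}
```

With syncNum set to 1, as in ConnectionPoolCall#call, the first caller pays for only one connection setup before it can send its request.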
Creating the connection object
Connections are created through the connection factory.
com.alipay.remoting.DefaultConnectionManager#create
public Connection create(Url url) throws RemotingException {
    Connection conn;
    try {
        conn = this.connectionFactory.createConnection(url);
    } catch (Exception e) {
        throw new RemotingException("Create connection failed. The address is "
            + url.getOriginUrl(), e);
    }
    return conn;
}
com.alipay.remoting.connection.AbstractConnectionFactory#createConnection
@Override
public Connection createConnection(Url url) throws Exception {
    Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
    Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()),
        url.getVersion(), url);
    channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
    return conn;
}
com.alipay.remoting.connection.AbstractConnectionFactory#doCreateConnection
protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout)
        throws Exception {
    // prevent unreasonable value, at least 1000
    connectTimeout = Math.max(connectTimeout, 1000);
    String address = targetIP + ":" + targetPort;
    if (logger.isDebugEnabled()) {
        logger.debug("connectTimeout of address [{}] is [{}].", address, connectTimeout);
    }
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
    ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
    future.awaitUninterruptibly();
    ...
    return future.channel();
}
Checking the connection
- check that the connection exists
- check that the channel is active
- check that the channel is writable
com.alipay.remoting.DefaultConnectionManager#check
public void check(Connection connection) throws RemotingException {
    if (connection == null) {
        throw new RemotingException("Connection is null when do check!");
    }
    if (connection.getChannel() == null || !connection.getChannel().isActive()) {
        this.remove(connection);
        throw new RemotingException("Check connection failed for address: "
            + connection.getUrl());
    }
    if (!connection.getChannel().isWritable()) {
        // No remove. Most of the time it is unwritable temporarily.
        throw new RemotingException("Check connection failed for address: "
            + connection.getUrl() + ", maybe write overflow!");
    }
}
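The three checks differ in what happens to the connection afterwards: an inactive channel is removed from the pool, while an unwritable one is kept because the condition is usually temporary. The sketch below encodes that decision as a return value rather than an exception; ConnCheckSketch, Chan, and Result are hypothetical names used only for illustration.

```java
class ConnCheckSketch {
    /** Outcome of a check, including what the caller should do with the connection. */
    enum Result { OK, MISSING, INACTIVE_REMOVE, UNWRITABLE_KEEP }

    /** Hypothetical snapshot of the two channel flags the check looks at. */
    static final class Chan {
        final boolean active;
        final boolean writable;

        Chan(boolean active, boolean writable) {
            this.active = active;
            this.writable = writable;
        }
    }

    /** Applies the checks in the same order as the listing above. */
    static Result check(Chan channel) {
        if (channel == null) {
            return Result.MISSING;
        }
        if (!channel.active) {
            return Result.INACTIVE_REMOVE; // dead channel: drop it from the pool
        }
        if (!channel.writable) {
            return Result.UNWRITABLE_KEEP; // likely a temporary write backlog: keep it
        }
        return Result.OK;
    }

    public static void main(String[] args) {
        System.out.println(check(new Chan(true, true)));
        System.out.println(check(new Chan(false, true)));
        System.out.println(check(new Chan(true, false)));
    }
}
```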
Triggering the invocation
public Object invokeSync(final Connection conn, final Object request,
        final InvokeContext invokeContext, final int timeoutMillis)
        throws RemotingException, InterruptedException {
    RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext,
        timeoutMillis);
    preProcessInvokeContext(invokeContext, requestCommand, conn);
    ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand,
        timeoutMillis);
    responseCommand.setInvokeContext(invokeContext);
    Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand,
        RemotingUtil.parseRemoteAddress(conn.getChannel()));
    return responseObject;
}
- convert the request into a requestCommand
- pre-process the invoke context
- send the request
- get the responseCommand and return the result
Sending the request means writing the requestCommand into the channel.
com.alipay.remoting.BaseRemoting#invokeSync
protected RemotingCommand invokeSync(final Connection conn, final RemotingCommand request,
        final int timeoutMillis) throws RemotingException, InterruptedException {
    final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
    conn.addInvokeFuture(future);
    final int requestId = request.getId();
    try {
        conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                ...
            }
        });
    } catch (Exception e) {
        ...
    }
    RemotingCommand response = future.waitResponse(timeoutMillis);
    if (response == null) {
        conn.removeInvokeFuture(requestId);
        response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
        logger.warn("Wait response, request id={} timeout!", requestId);
    }
    return response;
}
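The synchronous call above boils down to three moves: register a future under the request id, write the request, then block on the future with a timeout. The sketch below reproduces only the future bookkeeping, assuming a CountDownLatch-based future; InvokeFutureSketch, PendingCall, and the "TIMEOUT" marker are hypothetical, and the network write is replaced by a direct call to onResponse. Unlike the listing, an interrupt while waiting is treated like a timeout.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class InvokeFutureSketch {
    /** One in-flight request waiting for its response. */
    static final class PendingCall {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String body) {
            response = body;
            done.countDown();
        }

        /** Returns the response, or null if none arrived within the timeout. */
        String waitResponse(long timeoutMillis) {
            try {
                return done.await(timeoutMillis, TimeUnit.MILLISECONDS) ? response : null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    private final ConcurrentHashMap<Integer, PendingCall> pending = new ConcurrentHashMap<>();

    /** Registers a future before the request is written to the channel. */
    PendingCall register(int requestId) {
        PendingCall call = new PendingCall();
        pending.put(requestId, call);
        return call;
    }

    /** Called from the I/O side when a response with this id is decoded. */
    void onResponse(int requestId, String body) {
        PendingCall call = pending.remove(requestId);
        if (call != null) {
            call.putResponse(body);
        }
    }

    /** Blocks for the response; on timeout removes the future and returns a marker. */
    String awaitResponse(int requestId, PendingCall call, long timeoutMillis) {
        String response = call.waitResponse(timeoutMillis);
        if (response == null) {
            pending.remove(requestId);
            return "TIMEOUT";
        }
        return response;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        InvokeFutureSketch remoting = new InvokeFutureSketch();
        PendingCall call = remoting.register(1);
        remoting.onResponse(1, "hello world sync");
        System.out.println(remoting.awaitResponse(1, call, 3000));
    }
}
```

Removing the future on timeout matters: without it, every timed-out request would leave an entry behind in the per-connection map.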
Source: https://www.cnblogs.com/huiyao/p/12404208.html