编程语言
首页 > 编程语言> > sofa-bolt源码阅读(2)-客户端的启动

sofa-bolt源码阅读(2)-客户端的启动

作者:互联网

sofa客户端访问服务器分为两步,第一步是初始化工作,第二步是建立连接。典型的代码是

// 1. create a rpc client
RpcClient client = new RpcClient();
// 2. add processor for connect and close event if you need
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
// 3. do init
client.startup();
// 4. invoke
RequestBody req = new RequestBody(2, "hello world sync");
try {
    String res = (String) client.invokeSync(addr, req, 3000);
    System.out.println("invoke sync result = [" + res + "]");
} catch (RemotingException e) {
    String errMsg = "RemotingException caught in oneway!";
    logger.error(errMsg, e);
    Assert.fail(errMsg);
} catch (InterruptedException e) {
    logger.error("interrupted!");
}
// 5. close
client.shutdown();

RpcClient.startup完成了初始化工作,包括连接的管理(ConnectionManager)、监控(DefaultConnectionMonitor)和重连(ReconnectManager)。

2.1 连接管理

ConnectionManager负责对连接进行管理。每一个url对应一个poolkey,每一个poolkey创建一个ConnectionPool,

一个ConnectionPool维护多个连接。如上图所示,当需要一个连接时,ConnectionManager从ConnectionPool中按照连接选择策略ConnectionSelectStrategy获取一个连接。

2.2 连接监控

if (switches().isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
    if (monitorStrategy == null) {
        connectionMonitor = new DefaultConnectionMonitor(new ScheduledDisconnectStrategy(),
            this.connectionManager);
    } else {
        connectionMonitor = new DefaultConnectionMonitor(monitorStrategy,
            this.connectionManager);
    }
    connectionMonitor.startup();
    logger.warn("Switch on connection monitor");
}

DefaultConnectionMonitor对连接进行监控,其在startup方法中启动一个 ScheduledThreadPoolExecutor ,调用ConnectionMonitorStrategy.monitor方法对连接池进行处理。

@Override
public void startup() throws LifeCycleException {
    super.startup();

    /* initial delay to execute schedule task, unit: ms */
    long initialDelay = ConfigManager.conn_monitor_initial_delay();

    /* period of schedule task, unit: ms*/
    long period = ConfigManager.conn_monitor_period();

    this.executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(
        "ConnectionMonitorThread", true), new ThreadPoolExecutor.AbortPolicy());
    this.executor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools = connectionManager
                    .getConnPools();
                strategy.monitor(connPools);
            } catch (Exception e) {
                logger.warn("MonitorTask error", e);
            }
        }
    }, initialDelay, period, TimeUnit.MILLISECONDS);
}

例如ScheduledDisconnectStrategy监控连接池中连接的数量,当数量超过配置指定的长度时,随机关闭连接池中的一个连接。

public void monitor(Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools) {
    try {
        if (connPools == null || connPools.size() == 0) {
            return;
        }

        for (Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry : connPools
            .entrySet()) {
            String poolKey = entry.getKey();
            ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(), logger);

            List<Connection> serviceOnConnections = new ArrayList<Connection>();
            List<Connection> serviceOffConnections = new ArrayList<Connection>();
            for (Connection connection : pool.getAll()) {
                if (isConnectionOn(connection)) {
                    serviceOnConnections.add(connection);
                } else {
                    serviceOffConnections.add(connection);
                }
            }

            if (serviceOnConnections.size() > connectionThreshold) {
                Connection freshSelectConnect = serviceOnConnections.get(random
                    .nextInt(serviceOnConnections.size()));
                freshSelectConnect.setAttribute(Configs.CONN_SERVICE_STATUS,
                    Configs.CONN_SERVICE_STATUS_OFF);
                serviceOffConnections.add(freshSelectConnect);
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("serviceOnConnections({}) size[{}], CONNECTION_THRESHOLD[{}].",
                        poolKey, serviceOnConnections.size(), connectionThreshold);
                }
            }

            for (Connection offConn : serviceOffConnections) {
                if (offConn.isInvokeFutureMapFinish()) {
                    if (offConn.isFine()) {
                        offConn.close();
                    }
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Address={} won't close at this schedule turn",
                            RemotingUtil.parseRemoteAddress(offConn.getChannel()));
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error("ScheduledDisconnectStrategy monitor error", e);
    }
}

2.3 连接重连

if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
    reconnectManager = new ReconnectManager(connectionManager);
    reconnectManager.startup();

    connectionEventHandler.setReconnector(reconnectManager);
    logger.warn("Switch on reconnect manager");
}

与监控不同,重连在startup方法中启动了一个Thread,该线程作为一个消费者,处理添加的ReconnectTask

com.alipay.remoting.ReconnectManager#startup
@Override
public void startup() throws LifeCycleException {
    super.startup();

    this.healConnectionThreads = new Thread(new HealConnectionRunner());
    this.healConnectionThreads.start();
}
com.alipay.remoting.ReconnectManager.HealConnectionRunner#run
@Override
public void run() {
    while (isStarted()) {
        long start = -1;
        ReconnectTask task = null;
        try {
            if (this.lastConnectTime < HEAL_CONNECTION_INTERVAL) {
                Thread.sleep(HEAL_CONNECTION_INTERVAL);
            }

            try {
                task = ReconnectManager.this.tasks.take();
            } catch (InterruptedException e) {
                // ignore
            }

            if (task == null) {
                continue;
            }

            start = System.currentTimeMillis();
            if (!canceled.contains(task.url)) {
                task.run();
            } else {
                logger.warn("Invalid reconnect request task {}, cancel list size {}",
                    task.url, canceled.size());
            }
            this.lastConnectTime = System.currentTimeMillis() - start;
        } catch (Exception e) {
            if (start != -1) {
                this.lastConnectTime = System.currentTimeMillis() - start;
            }

            if (task != null) {
                logger.warn("reconnect target: {} failed.", task.url, e);
                tasks.add(task);
            }
        }
    }
}

2.4 建立连接

回到客户端启动的代码

RequestBody req = new RequestBody(2, "hello world sync");
try {
    String res = (String) client.invokeSync(addr, req, 3000);
    System.out.println("invoke sync result = [" + res + "]");
} catch (RemotingException e) {
    String errMsg = "RemotingException caught in oneway!";
    logger.error(errMsg, e);
    Assert.fail(errMsg);
} catch (InterruptedException e) {
    logger.error("interrupted!");
}

客户端调用invokeSync方法,发起对服务器的访问

com.alipay.remoting.rpc.RpcClient#invokeSync
@Override
public Object invokeSync(final String address, final Object request,
                         final InvokeContext invokeContext, final int timeoutMillis)                                throws RemotingException,InterruptedException {
    return this.rpcRemoting.invokeSync(address, request, invokeContext, timeoutMillis);
}
com.alipay.remoting.rpc.RpcRemoting#invokeSync
public Object invokeSync(final String addr, final Object request,
                             final InvokeContext invokeContext, final int timeoutMillis)
                            throws RemotingException,InterruptedException {
        //调用addressParser解析地址为Url对象
        Url url = this.addressParser.parse(addr);
        return this.invokeSync(url, request, invokeContext, timeoutMillis);
    }

com.alipay.remoting.rpc.RpcClientRemoting#invokeSync    
@Override
public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) throws RemotingException,InterruptedException {
    //1.创建连接对象
    final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
    //2.检查连接对象
    this.connectionManager.check(conn);
    //3.触发调用
    return this.invokeSync(conn, request, invokeContext, timeoutMillis);
}

标签:url,sofa,Connection,源码,bolt,new,null,final,conn
来源: https://www.cnblogs.com/huiyao/p/12404208.html