编程语言
首页 > 编程语言> > 源码分析---SOFARPC客户端服务调用

源码分析---SOFARPC客户端服务调用

作者:互联网

在这里插入图片描述
我们首先看看BoltClientProxyInvoker的关系图
在这里插入图片描述
(想自学习编程的小伙伴请搜索圈T社区,更多行业相关资讯更有行业相关免费视频教程。完全免费哦!)

所以当我们用BoltClientProxyInvoker#invoke的时候实际上是调用了父类的invoke方法

ClientProxyInvoker#invoke
    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        SofaResponse response = null;
        Throwable throwable = null;
        try {
            RpcInternalContext.pushContext();
            RpcInternalContext context = RpcInternalContext.getContext();
            context.setProviderSide(false);
            // 包装request请求
            decorateRequest(request);
            try {
                // 产生开始调用事件
                if (EventBus.isEnable(ClientStartInvokeEvent.class)) {
                    EventBus.post(new ClientStartInvokeEvent(request));
                }
                // 得到结果
                response = cluster.invoke(request);
            } catch (SofaRpcException e) {
                throwable = e;
                throw e;
            } finally {
                // 产生调用结束事件
                if (!request.isAsync()) {
                    if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
                        EventBus.post(new ClientEndInvokeEvent(request, response, throwable));
                    }
                }
            }
            // 包装响应
            decorateResponse(response);
            return response;
        } finally {
            RpcInternalContext.removeContext();
            RpcInternalContext.popContext();
        }
    }

这个方法主要做了几件事:

  1. 包装request请求,设置必要的参数
  2. 调用FailOverCluster的invoke方法,将reques请求发送出去,并得到response相应
  3. 包装response响应

我们在调用FailOverCluster的时候实际上是调用的父类AbstractCluster的invoker方法,FailOverCluster关系图如下:
在这里插入图片描述
所以我们进入到AbstractCluster的invoker方法中:

@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
SofaResponse response = null;
try {
// 做一些初始化检查,例如未连接可以连接
checkClusterState();
// 开始调用
countOfInvoke.incrementAndGet(); // 计数+1
response = doInvoke(request);
return response;
} catch (SofaRpcException e) {
// 客户端收到异常(客户端自己的异常)
throw e;
} finally {
countOfInvoke.decrementAndGet(); // 计数-1
}
}

checkClusterState方法主要是用来校验是否已销毁了,或是调用了init方法进行初始化了。
然后会在调用之前记一下数。
然后我们进入到doInvoke方法中:

public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException {
String methodName = request.getMethodName();
int retries = consumerConfig.getMethodRetries(methodName);
int time = 0;
SofaRpcException throwable = null;// 异常日志
List invokedProviderInfos = new ArrayList(retries + 1);
do {
//负载均衡
ProviderInfo providerInfo = select(request, invokedProviderInfos);
try {
//调用过滤器链
SofaResponse response = filterChain(providerInfo, request);
if (response != null) {
if (throwable != null) {
if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
LOGGER.warnWithApp(consumerConfig.getAppName(),
LogCodes.getLog(LogCodes.WARN_SUCCESS_BY_RETRY,
throwable.getClass() + “:” + throwable.getMessage(),
invokedProviderInfos));
}
}
return response;
} else {
throwable = new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
"Failed to call " + request.getInterfaceName() + “.” + methodName
+ " on remote server " + providerInfo + “, return null”);
time++;
}
} catch (SofaRpcException e) { // 服务端异常+ 超时异常 才发起rpc异常重试
if (e.getErrorType() == RpcErrorType.SERVER_BUSY
|| e.getErrorType() == RpcErrorType.CLIENT_TIMEOUT) {
throwable = e;
time++;
} else {
throw e;
}
} catch (Exception e) { // 其它异常不重试
throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
"Failed to call " + request.getInterfaceName() + “.” + request.getMethodName()
+ " on remote server: " + providerInfo + ", cause by unknown exception: "
+ e.getClass().getName() + ", message is: " + e.getMessage(), e);
} finally {
if (RpcInternalContext.isAttachmentEnable()) {
RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_INVOKE_TIMES,
time + 1); // 重试次数
}
}
invokedProviderInfos.add(providerInfo);
} while (time <= retries);

throw throwable;

}

这个方法里面主要做了这这件事:

  1. 如果失败的话就循环调用
  2. 负载均衡,选取provider
  3. 通过过滤器链调用服务端,并返回结果
  4. 异常处理

接着我们进入到filterChain方法中,根据过滤器链最后会跳到ConsumerInvoker中的invoke方法

@Override
public SofaResponse invoke(SofaRequest sofaRequest) throws SofaRpcException {
// 设置下服务器应用
ProviderInfo providerInfo = RpcInternalContext.getContext().getProviderInfo();
String appName = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_APP_NAME);
if (StringUtils.isNotEmpty(appName)) {
sofaRequest.setTargetAppName(appName);
}

// 目前只是通过client发送给服务端
return consumerBootstrap.getCluster().sendMsg(providerInfo, sofaRequest);

}

consumerBootstrap.getCluster()会返回FailOverCluster实例,然后调用父类AbstractCluster的sendMsg方法

public SofaResponse sendMsg(ProviderInfo providerInfo, SofaRequest request) throws SofaRpcException {
ClientTransport clientTransport = connectionHolder.getAvailableClientTransport(providerInfo);
if (clientTransport != null && clientTransport.isAvailable()) {
return doSendMsg(providerInfo, clientTransport, request);
} else {
throw unavailableProviderException(request.getTargetServiceUniqueName(), providerInfo.getOriginUrl());
}
}

protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport transport,
SofaRequest request) throws SofaRpcException {
RpcInternalContext context = RpcInternalContext.getContext();
// 添加调用的服务端远程地址
RpcInternalContext.getContext().setRemoteAddress(providerInfo.getHost(), providerInfo.getPort());
try {
checkProviderVersion(providerInfo, request); // 根据服务端版本特殊处理
String invokeType = request.getInvokeType();
int timeout = resolveTimeout(request, consumerConfig, providerInfo);

    SofaResponse response = null;
    // 同步调用
    if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
        long start = RpcRuntimeContext.now();
        try {
            response = transport.syncSend(request, timeout);
        } finally {
            if (RpcInternalContext.isAttachmentEnable()) {
                long elapsed = RpcRuntimeContext.now() - start;
                context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
            }
        }
    }
    // 单向调用
    else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
        long start = RpcRuntimeContext.now();
        try {
            transport.oneWaySend(request, timeout);
            response = buildEmptyResponse(request);
        } finally {
            if (RpcInternalContext.isAttachmentEnable()) {
                long elapsed = RpcRuntimeContext.now() - start;
                context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
            }
        }
    }
    // Callback调用
    else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
        // 调用级别回调监听器
        SofaResponseCallback sofaResponseCallback = request.getSofaResponseCallback();
        if (sofaResponseCallback == null) {
            SofaResponseCallback methodResponseCallback = consumerConfig
                .getMethodOnreturn(request.getMethodName());
            if (methodResponseCallback != null) { // 方法的Callback
                request.setSofaResponseCallback(methodResponseCallback);
            }
        }
        // 记录发送开始时间
        context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
        // 开始调用
        transport.asyncSend(request, timeout);
        response = buildEmptyResponse(request);
    }
    // Future调用
    else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
        // 记录发送开始时间
        context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
        // 开始调用
        ResponseFuture future = transport.asyncSend(request, timeout);
        // 放入线程上下文
        RpcInternalContext.getContext().setFuture(future);
        response = buildEmptyResponse(request);
    } else {
        throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType);
    }
    return response;
} catch (SofaRpcException e) {
    throw e;
} catch (Throwable e) { // 客户端其它异常
    throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, e);
}

}

sendMsg方法最后会调用到doSendMsg。
soSendMsg里面主要做了如下几件事:

  1. 如果是同步调用,则直接返回封装好的参数
  2. 如果是单向调用,则调用buildEmptyResponse方法,返回一个空的response
  3. 如果是callback调用asyncSend,RPC在获取到服务端的结果后会自动执行该回调实现。
  4. 服务端返回响应结果被 RPC 缓存,当客户端需要响应结果的时候需要主动获取结果,获取结果的过程阻塞线程。

标签:SofaRpcException,调用,SOFARPC,request,RpcInternalContext,源码,providerInfo,response,
来源: https://blog.csdn.net/wanghao112956/article/details/99673171