其他分享
首页 > 其他分享> > Sphu的调用链

Sphu的调用链

作者:互联网

SphU调用流程

流程图

代码跟踪

  1. 开始调用
    在总结资源生成的时候,我们已经列举了不同的资源生成,会调用不同的方法,下面我们使用较为常用的方法SphU.Entry来作为入口分析
    /**
     * 记录统计数据并对给定资源执行规则检查    
     * Record statistics and perform rule checking for the given resource.
     *
     * @param name the unique name of the protected resource
     * @return the {@link Entry} of this invocation (used for mark the invocation complete and get context data)
     * @throws BlockException if the block criteria is met (e.g. metric exceeded the threshold of any rules)
     */
    public static Entry entry(String name) throws BlockException {
        // 重载了各种方法 通过参数来确定不同的资源
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }
------------------------------------------
    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
            // 这里会使用string资源作为我们初始化资源的入参,这里是使用的ResourceWrapper的一个string子类 
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }
  1. 调用途中
    沿着调用栈逐渐深入,可以发现无论是从哪个入口,最终都是在 com.alibaba.csp.sentinel.CtSph#entryWithPriority(com.alibaba.csp.sentinel.slotchain.ResourceWrapper, int, boolean, java.lang.Object...) 中进行处理
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        // 构建context
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            // 超过阈值后 才会执行到这里,是一个空context
            // 所以只初始化, 不做其他规则校验
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            // 兜底
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        // 全局的开关,如果没有打开,不去做规则校验
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
        // 开始获取processSlot  
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * 为空说明 链路超过了常量 6000 条, 就不处理了
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
        // 新建一个当前执行的entry, 每次对资源的操作都会生成,
        // 然后将上一次生成的entry 作为parent
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        // 最终返回的entry 操作实体
        return e;
    }
  1. 获取对应ProcessSlot
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        // 从chainMap 根据当前资源获取slot, 这里也说明了 一个resource 只能对应一个porcessSlot
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                // double check
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    // 超过了上限, 就不弄这个资源的限制了
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
                    //  新弄一个chain
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
  1. 责任链的形式一直去执行chain 里面的entry方法, 我们会在 NodeSelectorSlot 之中更新当前的currNode,并绑定defaultNode
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        /*
        有趣的是,我们使用上下文名称而不是资源名称作为映射键。记住,无论在哪个上下文中,相同的资源({@link ResourceWrapperequals(Object)})将全局共享相同的{@link ProcessorSlotChain}。因此,如果代码进入{@link条目(Context, ResourceWrapper, DefaultNode, int, Object…)},资源名必须相同,但上下文名可以不同。如果我们使用{@link com.alibaba.csp.sentinel。SphUentry(String resource)}在不同的上下文中输入相同的资源,使用上下文名称作为映射键可以区分相同的资源。在这种情况下,将为每个不同的上下文(不同的上下文名称)创建多个具有相同资源名称的{@link DefaultNode}。考虑另一个问题。一个资源可能有多个{@link DefaultNode},那么获得同一资源的统计数据的最快方法是什么?答案是所有具有相同资源名的{@link DefaultNode}共享一个{@link ClusterNode}。详情请参见{@link ClusterBuilderSlot}。
          */
         // 在这里根据上下文去生DefaultNode 
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    // 一个context 生成的 一个defaultNode 而且是根据context的name 来确认唯一性的                  
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    // 调用树,添加子节点
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }
        // 设置为当前Node
        context.setCurNode(node);
        // 执行其他的slotchain
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
  1. 在flowSlot 根据规则拦截,然后在StatisticSlot 统计当前次数数据
//  StatisticSlot 中的数据统计,, 
// 1 当前entry的次数和成功次数+1
// 2. 整个服务(culsterNode) 的次数和成功次数+1
// 3. 成功统计之后的回调处理.
// 如失败,则当前请求次数+1,成功次数不做增减
try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // 阻止访问了的话 不做访问次数+1 
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // 未知异常的跑抛出
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);
            throw e;
        }

标签:count,node,调用,Sphu,resourceWrapper,context,entry,null
来源: https://www.cnblogs.com/wzqshb/p/16585817.html