编程语言
首页 > 编程语言> > Sentinel 源码分析(一)

Sentinel 源码分析(一)

作者:互联网

版本:

  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
  <version>2.2.5.RELEASE</version>

在spring.factories中:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration,\     #controller层也可以被资源保护到得原因
com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration,\
com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration,\
com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration,\   #核心配置类
com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration  #Feign整合Sentinel

org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
com.alibaba.cloud.sentinel.custom.SentinelCircuitBreakerConfiguration

主要看:SentinelAutoConfiguration,不用配置任何参数,默认生效得。

 里面有一个重要得bean,SentinelResourceAspect ,这个切面就是拦截@SentinelResource 注解标识的资源得。

 

 

 源码不长,把代码贴出来:

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
    
// 拦截 @SentinelResource 注解标识的资源 @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { } @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { Method originMethod = resolveMethod(pjp); SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { // Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } String resourceName = getResourceName(annotation.value(), originMethod); EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); Entry entry = null; try {
// 开启资源保护,走流控,降级等配置的规则 entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
// 业务逻辑 return pjp.proceed(); } catch (BlockException ex) {
// 处理Block异常,所有的Sentinel 规则的异常都走这里 return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) {
// 业务异常都走这里 Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); // The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } // No fallback function can handle the exception, so throw it out. throw ex; } finally { if (entry != null) {
// 执行exit逻辑 entry.exit(1, pjp.getArgs()); } } } }

 

跟着:SphU.entry(resourceName, resourceType, entryType, pjp.getArgs())   的逻辑看下

在CtSph#entryWithPriority 下有个 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);  来构建一个Slotchain。

Sentinel会把页面配置的 流控规则,降级规则,系统规则,热点规则,授权规则每个规则都抽象出来一个Slot,比如:FlowSlot 处理流控规则,AuthoritySlot 处理授权规则,StatisticSlot 统计请求的QPS,成功的请求数,失败的请求数,阻塞的请求数,

最大响应时间等。上面说的构造的chain,就是通过责任链模式把这些slot都串起来,然后去执行。

来看下:

 ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
// 每个资源都会维护一个slotChain chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // 构建责任链 chain = SlotChainProvider.newSlotChain(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }

 

跟到:DefaultSlotChainBuilder#build()

 public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        //  Sentinel 维护了一套spi机制来加载ProcessorSlot接口的实现类,就是那些规则slot,会通过排序的,根据slot上的自定义注解@Spi中的order排序
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
              // 加载到的slot放入责任链中
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }

 

在sentinel-core核心包下面的有spi加载的对应的文件:

 

 最后构建出来的 ProcessorSlotChain 是下面的链条:

每个slot都实现 ProcessorSlot接口,对于具体的内容都在entry方法里面。

 NodeSelectorSlot:

    构建资源的链路,比如在控制台的簇点链路下面的树形结构。

 ClusterBuilderSlot:

           构建集群中的资源节点,会在集群限流中用到。

 LogSlot:

      记录些报错日志而已。

     

 

 StatisticSlot:

       这个slot就比较重要了,从名字就可以看出,这是做统计的,对于那些qps,成功,失败,阻塞的次数都会记录下来。@Override

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking. 执行下一个slot之前 并没有什么操作
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
// 执行完责任链后面的slot之后,执行我们的业务逻辑之前,就会进行qps 加1的操作
// 请求的线程数加1
node.increaseThreadNum();
// qps 请求数加1 这里是向滑动时间窗口中的bucket中增加数量 这个后面详细说明 node.addPassRequest(count); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } // Handle pass event with registered entry callback handlers.
// 这里可以自己扩展 执行callBack
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) {
// sentinel 规则异常的处理 // Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); // Add block count.
// 增加block的qps node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) {
// Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; } }

 

AuthoritySlot

  授权规则的校验

@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
// 就是校验授权规则的黑白名单规则 checkBlackWhiteAuthority(resourceWrapper, context); fireEntry(context, resourceWrapper, node, count, prioritized, args); }

 

SystemSlot

     执行系统规则。

FlowSlot  

     执行限流规则。

 @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);
         // 触发下一个slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

checker 是 FlowRuleChecker 实例。

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
// 获取所有的限流规则 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) {
// 校验流控是否通过 if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } }

canPassCheck 会根据流控规则时候选择的快速失败,Warm Up, 排队等待。分别使用DefaultController,WarmUpRateLimiterController,RateLimiterController类来控制,用到的限流算法分别是滑动时间窗口,

令牌桶,漏桶算法。

最后会走到 DefaultController#canPass   这里是快速失败规则会走到的地方。用到的是滑动时间窗口。

 public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 从滑动时间窗口中得到已经通过的qps 就是比较最近1s内的流量是否达到阈值 int curCount = avgUsedTokens(node);
// acquireCount 一般就是 1 ,判断是否达到限流的要求 if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
// 这里一般不走到 long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true; }

DegradeSlot

     熔断规则使用的。

@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        performChecking(context, resourceWrapper);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
// 根据在控制台配置的熔断规则,得到断路器,这里一共两个类型的断路器 :ExceptionCircuitBreaker 当熔断策略是:异常比例,异常数的时候用。
// ResponseTimeCircuitBreaker: 熔断策略是:慢调用比例的时候使用 List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); if (circuitBreakers == null || circuitBreakers.isEmpty()) { return; } for (CircuitBreaker cb : circuitBreakers) { if (!cb.tryPass(context)) { throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); } } }

上面重要的限流逻辑和和降级逻辑后面单独分析,这里先把整体流程记录下。

责任链上的entry方法执行完之后,如果没有BlockException异常,也就是配置的规则都正常通过了,那就会执行业务逻辑了,业务逻辑执行完之后,在上面的 SentinelResourceAspect

切面中的finally方法中会调用责任链上的exit方法。

 

 在exit方法中,主要做一些数据的统计。涉及到上面说的流控和降级等。这个和分析流控算法和降级的时候再记录。

 Sentinel的核心流程就是这个责任链的执行。各种规则的校验都在这个链上执行的。

 

标签:分析,node,prioritized,count,resourceWrapper,源码,context,Sentinel,entry
来源: https://www.cnblogs.com/krock/p/16240466.html