编程语言
首页 > 编程语言> > 微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析

微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析

作者:互联网

5.4 Sentinel 流控、统计和熔断的源码分析


前言

参考资料
《Spring Microservices in Action》
《Spring Cloud Alibaba 微服务原理与实战》
《B站 尚硅谷 SpringCloud 框架开发教程 周阳》
《Sentinel GitHub 官网》
《Sentinel 官网》

调用链路是 Sentinel 的工作主流程,由各个 Slot 槽组成,将不同的 Slot 槽按照顺序串在一起,从而将不同的功能(限流、降级、系统保护)组合在一起;

本篇《2. 获取 ProcessorSlot 链》将从源码级讲解如何获取调用链路,接着会以遍历链表的方式处理每一个 Slot 槽,其中就有:FlowSlot、StatisticSlot、DegradeSlot 等。分别对应本篇《3. 流控槽实施流控逻辑》、《4. 统计槽实施指标数据统计》和《5. 熔断槽实施服务熔断》;


1. Sentinel 的自动装配

1.2 依赖引入

Sentinel 的自动装配

1.3 SentinelWebAutoConfiguration 配置类

@Configuration
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelWebAutoConfiguration {
    
    //省略其他代码
    
	@Bean
	@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled", matchIfMissing = true)
	public FilterRegistrationBean sentinelFilter() {
		FilterRegistrationBean<Filter> registration = new FilterRegistrationBean<>();

		SentinelProperties.Filter filterConfig = properties.getFilter();

		if (filterConfig.getUrlPatterns() == null || filterConfig.getUrlPatterns().isEmpty()) {
			List<String> defaultPatterns = new ArrayList<>();
			//默认情况下通过 /* 规则拦截所有的请求
			defaultPatterns.add("/*");
			filterConfig.setUrlPatterns(defaultPatterns);
		}

		registration.addUrlPatterns(filterConfig.getUrlPatterns().toArray(new String[0]));
		//【点进去】注册 CommonFilter
		Filter filter = new CommonFilter();
		registration.setFilter(filter);
		registration.setOrder(filterConfig.getOrder());
		registration.addInitParameter("HTTP_METHOD_SPECIFY", String.valueOf(properties.getHttpMethodSpecify()));
		log.info("[Sentinel Starter] register Sentinel CommonFilter with urlPatterns: {}.", filterConfig.getUrlPatterns());
		return registration;
	}
}

1.4 CommonFilter 过滤器

public class CommonFilter implements Filter {
    
    //省略部分代码

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest sRequest = (HttpServletRequest)request;
        Entry urlEntry = null;
        try {
            //解析请求 URL
            String target = FilterUtil.filterTarget(sRequest);
            //URL 清洗
            UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
            if (urlCleaner != null) {
                //如果存在,则说明配置过 URL 清洗策略,替换配置的 targer
                target = urlCleaner.clean(target);
            }
            if (!StringUtil.isEmpty(target)) {
                String origin = this.parseOrigin(sRequest);
                ContextUtil.enter("sentinel_web_servlet_context", origin);
                if (this.httpMethodSpecify) {
                    String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + ":" + target;
                    //使用 SphU.entry() 方法对 URL 添加限流埋点
                    urlEntry = SphU.entry(pathWithHttpMethod, 1, EntryType.IN);
                } else {
                    urlEntry = SphU.entry(target, 1, EntryType.IN);
                }
            }
            //执行过滤
            chain.doFilter(request, response);
        } catch (BlockException var14) {
            HttpServletResponse sResponse = (HttpServletResponse)response;
            WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, var14);
        } catch (ServletException | RuntimeException | IOException var15) {
            Tracer.traceEntry(var15, urlEntry);
            throw var15;
        } finally {
            if (urlEntry != null) {
                urlEntry.exit();
            }
            ContextUtil.exit();
        }
    }
}

1.5 小结


2. 获取 ProcessorSlot 链

2.1 Sentinel 源码包结构

模块名说明
sentinel-adapter负责针对主流开源框架进行限流适配,如:Dubbo、gRPC、Zuul 等;
sentinel-coreSentinel 核心库,提供限流、熔断等实现;
sentinel-dashboard控制台模块,提供可视化监控和管理;
sentinel-demo官方案例;
sentinel-extension实现不同组件的数据源扩展,如:Nacos、ZooKeeper、Apollo 等;
sentinel-transport通信协议处理模块;

首次DeBug 进入 SphU.entry() 方法

2.2 获取 ProcessorSlot 链与操作 Slot 槽的入口 CtSph.entryWithPriority()

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        //上下文量已经超过阈值 -> 只初始化条目,不进行规则检查
        return new CtEntry(resourceWrapper, null, context);
    }

    if (context == null) {
        //没有指定上下文 -> 使用默认上下文 context
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }
     
     if (!Constants.ON) {
        //全局开关关闭 -> 没有规则检查
        return new CtEntry(resourceWrapper, null, context);
    }
    //【断点步入 2.2.1】通过 lookProcessChain 方法获取 ProcessorSlot 链
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);


    if (chain == null) {
        //表示资源量超过 Constants.MAX_SLOT_CHAIN_SIZE 常量 -> 不会进行规则检查
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        //【断点步入 3./4./5.】执行 ProcessorSlot 对 ProcessorSlot 链中的 Slot 槽遍历操作(遍历链表的方式)
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        //这种情况不应该发生,除非 Sentinel 内部存在错误
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}

2.2.1 构造 ProcessorSlot 链 CtSph.lookProcessChain()

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    //从缓存中获取 slot 调用链
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                //【断点步入】构造 Slot 链(责任链模式)
                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
@Override
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
    for (ProcessorSlot slot : sortedSlotList) {
        if (!(slot instanceof AbstractLinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }
        chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
    }
    return chain;
}

ProcessorSlotChain 链中有 10 个 Slot 插槽

2.2.2 操作 Slot 槽的入口

3. 流控槽实施流控逻辑 FlowSlot.entry()

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    //【断点步入】检查流量规则
    checkFlow(resourceWrapper, context, node, count, prioritized);
    //调用下一个 Slot
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    //【断点步入 3.1】获取流控规则
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        //遍历所有流控规则 FlowRule
        for (FlowRule rule : rules) {
            //【点进去 3.2】校验每条规则
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

3.1 获取流控规则 FlowSlot.ruleProvider.apply()

private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
    @Override
    public Collection<FlowRule> apply(String resource) {
        // Flow rule map should not be null.
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
        return flowRules.get(resource);
    }
};

3.2 校验每条规则 FlowRuleChecker.canPassCheck()

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }
    //集群模式
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    //【点进去】单机模式
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    //【点进去 3.2.1】获取 Node
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
    //【点进去 3.2.2】获取流控的处理策略
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

3.2.1 获取 Node FlowRuleChecker.selectNodeByRequesterAndStrategy()

static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    //limitApp 不能为空
    String limitApp = rule.getLimitApp();
    int strategy = rule.getStrategy();
    String origin = context.getOrigin();
    
    //场景1:限流规则设置了具体应用,如果当前流量就是通过该应用的,则命中场景1
    if (limitApp.equals(origin) && filterOrigin(origin)) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Matches limit origin, return origin statistic node.
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
    //场景2:限流规则未指定任何具体应,默认为default,则当前流量直接命中场景2
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Return the cluster node.
            return node.getClusterNode();
        }

        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
    //场景3:限流规则设置的是other,当前流量未命中前两种场景
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    }
    return null;
}

3.2.2 获取流控的处理策略 `FlowRule.getRater().canPass()

TrafficShapingController 的四种实现类


4. 统计槽实施指标数据统计 StatisticSlot.entry()

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    try {
        //先执行后续 Slot 检查,再统计数据(即先调用后续所有 Slot)
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        //【断点步入】使用 Node 统计“增加线程数”和“请求通过数”
        node.increaseThreadNum();
        node.addPassRequest(count);

        //如果存在来源节点,则对来源节点增加线程数和请求通过数
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }
        
        //如果是入口流量,则对全局节点增加线程数和请求通过数
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        //执行事件通知和回调函数
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    //处理优先级等待异常    
    } catch (PriorityWaitException ex) {
        node.increaseThreadNum();
        //如果有来源节点,则对来源节点增加线程数
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        //如果是入口流量,对全局节点增加线程数
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        //执行事件通知和回调函数
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    //处理限流、熔断等异常    
    } catch (BlockException e) {
        
        //省略
        
        throw e;
    //处理业务异常    
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        throw e;
    }
}

4.1 统计“增加线程数”和“请求通过数”

public class StatisticNode implements Node {

    //省略其他代码

    //【断点步入】最近 1s 滑动窗口计数器(默认 1s)
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    //最近 1min 滑动窗口计数器(默认 1min)
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    
    //增加 “请求通过数” 
    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }
    //增加 RT 和成功数
    @Override
    public void addRtAndSuccess(long rt, int successCount) {
        rollingCounterInSecond.addSuccess(successCount);
        rollingCounterInSecond.addRT(rt);
        rollingCounterInMinute.addSuccess(successCount);
        rollingCounterInMinute.addRT(rt);
    }

    //增加“线程数”
    @Override
    public void increaseThreadNum() {
        curThreadNum.increment();
    }
}

4.2 数据统计的数据结构

4.2.1 ArrayMetric 指标数组

public class ArrayMetric implements Metric {
    
    //省略其他代码

    //【点进去 4.2.2】数据存储
    private final LeapArray<MetricBucket> data;
    
    //最近 1s 滑动计数器用的是 OccupiableBucketLeapArray
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
    
    //最近 1min 滑动计数器用的是 BucketLeapArray
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }

    //增加成功数
    @Override
    public void addSuccess(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addSuccess(count);
    }

    //增加通过数
    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }

    //增加 RT
    @Override
    public void addRT(long rt) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addRT(rt);
    }
}

4.2.2 LeapArray 环形数组

public abstract class LeapArray<T> {

    //省略其他代码

    //单个窗口的长度(1个窗口多长时间)
    protected int windowLengthInMs;
    //采样窗口个数
    protected int sampleCount;
    //全部窗口的长度(全部窗口多长时间)
    protected int intervalInMs;
    private double intervalInSecond;
    //窗口数组:存储所有窗口(支持原子读取和写入)
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    //更新窗口数据时用的锁
    private final ReentrantLock updateLock = new ReentrantLock();

    public LeapArray(int sampleCount, int intervalInMs) {
        //计算单个窗口的长度
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
    //【点进去 4.2.3】获取当前窗口
    public WindowWrap<T> currentWindow() {
        //这里参数是当前时间
        return currentWindow(TimeUtil.currentTimeMillis());
    }
    //获取指定时间的窗口
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 计算数组下标
        int idx = calculateTimeIdx(timeMillis);
        //计算当前请求对应的窗口开始时间
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * 从 array 中获取窗口。有 3 种情况:
         * (1) array 中窗口不在,创建一个 CAS 并写入 array;
         * (2) array 中窗口开始时间 = 当前窗口开始时间,直接返回;
         * (3) array 中窗口开始时间 < 当前窗口开始时间,表示 o1d 窗口已过期,重置窗口数据并返回;
         */
        while (true) {
            // 取窗口
            WindowWrap<T> old = array.get(idx);
            //(1)窗口不在
            if (old == null) {
                //创建一个窗口
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                //CAS将窗口写进 array 中并返回(CAS 操作确保只初始化一次)
                if (array.compareAndSet(idx, null, window)) {
                    return window;
                } else {
                    //并发写失败,释放 CPU 资源,避免有线程长时间占用 CPU,一般下次来的时候 array 中有数据了会命中第2种情况;
                    Thread.yield();
                }
            //(2)array 中窗口开始时间 = 当前窗口开始时间
            } else if (windowStart == old.windowStart()) {
                //直接返回
                return old;
            //(3)array 中窗口开始时间 < 当前窗口开始时间    
            } else if (windowStart > old.windowStart()) {
                //尝试获取更新锁
                if (updateLock.tryLock()) {
                    try {
                        //拿到锁的线程才重置窗口
                        return resetWindowTo(old, windowStart);
                    } finally {
                        //释放锁
                        updateLock.unlock();
                    }
                } else {
                    //并发加锁失败,释放 CPU 资源,避免有线程长时间占用 CPU,一般下次来的时候因为 old 对象时间更新了会命中第 2 种情况;
                    Thread.yield();
                }
            //理论上不会出现    
            } else if (windowStart < old.windowStart()) {
                // 正常情况不会进入该分支(机器时钟回拨等异常情况)
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
    //计算索引
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        //timeId 降低时间精度
        long timeId = timeMillis / windowLengthInMs;
        //计算当前索引,这样我们就可以将时间戳映射到 leap 数组
        return (int)(timeId % array.length());
    }
    //计算窗口开始时间
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }
}

4.2.3 WindowWrap 窗口包装类

public class WindowWrap<T> {
    //窗口长度,与 LeapArray 的 windowLengthInMs 一致
    private final long windowLengthInMs;
    //窗口开始时间,其值是 windowLengthInMs 的整数倍
    private long windowStart;
    //窗口的数据,支持 MetricBucket 类型,存储统计数据
    private T value;

    //省略其他代码
}

4.2.4 MetricBucket 指标桶

public class MetricBucket {
    /**
     * 存储指标的计数器;
     * LongAdder 是线程安全的计数器
     * counters[0]  PASS 通过数;
     * counters[1]  BLOCK 拒绝数;
     * counters[2]  EXCEPTION 异常数;
     * counters[3]  SUCCESS 成功数;
     * counters[4]  RT 响应时长;
     * counters[5]  OCCUPIED_PASS 预分配通过数;
     **/
    private final LongAdder[] counters;

    //最小 RT,默认值是 5000ms
    private volatile long minRt;

    //构造中初始化
    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }

    //覆盖指标
    public MetricBucket reset(MetricBucket bucket) {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
            counters[event.ordinal()].add(bucket.get(event));
        }
        initMinRt();
        return this;
    }

    private void initMinRt() {
        this.minRt = SentinelConfig.statisticMaxRt();
    }

    //重置指标为0
    public MetricBucket reset() {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
        }
        initMinRt();
        return this;
    }
    //获取指标,从 counters 中返回
    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }
    //添加指标
    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    public long pass() {
        return get(MetricEvent.PASS);
    }

    public long block() {
        return get(MetricEvent.BLOCK);
    }

    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    public void addBlock(int n) {
        add(MetricEvent.BLOCK, n);
    }

    //省略其他代码
}

4.2.5 各数据结构的依赖关系

各数据结构的 UML 图.png

结构示意图.png

4.2.6 LeapArray 统计数据的大致思路

LeapArray 统计数据的大致思路


5. 熔断槽实施服务熔断 DegradeSlot.entry()

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    //【断点步入】熔断检查
    performChecking(context, resourceWrapper);
    //调用下一个 Slot
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
    //根据 resourceName 获取断路器
    List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    if (circuitBreakers == null || circuitBreakers.isEmpty()) {
        return;
    }
    //循环判断每个断路器
    for (CircuitBreaker cb : circuitBreakers) {
        //【点进去】尝试通过断路器
        if (!cb.tryPass(context)) {
            throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
        }
    }
}

5.1 继续或取消熔断功能

@Override
public boolean tryPass(Context context) {
    //当前断路器状态为关闭
    if (currentState.get() == State.CLOSED) {
        return true;
    }
    if (currentState.get() == State.OPEN) {
        //【点进去】对于半开状态,我们尝试通过
        return retryTimeoutArrived() && fromOpenToHalfOpen(context);
    }
    return false;
}
protected boolean fromOpenToHalfOpen(Context context) {
    //尝试将状态从 OPEN 设置为 HALF_OPEN
    if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
        //状态变化通知
        notifyObservers(State.OPEN, State.HALF_OPEN, null);
        Entry entry = context.getCurEntry();
        //在 entry 添加一个 exitHandler  entry.exit() 时会调用
        entry.whenTerminate(new BiConsumer<Context, Entry>() {
            @Override
            public void accept(Context context, Entry entry) {
                //如果有发生异常,重新将状态设置为OPEN 请求不同通过
                if (entry.getBlockError() != null) {
                    currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
                    notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                }
            }
        });
        //此时状态已设置为HALF_OPEN正常通行
        return true;
    }
    //熔断
    return false;
}

5.2 请求失败,启动熔断

@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
    Entry curEntry = context.getCurEntry();
    //无阻塞异常
    if (curEntry.getBlockError() != null) {
        fireExit(context, r, count, args);
        return;
    }
    //通过资源名获取断路器
    List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    //没有配置断路器,则直接放行
    if (circuitBreakers == null || circuitBreakers.isEmpty()) {
        fireExit(context, r, count, args);
        return;
    }

    if (curEntry.getBlockError() == null) {
        for (CircuitBreaker circuitBreaker : circuitBreakers) {
            //【点进去】在请求完成时
            circuitBreaker.onRequestComplete(context);
        }
    }
    fireExit(context, r, count, args);
}
@Override
public void onRequestComplete(Context context) {
    Entry entry = context.getCurEntry();
    if (entry == null) {
        return;
    }
    Throwable error = entry.getError();
    //简单错误计数器
    SimpleErrorCounter counter = stat.currentWindow().value();
    if (error != null) {
        //异常请求数加 1
        counter.getErrorCount().add(1);
    }
    //总请求数加 1
    counter.getTotalCount().add(1);
    //【点进去】超过阈值时变更状态
    handleStateChangeWhenThresholdExceeded(error);
}
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
    //全开则直接放行
    if (currentState.get() == State.OPEN) {
        return;
    }
    //半开状态
    if (currentState.get() == State.HALF_OPEN) {
        //检查请求
        if (error == null) {
            //发生异常,将状态从半开 HALF_OPEN 转为关闭 CLOSE
            fromHalfOpenToClose();
        } else {
            //无异常,解开半开状态
            fromHalfOpenToOpen(1.0d);
        }
        return;
    }
    
    //计算是否超过阈值
    List<SimpleErrorCounter> counters = stat.values();
    long errCount = 0;
    long totalCount = 0;
    for (SimpleErrorCounter counter : counters) {
        errCount += counter.errorCount.sum();
        totalCount += counter.totalCount.sum();
    }
    if (totalCount < minRequestAmount) {
        return;
    }
    double curCount = errCount;
    if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
        //熔断策略为:异常比例
        curCount = errCount * 1.0d / totalCount;
    }
    if (curCount > threshold) {
        transformToOpen(curCount);
    }
}

6. Sentinel 源码结构图小结


最后

新人制作,如有错误,欢迎指出,感激不尽!
欢迎关注公众号,会分享一些更日常的东西!
如需转载,请标注出处!

标签:return,窗口,流控,源码,context,Sentinel,entry,null,public
来源: https://blog.csdn.net/dlhjw1412/article/details/122661183