Sentinel 源码分析(一)
作者:互联网
版本:
<groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> <version>2.2.5.RELEASE</version>
在spring.factories中:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration,\ #controller层也可以被资源保护到得原因 com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration,\ com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration,\ com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration,\ #核心配置类 com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration #Feign整合Sentinel org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\ com.alibaba.cloud.sentinel.custom.SentinelCircuitBreakerConfiguration
主要看:SentinelAutoConfiguration,不用配置任何参数,默认生效得。
里面有一个重要得bean,SentinelResourceAspect ,这个切面就是拦截@SentinelResource 注解标识的资源得。
源码不长,把代码贴出来:
@Aspect public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
// 拦截 @SentinelResource 注解标识的资源 @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { } @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { Method originMethod = resolveMethod(pjp); SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { // Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } String resourceName = getResourceName(annotation.value(), originMethod); EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); Entry entry = null; try {
// 开启资源保护,走流控,降级等配置的规则 entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
// 业务逻辑 return pjp.proceed(); } catch (BlockException ex) {
// 处理Block异常,所有的Sentinel 规则的异常都走这里 return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) {
// 业务异常都走这里 Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); // The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } // No fallback function can handle the exception, so throw it out. throw ex; } finally { if (entry != null) {
// 执行exit逻辑 entry.exit(1, pjp.getArgs()); } } } }
跟着:SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()) 的逻辑看下
在CtSph#entryWithPriority 下有个 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); 来构建一个Slotchain。
Sentinel会把页面配置的 流控规则,降级规则,系统规则,热点规则,授权规则每个规则都抽象出来一个Slot,比如:FlowSlot 处理流控规则,AuthoritySlot 处理授权规则,StatisticSlot 统计请求的QPS,成功的请求数,失败的请求数,阻塞的请求数,
最大响应时间等。上面说的构造的chain,就是通过责任链模式把这些slot都串起来,然后去执行。
来看下:
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { synchronized (LOCK) {
// 每个资源都会维护一个slotChain chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // 构建责任链 chain = SlotChainProvider.newSlotChain(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }
跟到:DefaultSlotChainBuilder#build()
public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); // Sentinel 维护了一套spi机制来加载ProcessorSlot接口的实现类,就是那些规则slot,会通过排序的,根据slot上的自定义注解@Spi中的order排序 List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); for (ProcessorSlot slot : sortedSlotList) { if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } // 加载到的slot放入责任链中 chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } return chain; }
在sentinel-core核心包下面的有spi加载的对应的文件:
最后构建出来的 ProcessorSlotChain 是下面的链条:
每个slot都实现 ProcessorSlot接口,对于具体的内容都在entry方法里面。
NodeSelectorSlot:
构建资源的链路,比如在控制台的簇点链路下面的树形结构。
ClusterBuilderSlot:
构建集群中的资源节点,会在集群限流中用到。
LogSlot:
记录些报错日志而已。
StatisticSlot:
这个slot就比较重要了,从名字就可以看出,这是做统计的,对于那些qps,成功,失败,阻塞的次数都会记录下来。@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { // Do some checking. 执行下一个slot之前 并没有什么操作 fireEntry(context, resourceWrapper, node, count, prioritized, args); // Request passed, add thread count and pass count.
// 执行完责任链后面的slot之后,执行我们的业务逻辑之前,就会进行qps 加1的操作
// 请求的线程数加1
node.increaseThreadNum();
// qps 请求数加1 这里是向滑动时间窗口中的bucket中增加数量 这个后面详细说明 node.addPassRequest(count); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } // Handle pass event with registered entry callback handlers.
// 这里可以自己扩展 执行callBack
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) {
// sentinel 规则异常的处理 // Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); // Add block count.
// 增加block的qps node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) {
// Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; } }
AuthoritySlot
授权规则的校验
@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
// 就是校验授权规则的黑白名单规则 checkBlackWhiteAuthority(resourceWrapper, context); fireEntry(context, resourceWrapper, node, count, prioritized, args); }
SystemSlot
执行系统规则。
FlowSlot
执行限流规则。
@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkFlow(resourceWrapper, context, node, count, prioritized); // 触发下一个slot fireEntry(context, resourceWrapper, node, count, prioritized, args); }
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); }
checker 是 FlowRuleChecker 实例。
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; }
// 获取所有的限流规则 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) {
// 校验流控是否通过 if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } }
canPassCheck 会根据流控规则时候选择的快速失败,Warm Up, 排队等待。分别使用DefaultController,WarmUpRateLimiterController,RateLimiterController类来控制,用到的限流算法分别是滑动时间窗口,
令牌桶,漏桶算法。
最后会走到 DefaultController#canPass 这里是快速失败规则会走到的地方。用到的是滑动时间窗口。
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 从滑动时间窗口中得到已经通过的qps 就是比较最近1s内的流量是否达到阈值 int curCount = avgUsedTokens(node);
// acquireCount 一般就是 1 ,判断是否达到限流的要求 if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
// 这里一般不走到 long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true; }
DegradeSlot
熔断规则使用的。
@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { performChecking(context, resourceWrapper); fireEntry(context, resourceWrapper, node, count, prioritized, args); } void performChecking(Context context, ResourceWrapper r) throws BlockException {
// 根据在控制台配置的熔断规则,得到断路器,这里一共两个类型的断路器 :ExceptionCircuitBreaker 当熔断策略是:异常比例,异常数的时候用。
// ResponseTimeCircuitBreaker: 熔断策略是:慢调用比例的时候使用 List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); if (circuitBreakers == null || circuitBreakers.isEmpty()) { return; } for (CircuitBreaker cb : circuitBreakers) { if (!cb.tryPass(context)) { throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); } } }
上面重要的限流逻辑和和降级逻辑后面单独分析,这里先把整体流程记录下。
责任链上的entry方法执行完之后,如果没有BlockException异常,也就是配置的规则都正常通过了,那就会执行业务逻辑了,业务逻辑执行完之后,在上面的 SentinelResourceAspect
切面中的finally方法中会调用责任链上的exit方法。
在exit方法中,主要做一些数据的统计。涉及到上面说的流控和降级等。这个和分析流控算法和降级的时候再记录。
Sentinel的核心流程就是这个责任链的执行。各种规则的校验都在这个链上执行的。
标签:分析,node,prioritized,count,resourceWrapper,源码,context,Sentinel,entry 来源: https://www.cnblogs.com/krock/p/16240466.html