既然它建立连接用的socket,为什么不用netty
作者:互联网
1. 阿里sentinel源码研究深入
1.1. 前言
- 昨天已经把sentinel成功部署到线上环境,可参考我上篇博文,该走的坑也都走了一遍,已经可以初步使用它的限流和降级功能,根据我目前的实践,限流和降级规则似乎不能一同起效,还不知道原因,下面继续探索
1.2. 源码
1.2.1. 流控降级监控等的构建
- 首先客户端而言,我关注的是我写的代码
SphU.entry
,这明显是很关键的方法,下图的内容就是这里构建的
-Sentinel工作主流程就包含在上面一个方法里,通过链式调用的方式,经过了建立树状结构,保存统计簇点,异常日志记录,实时数据统计,负载保护,权限认证,流量控制,熔断降级等Slot
- 进入链式方法的入口为CtSph类,try方法大括号内
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
1.2.2. 修改控制台规则是如何通知客户端的?
- 看sentinel-transport-simple-http包中的
HttpEventTask
类,它开启了一个线程,转么用来做为socket连接,控制台通过socket请求通知客户端,从而更新客户端规则,更改规则核心代码如下
// Find the matching command handler.
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter, outputStream);
} else {
// No matching command handler.
badRequest(printWriter, "Unknown command `" + commandName + '`');
}
通过命令模式,commandName为setRules时,更新规则
1.2.3. 既然它建立连接用的socket,为什么不用netty呢?
- 带着这个疑问,我本想在issues里找下,突然发现它的源码中有个
sentinel-transport-netty-http
这个包和sentinel-transport-simple-http
处于同级,官方的例子用的simple-http,但明显它也准备了netty-http,于是我替换成了netty-http,运行后效果和原先一样,至于效率上有没有提升,我就不清楚了^_^
1.2.4. 流量规则如何检查?
- 该规则检查类为
FlowRuleChecker
,在core核心包中,核心检查方法如下
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
1.2.5. 熔断降级如何判断?
- 判断类为
DegradeRuleManager
,在core核心包,核心内容如下,再深入就是它判断的算法了,感兴趣的自己去看如下的passCheck
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
throws BlockException {
Set<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}
for (DegradeRule rule : rules) {
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
1.2.6. 默认的链条构建在哪?
- 核心类为
DefaultSlotChainBuilder
,构建了如下的slot
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
1.2.7. 既然已经知道了它是如何构建链式的处理节点的,我们是否何可自己重新构建?
- 发现类
SlotChainProvider
中的构建方法如下
private static void resolveSlotChainBuilder() {
List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>();
boolean hasOther = false;
for (SlotChainBuilder builder : LOADER) {
if (builder.getClass() != DefaultSlotChainBuilder.class) {
hasOther = true;
list.add(builder);
}
}
if (hasOther) {
builder = list.get(0);
} else {
// No custom builder, using default.
builder = new DefaultSlotChainBuilder();
}
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
+ builder.getClass().getCanonicalName());
}
- 也就是说,我们如果在
LOADER
中加入了其他的非默认实现就可以替代原来的DefaultSlotChainBuilder
,那LOADER
怎么来的?看代码,如下的全局变量,也就是需要自定义实现SlotChainBuilder
接口的实现类
private static final ServiceLoader<SlotChainBuilder> LOADER = ServiceLoader.load(SlotChainBuilder.class);
1.2.8. 如何实现SlotChainBuilder接口呢?
- 这里要注意的是它使用了
ServiceLoader
,也就是SPI
,全称Service Provider Interface
,加载它需要特定的配合,比如我自定义实现一个Slot
/**
* @author laoliangliang
* @date 2019/7/25 14:13
*/
public class MySlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
//自定义的
chain.addLast(new CarerSlot());
return chain;
}
}
/**
* @author laoliangliang
* @date 2019/7/25 14:15
*/
@Slf4j
public class CarerSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
log.info(JSON.toJSONString(resourceWrapper));
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
- 这里我自定义了
CarerSlot
,那是否能被加载到呢?事实上还不够,需要在META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
建这样一个文件,内容如下
- 好了,这样配置过后,它就能读到我们自定义的实现类代替它原先的类了
1.2.9. 该命令模式最初的初始化阶段在哪?
- 用过sentinel的都会感受到,只有当有第一个sentinel监控的请求过来时,sentinel客户端才会正式初始化,这样看来,这个初始化步骤应该在哪呢?
- 我通过不断反向跟踪上述的命令模式最初的初始化,找到了最初初始化的地方如下
public class Env {
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
InitExecutor.doInit();
}
}
- 有没有觉得很熟悉?doInit就是很多初始化的起点,当Env被调用时会运行static代码块,那么只有可能是sph被调用时
- 只要你debug过我上述第一条
SphU.entry
的源码,就会发现,如下,该方法一进入不就是先获取Env的sph,再调用的entry吗,所以初始化的地方也就找到了,第一次调用SphU.entry
的地方,或者你不用这个,使用的注解,里面同样有这个方法
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
1.2.10. 注解是如何实现熔断降级的?
- 这个其实是比较容易理解的,既然通过
SphU.entry
包裹可以实现熔断降级,通过注解的形式包裹代码方法应该是比较容易的,那么在哪里实现和配置的呢 - 看过我前一篇文章的应该看到了,有存在如下配置
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
pushlish();
return new SentinelResourceAspect();
}
- 很明显的注解切面,通过spring注解的形式注入,我觉得这还是比较优雅的注入方式了,点进入就可以看到如下
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}
对@SentinelResource
注解进行了处理
1.2.11. 什么是直接失败?
- 这个很好理解,qps超过设置的值,直接失败
1.2.12. 什么是排队等待?
-
这个似乎看字面意思很好理解,但是一旦你点了这个选项,下面还有个参数的
-
所以这个排队等待是有超时时间的,达到峰值后匀速通过,采用的漏桶算法,流控图
1.2.13. 什么是慢启动模式?
- 以下是核心算法,
Warm Up
模式不看算法细节,看它的中文说明应该就能理解是怎么回事了吧;所谓慢启动模式,要求系统的QPS请求增速不能超过一定的速率,否则会被压制超过部分请求失败,应该是为了避免一启动就有大流量的请求进入导致系统一下子就宕机卡主或直接进入了熔断
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
-
配置如下时,测试流控
-
流控图
1.2.14. 模式总结
-
你会发现直接失败和排队等待的区别在流控图上并不明显,那差别在哪呢?我重庆给个请求参数,5秒内模拟100个人轮流请求10次
-
sentinel控制台设置
-
流控图
- 总结:我设置了超时时间是5秒,而100个线程10次轮询也就是1000个请求,可以看出,它并不是一定要在5秒内解决这些请求,有了延时后,代表只要响应时间在5秒以内,不管多少请求都不会拒绝;
- 几个模式有利有弊,默认的快速失败使我们可以最大程度的控制系统的QPS,避免造成系统压力过大,但同时可能造成用于的体验效果变差
- 慢启动上面说过了
- 排队等待在设置合理的超时时间后可以最大程度的避免求情的失败,但同时可能造成线程压力过大
- 综上,在我看来排队等待模式是比较适合线上运行的,只是需要设置合理的超时时间,大公司机器不愁那就设小点,业界一般标准是200ms用户无感知,中小型可以设500ms甚至更大,看机器情况动态调整了
1.2.15. 提醒
- 像我是用apollo来持久化规则的,你也可以用nacos,redis,zookeeper等,当控制台未启动时,你启动客户端规则也会生效,只是没了控制台实时监控数据
标签:netty,addLast,chain,public,1.2,sentinel,new,连接,socket 来源: https://www.cnblogs.com/strugksjncxa/p/11256056.html