标签:github err 16 golang Sentinel 熔断 time sentinel 限流
一 限流-熔断-降级介绍
在分布式系统中,如果某个服务节点发生故障或者网络发生异常,都有可能导致调用方被阻塞等待,如果超时时间设置很长,调用方资源很可能被耗尽。这又导致了调用方的上游系统发生资源耗尽的情况,最终导致系统雪崩,如下情况会导致系统雪崩
【服务提供者不可用】:硬件故障;程序bug;缓存击穿;用户大量请求
【重试导致加大请求流量】:重试机制导致过多重试;代码问题重试
【服务调用者不可用】:同步等待造成的资源耗尽
所以我们可以使用如下机制来解决
【扩机器】:增加机器数量;增加机器硬件
【流量控制】:限流;关闭重试
【缓存】:预先加载缓存
【服务降级】:服务器压力剧增,对非核心业务流程进行降级,保证核心功能可用,暂停其他业务保证自身核心业务正常运行
【服务熔断】:下游服务不可用或者响应过慢,切断调用链路直接返回结果,保证自身服务的可用性
1.1 限流
当系统的处理能力不能应对外部请求的突增流量时,为了不让系统奔溃,必须采取限流的措施
1.1.1 限流指标
TPS
系统吞吐量是衡量系统性能的关键指标,按照事务的完成数量来限流是最合理的。
但是对分布式系统来说,按照事务来限流并不现实。在分布式系统中完成一笔事务需要多个系统的配合。比如我们在电商系统购物,需要订单、库存、账户、支付等多个服务配合完成,有的服务需要异步返回,这样完成一笔事务花费的时间可能会很长。如果按照TPS来进行限流,时间粒度可能会很大大,很难准确评估系统的响应性能。
HPS
每秒请求数,指每秒钟服务端收到客户端的请求数量。
如果一个请求完成一笔事务,那TPS和HPS是等同的。但在分布式场景下,完成一笔事务可能需要多次请求,所以TPS和HPS指标不能等同看待
QPS
服务端每秒能够响应的客户端查询请求数量。
如果后台只有一台服务器,那 HPS 和 QPS 是等同的。但是在分布式场景下,每个请求需要多个服务器配合完成响应
目前主流的限流方法多采用 HPS 作为限流指标
1.1.12 限流方法
流量计数器
这是最简单直接的方法,比如限制每秒请求数量 100,超过 100 的请求就拒绝掉。
但是这个方法存在明显的问题:
1 单位时间(比如 1s)很难把控
从下面标注的可以看出HPS 没有超过 100,但是从上面的标注可以看出,HPS超过100了
2 有一段时间流量超了,也不一定真的需要限流
系统 HPS 限制 50,虽然前 3s 流量超了,但是如果读超时时间设置为 5s,并不需要限流
滑动时间窗口
滑动时间窗口算法是目前比较流行的限流算法,主要思想是把时间看做是一个向前滚动的窗口,如下图
开始的时候,我们把 t1~t5 看做一个时间窗口,每个窗口 1s,如果我们定的限流目标是每秒 50 个请求,那 t1~t5 这个窗口的请求总和不能超过 250 个。
这个窗口是滑动的,下一秒的窗口成了 t2~t6,这时把 t1 时间片的统计抛弃,加入 t6 时间片进行统计。这段时间内的请求数量也不能超过 250 个。
滑动时间窗口的优点是解决了流量计数器算法的缺陷,但是也有 2 个问题:
- 流量超过就必须抛弃或者走降级逻辑
- 对流量控制不够精细,不能限制集中在短时间内的流量,也不能削峰填谷
漏桶算法
在客户端的请求发送到服务器之前,先用漏桶缓存起来,这个漏桶可以是一个长度固定的队列,这个队列中的请求均匀的发送到服务端。
如果客户端的请求速率太快,漏桶的队列满了,就会被拒绝掉,或者走降级处理逻辑。这样服务端就不会受到突发流量的冲击。
漏桶算法的优点是实现简单,可以使用消息队列来削峰填谷。
但是也有 3 个问题需要考虑:
- 漏桶的大小,如果太大,可能给服务端带来较大处理压力,太小可能会有大量请求被丢弃。
- 漏桶给服务端的请求发送速率。
- 使用缓存请求的方式,会使请求响应时间变长。
漏桶大小和发送速率这 2 个值在项目上线初期都会根据测试结果选择一个值,但是随着架构的改进和集群的伸缩,这 2 个值也会随之发生改变。
令牌桶算法
令牌桶算法就跟病人去医院看病一样,找医生之前需要先挂号,而医院每天放的号是有限的。当天的号用完了,第二天又会放一批号
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”
令牌桶算法解决了漏桶算法的问题,而且实现并不复杂,使用信号量就可以实现。在实际限流场景中使用最多
1.2 熔断
相信大家对断路器路器并不陌生,它就相当于一个开关,打开后可以阻止流量通过。比如保险丝,当电流过大时,就会熔断,从而避免元器件损坏。
服务熔断是指调用方访问服务时通过断路器做代理进行访问,断路器会持续观察服务返回的成功、失败的状态,当失败超过设置的阈值时断路器打开,请求就不能真正地访问到服务了
1.2.1 断路器的状态
断路器有 3 种状态:
- CLOSED:默认状态。断路器观察到请求失败比例没有达到阈值,断路器认为被代理服务状态良好。
- OPEN:断路器观察到请求失败比例已经达到阈值,断路器认为被代理服务故障,打开开关,请求不再到达被代理的服务,而是快速失败。
- HALF OPEN:断路器打开后,为了能自动恢复对被代理服务的访问,会切换到半开放状态,去尝试请求被代理服务以查看服务是否已经故障恢复。如果成功,会转成 CLOSED 状态,否则转到 OPEN 状态
1.2.2 需要考虑的问题
使用断路器需要考虑一些问题:
- 针对不同的异常,定义不同的熔断后处理逻辑。
- 设置熔断的时长,超过这个时长后切换到 HALF OPEN 进行重试。
- 记录请求失败日志,供监控使用。
- 主动重试,比如对于 connection timeout 造成的熔断,可以用异步线程进行网络检测,比如 telenet,检测到网络畅通时切换到 HALF OPEN 进行重试。
- 补偿接口,断路器可以提供补偿接口让运维人员手工关闭。
- 重试时,可以使用之前失败的请求进行重试,但一定要注意业务上是否允许这样做。
1.2.3 使用场景
- 服务故障或者升级时,让客户端快速失败
- 失败处理逻辑容易定义
- 响应耗时较长,客户端设置的 read timeout 会比较长,防止客户端大量重试请求导致的连接、线程资源不能释放
1.3 降级
降级也就是服务降级,当我们的服务器压力剧增为了保证核心功能的可用性 ,而选择性的降低一些功能的可用性,或者直接关闭该功能。这就是典型的丢车保帅了。
就比如贴吧类型的网站,当服务器吃不消的时候,可以选择把发帖功能关闭,注册功能关闭,改密码,改头像这些都关了,为了确保登录和浏览帖子这种核心的功能。
1.4 总结
拿下棋比喻:
限流: 相当于尽量避免同时和两三个人同时下
熔断:相当于你的一颗卒被围死了,就不要利用其它棋去救它了,弃卒保帅,否则救他的棋也可能被拖死
降级:相当于尽量不要走用处不大的棋了,浪费走棋机会(资源),使已经过河的棋有更多的走棋机会(资源)发挥最大作用
二 熔断限流技术选型
2.1 Hystrix
github地址:https://github.com/Netflix/Hystrix/
stars数:22.7k
2.2 sentinel
Sentinel是阿里面向云原生开发的微服务流量控制、熔断降级的开源组件。
github地址:https://github.com/alibaba/sentinel
stars数:19.3k
文档:https://sentinelguard.io/zh-cn/docs/quick-start.html
go语言支持:https://github.com/alibaba/sentinel-golang
2.2.1 Sentinel历史
- 2012 年,Sentinel 诞生,主要功能为入口流量控制。
- 2013-2017 年,Sentinel 在阿里巴巴集团内部迅速发展,成为基础技术模块,覆盖了所有的核心场景。Sentinel 也因此积累了大量的流量归整场景以及生产实践。
- 2018 年,Sentinel 开源,并持续演进。
- 2019 年,Sentinel 朝着多语言扩展的方向不断探索,推出 C++ 原生版本,同时针对 Service Mesh 场景也推出了 Envoy 集群流量控制支持,以解决 Service Mesh 架构下多语言限流的问题。
- 2020 年,推出 Sentinel Go 版本,继续朝着云原生方向演进。
- 2021 年,Sentinel 正在朝着 2.0 云原生高可用决策中心组件进行演进;同时推出了 Sentinel Rust 原生版本。同时我们也在 Rust 社区进行了 Envoy WASM extension 及 eBPF extension 等场景探索。
2.2.2 Sentinel优势
Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
官网是这样介绍的
Sentinel 具有以下特征:
- 丰富的应用场景:
Sentinel承接了阿里巴巴近10年的双十一大促流量的核心场景,例如秒杀,即突发流量控制在系统容量可以承受的范围;消息削峰填谷;实时熔断下游不可用应用,等等。 - 完备的监控功能:
Sentinel同时提供最实时的监控功能,您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。 - 简单易用的扩展点:
Sentinel提供简单易用的扩展点,您可以通过实现扩展点,快速的定制逻辑。例如定制规则管理,适配数据源等。
Sentinel功2.2.3 能和设计理念
流量控制
流量控制在网络传输中是一个常用的概念,它用于调整网络包的发送数据。然而,从系统稳定性角度考虑,在处理请求的速度上,也有非常多的讲究。任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。Sentinel 作为一个调配器,可以根据需要把随机的请求调整成合适的形状,如下图所示:
流量控制有以下几个角度:
- 资源的调用关系,例如资源的调用链路,资源和资源之间的关系;
- 运行指标,例如 QPS、线程池、系统负载等;
- 控制的效果,例如直接限流、冷启动、排队等。
Sentinel 的设计理念是让您自由选择控制的角度,并进行灵活组合,从而达到想要的效果。
熔断降级
什么是熔断降级
除了流量控制以外,降低调用链路中的不稳定资源也是 Sentinel 的使命之一。由于调用关系的复杂性,如果调用链路中的某个资源出现了不稳定,最终会导致请求发生堆积。这个问题和 Hystrix 里面描述的问题是一样的。
Sentinel 和 Hystrix 的原则是一致的: 当调用链路中某个资源出现不稳定,例如,表现为 timeout,异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。
熔断降级设计理念
在限制的手段上,Sentinel 和 Hystrix 采取了完全不一样的方法。
Hystrix 通过线程池的方式,来对依赖(在我们的概念中对应资源)进行了隔离。这样做的好处是资源和资源之间做到了最彻底的隔离。缺点是除了增加了线程切换的成本,还需要预先给各个资源做线程池大小的分配。
Sentinel 对这个问题采取了两种手段:
- 通过并发线程数进行限制
和资源池隔离的方法不同,Sentinel 通过限制资源并发线程的数量,来减少不稳定资源对其它资源的影响。这样不但没有线程切换的损耗,也不需要您预先分配线程池的大小。当某个资源出现不稳定的情况下,例如响应时间变长,对资源的直接影响就是会造成线程数的逐步堆积。当线程数在特定资源上堆积到一定的数量之后,对该资源的新请求就会被拒绝。堆积的线程完成任务后才开始继续接收请求。
- 通过响应时间对资源进行降级
除了对并发线程数进行控制以外,Sentinel 还可以通过响应时间来快速降级不稳定的资源。当依赖的资源出现响应时间过长后,所有对该资源的访问都会被直接拒绝,直到过了指定的时间窗口之后才重新恢复。
系统负载保护
Sentinel 同时提供系统维度的自适应保护能力。防止雪崩,是系统防护中重要的一环。当系统负载较高的时候,如果还持续让请求进入,可能会导致系统崩溃,无法响应。在集群环境下,网络负载均衡会把本应这台机器承载的流量转发到其它的机器上去。如果这个时候其它的机器也处在一个边缘状态的时候,这个增加的流量就会导致这台机器也崩溃,最后导致整个集群不可用。
针对这个情况,Sentinel 提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。
三 sentinel-golang使用
3.1 流量控制
官方示例:https://github.com/alibaba/sentinel-golang/tree/master/example/flow
3.1.1 按qps控制
https://github.com/alibaba/sentinel-golang/blob/master/example/flow/qps/qps_limit_example.go
package main
import (
"fmt"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
"log"
)
func main() {
// We should initialize Sentinel first.
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
//方式一: 通过配置文件创建
err := sentinel.InitWithConfig(conf)
//方式二: 通过默认创建
//err :sentinel.InitDefault()
if err != nil {
log.Fatal(err)
}
// TokenCalculateStrategy配置flow.Direct,Threshold配置10,StatIntervalInMs配置100,表示1s钟10个流量
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "lqz-test", //资源名,即规则的作用目标
TokenCalculateStrategy: flow.Direct, //当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值,三种方式
ControlBehavior: flow.Reject, //表示流量控制器的控制策略;Reject表示超过阈值直接拒绝,Throttling表示匀速排队。 Threshold: 表示流控
Threshold: 10, //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
StatIntervalInMs: 1000, //规则对应的流量控制器的独立统计结构的统计周期。如果StatIntervalInMs是1000,也就是统计QPS
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}
ch := make(chan struct{})
for i := 0; i < 12; i++ {
go func() {
// base.Inbound表示入口流量控制,Outbound表示出口流量控制
e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
if b != nil {
// Blocked. We could get the block reason from the BlockError.
fmt.Println("失败")
} else {
// Passed, wrap the logic here.
fmt.Println("通过")
// Be sure the entry is exited finally.
e.Exit()
}
}()
}
<-ch
}
// 可以看到通过10个,失败2个
Resource
:资源名,即规则的作用目标。TokenCalculateStrategy
: 当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值。ControlBehavior
: 表示流量控制器的控制策略;Reject表示超过阈值直接拒绝,Throttling表示匀速排队。Threshold
: 表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控。RelationStrategy
: 调用关系限流策略,CurrentResource表示使用当前规则的resource做流控;AssociatedResource表示使用关联的resource做流控,关联的resource在字段RefResource
定义;RefResource
: 关联的resource;WarmUpPeriodSec
: 预热的时间长度,该字段仅仅对WarmUp
的TokenCalculateStrategy生效;WarmUpColdFactor
: 预热的因子,默认是3,该值的设置会影响预热的速度,该字段仅仅对WarmUp
的TokenCalculateStrategy生效;MaxQueueingTimeMs
: 匀速排队的最大等待时间,该字段仅仅对Throttling
ControlBehavior生效;StatIntervalInMs
: 规则对应的流量控制器的独立统计结构的统计周期。如果StatIntervalInMs是1000,也就是统计QPS
流量控制器的控制行为Throttling
package main
import (
"fmt"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
"log"
)
func main() {
// We should initialize Sentinel first.
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
//方式一: 通过配置文件创建
err := sentinel.InitWithConfig(conf)
//方式二: 通过默认创建
//err :sentinel.InitDefault()
if err != nil {
log.Fatal(err)
}
// TokenCalculateStrategy配置flow.Direct,Threshold配置10,StatIntervalInMs配置100,表示1s钟10个流量
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "lqz-test", //资源名,即规则的作用目标
TokenCalculateStrategy: flow.Direct, //当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值,三种方式
ControlBehavior: flow.Throttling, //Throttling表示匀速排队
Threshold: 10, //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
StatIntervalInMs: 1000, //规则对应的流量控制器的独立统计结构的统计周期。如果StatIntervalInMs是1000,也就是统计QPS
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}
for {
// base.Inbound表示入口流量控制,Outbound表示出口流量控制
e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
if b != nil {
// Blocked. We could get the block reason from the BlockError.
fmt.Println("失败")
} else {
// Passed, wrap the logic here.
fmt.Println("通过")
// Be sure the entry is exited finally.
e.Exit()
}
//time.Sleep(100*time.Millisecond) // 100 毫秒发送一次,都能通过,如果去掉,就只能通过一个
}
}
3.1.2 预热冷启动(warm_up)
WarmUp 方式,即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。这块设计和 Java 类似,可以参考限流-冷启动文档
通常冷启动的过程系统允许通过的 QPS 曲线如下图所示
【设置了预热冷启动,qps会以曲线的形式慢慢增加,而不是一下增加,保证了系统不会瞬间承受很大压力而挂掉】
package main
import (
"fmt"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
"log"
"math/rand"
"time"
)
func main() {
// 定义三个变量,统计总共请求数,通过个数,失败个数
var total, pass, block int
// We should initialize Sentinel first.
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
//方式一: 通过配置文件创建
err := sentinel.InitWithConfig(conf)
//方式二: 通过默认创建
//err :sentinel.InitDefault()
if err != nil {
log.Fatal(err)
}
// TokenCalculateStrategy配置flow.WarmUp,
//WarmUpPeriodSec配置10,
//Threshold配置1000,
//表示这1000个流量,要在10s内缓慢的增加到
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "lqz-test", //资源名,即规则的作用目标
TokenCalculateStrategy: flow.WarmUp, //使用WarmUp形式启动
ControlBehavior: flow.Reject, //表示流量控制器的控制策略;Reject表示超过阈值直接拒绝,Throttling表示匀速排队。 Threshold: 表示流控
WarmUpPeriodSec: 10, //预热的时间长度,该字段仅仅对 WarmUp 的TokenCalculateStrategy生效
Threshold: 1000, //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
//WarmUpColdFactor: 预热的因子,默认是3,该值的设置会影响预热的速度,该字段仅仅对 WarmUp 的TokenCalculateStrategy生效
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}
ch := make(chan struct{})
// 启动100个协程,分别:不停的请求
for i:=0;i<100;i++{
go func() {
// 死循环
for {
total++
e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
if b != nil {
block++
} else {
pass++
e.Exit()
}
time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) // 每次随机睡10毫秒以内的时间
}
}()
}
//
go func() {
// 统计过去一秒钟,总共多少个,通过多少个,block多少个
var oldTotal, oldPass, oldBlock int
for { //死循环每隔1s打印,上一秒中总共多少,通过多少,block多少
oldSecondTotal := total - oldTotal
oldTotal = total
oldSecondPass := pass - oldPass
oldPass = pass
oldSecondBlock := block - oldBlock
oldBlock = block
time.Sleep(1 * time.Second)
fmt.Printf("总共:%d,通过;%d,拒绝:%d \n", oldSecondTotal, oldSecondPass, oldSecondBlock)
}
}()
<-ch
}
3.2 熔断降级
案例:https://github.com/alibaba/sentinel-golang/tree/master/example/circuitbreaker
3.2.1 熔断器模型
3.2.2 熔断策略
Sentinel 熔断器的三种熔断策略都支持静默期 (规则中通过MinRequestAmount字段表示)。静默期是指一个最小的静默请求数,在一个统计周期内,如果对资源的请求数小于设置的静默数,那么熔断器将不会基于其统计值去更改熔断器的状态。静默期的设计理由也很简单,举个例子,假设在一个统计周期刚刚开始时候,第 1 个请求碰巧是个慢请求,这个时候这个时候的慢调用比例就会是 100%,很明显是不合理,所以存在一定的巧合性。所以静默期提高了熔断器的精准性以及降低误判可能性。
Sentinel 支持以下几种熔断策略
- 慢调用比例策略 (SlowRequestRatio):Sentinel 的熔断器不在静默期,并且慢调用的比例大于设置的阈值,则接下来的熔断周期内对资源的访问会自动地被熔断。该策略下需要设置允许的调用 RT 临界值(即最大的响应时间),对该资源访问的响应时间大于该阈值则统计为慢调用。
- 错误比例策略 (ErrorRatio):Sentinel 的熔断器不在静默期,并且在统计周期内资源请求访问异常的比例大于设定的阈值,则接下来的熔断周期内对资源的访问会自动地被熔断。
- 错误计数策略 (ErrorCount):Sentinel 的熔断器不在静默期,并且在统计周期内资源请求访问异常数大于设定的阈值,则接下来的熔断周期内对资源的访问会自动地被熔断
3.2.3 基于错误数量的熔断
package main
import (
"errors"
"fmt"
"log"
"math/rand"
"time"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
)
// 状态转移的时候会触发
type stateChangeTestListener struct {
}
func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func main() {
// 定义total,pass,block,totalErr 统计
var total,pass,block ,totalErr int
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
err := sentinel.InitWithConfig(conf)
if err != nil {
log.Fatal(err)
}
ch := make(chan struct{})
// Register a state change listener so that we could observer the state change of the internal circuit breaker.
// 注册状态转移监听
circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})
_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
// Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
{
Resource: "abc",
Strategy: circuitbreaker.ErrorCount,
RetryTimeoutMs: 3000, // 3s后尝试恢复,进入half状态
MinRequestAmount: 10, // 静默数
StatIntervalMs: 5000, // 5s钟错误不超过50g
StatSlidingWindowBucketCount: 10, //滑动时间窗口是10
Threshold: 50, // 5s钟错误不超过50g
},
})
if err != nil {
log.Fatal(err)
}
logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g1 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
if rand.Uint64()%20 > 9 {
// Record current invocation as error.
totalErr++
sentinel.TraceError(e, errors.New("biz error"))
}
// g1 passed
time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g2 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
// g2 passed
pass++
time.Sleep(time.Duration(rand.Uint64()%80) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
time.Sleep(time.Second)
fmt.Printf("总数:%d,通过:%d,错误:%d,block:%d\n",total,pass,totalErr,block)
}
}()
<-ch
}
3.2.4 基于错误率的熔断
package main
import (
"errors"
"fmt"
"log"
"math/rand"
"time"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
)
// 状态转移的时候会触发
type stateChangeTestListener struct {
}
func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func main() {
// 定义total,pass,block,totalErr 统计
var total,pass,block ,totalErr int
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
err := sentinel.InitWithConfig(conf)
if err != nil {
log.Fatal(err)
}
ch := make(chan struct{})
// Register a state change listener so that we could observer the state change of the internal circuit breaker.
// 注册状态转移监听
circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})
_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
// Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
{
Resource: "abc",
Strategy: circuitbreaker.ErrorRatio,
RetryTimeoutMs: 3000, // 3s后尝试恢复,进入half状态
MinRequestAmount: 10, // 静默数
StatIntervalMs: 5000, // 5s钟错误比例不超过0.4
StatSlidingWindowBucketCount: 10, //滑动时间窗口是10
Threshold: 0.4, // 5s钟错误比例不超过0.4
},
})
if err != nil {
log.Fatal(err)
}
logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g1 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
if rand.Uint64()%20 > 6 {
// Record current invocation as error.
totalErr++
sentinel.TraceError(e, errors.New("biz error"))
}
// g1 passed
time.Sleep(time.Duration(rand.Uint64()%80+20) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g2 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
// g2 passed
pass++
time.Sleep(time.Duration(rand.Uint64()%80+40) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
time.Sleep(time.Second)
fmt.Printf("总数:%d,通过:%d,错误:%d,block:%d\n",total,pass,totalErr,block)
}
}()
<-ch
}
3.2.5 基于慢请求的熔断
package main
import (
"errors"
"fmt"
"log"
"math/rand"
"time"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
)
// 状态转移的时候会触发
type stateChangeTestListener struct {
}
func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func main() {
// 定义total,pass,block,totalErr 统计
var total,pass,block ,totalErr int
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
err := sentinel.InitWithConfig(conf)
if err != nil {
log.Fatal(err)
}
ch := make(chan struct{})
// Register a state change listener so that we could observer the state change of the internal circuit breaker.
// 注册状态转移监听
circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})
_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
// Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
{
Resource: "abc", // 名字
Strategy: circuitbreaker.SlowRequestRatio, // 慢查询的侧脸
RetryTimeoutMs: 3000, // 3s后尝试恢复,进入half状态
MinRequestAmount: 10, // 静默数
StatIntervalMs: 5000, // 5s钟慢查询比例不超过0.4
StatSlidingWindowBucketCount: 10, //滑动时间窗口是10
MaxAllowedRtMs: 50, // 50毫秒以外算慢查询
Threshold: 0.5,// 5s钟慢查询比例不超过0.4
},
})
if err != nil {
log.Fatal(err)
}
logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g1 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
if rand.Uint64()%20 > 6 {
// Record current invocation as error.
totalErr++
sentinel.TraceError(e, errors.New("biz error"))
}
// g1 passed
time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g2 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
// g2 passed
pass++
time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
time.Sleep(time.Second)
fmt.Printf("总数:%d,通过:%d,错误:%d,block:%d\n",total,pass,totalErr,block)
}
}()
<-ch
}
三 gin中集成限流
gin的initGin/sentinel.go
package initGin
import (
"fmt"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
"log"
)
func InitSentinel() {
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
//方式一: 通过配置文件创建
err := sentinel.InitWithConfig(conf)
//方式二: 通过默认创建
//err :sentinel.InitDefault()
if err != nil {
fmt.Println("sss")
log.Fatal(err)
}
// TokenCalculateStrategy配置flow.Direct,Threshold配置1,StatIntervalInMs配置100,表示1s钟1个流量
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "lqz-test", //资源名,即规则的作用目标
TokenCalculateStrategy: flow.Direct, //当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值,三种方式
ControlBehavior: flow.Throttling, //Throttling表示匀速排队
Threshold: 1, //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
StatIntervalInMs: 1000, //规则对应的流量控制器的独立统计结构的统计周期。如果StatIntervalInMs是1000,也就是统计QPS
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}
}
gin的main.go
package main
import (
"context"
"fmt"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"net/http"
//sentinelPlugin "github.com/alibaba/sentinel-golang/pkg/adapters/gin"
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"grpc_proto_demo/sentinel_demo/gin_demo/initGin"
"grpc_proto_demo/sentinel_demo/proto"
)
func main() {
initGin.InitSentinel()
r := gin.Default()
//r.Use(sentinelPlugin.SentinelMiddleware())
r.GET("/index", func(c *gin.Context) {
// 第一步:连接服务端
conn, err := grpc.Dial("127.0.0.1:50052", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
c.JSON(200, "连接服务异常")
}
defer conn.Close()
// 第二步:创建客户端调用
client := proto.NewGreeterClient(conn)
// ****限流开始****
e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
if b != nil {
// 返回给前端,超过了qps
fmt.Println("失败")
c.JSON(http.StatusTooManyRequests, gin.H{
"msg": "请求过快,请稍后再试",
})
return
}
resp, err := client.SayHello(context.Background(), &proto.HelloRequest{
Name: "lqz",
Age: 19,
})
e.Exit() // 不要忘了加它
// ****限流结束****
if err != nil {
c.JSON(200, "服务器错误")
}
c.JSON(200, resp.Reply)
})
r.Run()
}
grpc/server/main.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"grpc_proto_demo/sentinel_demo/proto"
"net"
)
type GreeterServer struct {
}
func (h GreeterServer) SayHello(ctx context.Context, in *proto.HelloRequest) (*proto.HelloResponse, error) {
// 接收客户端发送过来的数据,打印出来
fmt.Println("客户端传入的名字是:", in.Name)
fmt.Println("客户端传入的年龄是:", in.Age)
// 返回给客户端
return &proto.HelloResponse{
Reply: "服务端给你回复",
}, nil
}
// 服务端代码
func main() {
// 第一步:new一个server
g := grpc.NewServer()
// 第二步:生成一个结构体对象
s := GreeterServer{}
// 第三步: 把s注册到g对象中
proto.RegisterGreeterServer(g, &s)
// 第四步:启动服务,监听端口
lis, error := net.Listen("tcp", "0.0.0.0:50052")
if error != nil {
panic("启动服务异常")
}
g.Serve(lis)
}
proto
syntax = "proto3";
option go_package = ".;proto";
service Greeter{
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
// 类似于go的结构体,可以定义属性
message HelloRequest {
string name = 1; // 1 是编号,不是值
int32 age = 2;
}
// 定义一个响应的类型
message HelloResponse {
string reply =1;
}
标签:github,err,16,golang,Sentinel,熔断,time,sentinel,限流
来源: https://www.cnblogs.com/liuqingzheng/p/16367225.html
本站声明:
1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。