高并发限流算法
作者:互联网
RateLimiter 令牌限流
限流算法
常见限流算法有两种:漏桶算法和令牌桶算法。
漏桶算法
漏桶算法(Leaky Bucket)
是网络世界中流量整形(Traffic Shaping)
或速率限制(Rate Limiting)
时经常使用的一种算法,它的主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量。
漏桶可以看作是一个带有常量服务时间的单服务器队列,如果漏桶(包缓存)溢出,那么数据包会被丢弃。 在网络中,漏桶算法可以控制端口的流量输出速率,平滑网络上的突发流量,实现流量整形,从而为网络提供一个稳定的流量。
如下图所示,把请求比作是水,水来了都先放进桶里,并以限定的速度出水,当水来得过猛而出水不够快时就会导致水直接溢出,即拒绝服务。
可以看出,漏桶算法可以很好的控制流量的访问速度,一旦超过该速度就拒绝服务。
令牌桶算法
令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。
如下图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
令牌桶是一种常用的流量控制技术。令牌桶本身没有丢弃和优先级策略,原理:
1.令牌以一定的速率放入桶中。
2.每个令牌允许源发送一定数量的比特。
3.发送一个包,流量调节器就要从桶中删除与包大小相等的令牌数。
4.如果没有足够的令牌发送包,这个包就会等待直到有足够的令牌(在整形器的情况下)或者包被丢弃,也有可能被标记更低的DSCP(在策略者的情况下)。
5.桶有特定的容量,如果桶已经满了,新加入的令牌就会被丢弃。因此,在任何时候,源发送到网络上的最大突发数据量与桶的大小成比例。令牌桶允许突发,但是不能超过限制。
示例RateLimiter(Google的Guava包)正是使用的令牌桶算法。
漏桶算法和令牌桶算法的区别
漏桶算法的出水速度是恒定的,那么意味着如果瞬时大流量的话,将有大部分请求被丢弃掉(也就是所谓的溢出)。漏桶算法通常可以用于限制访问外部接口的流量,保护其他人系统,比如我们请求银行接口,通常要限制并发数。
令牌桶算法生成令牌的速度是恒定的,而请求去拿令牌是没有速度限制的。这意味,面对瞬时大流量,该算法可以在短时间内请求拿到大量令牌,可以处理瞬时流量,而且拿令牌的过程并不是消耗很大的事情。令牌桶算法通常可以用于限制被访问的流量,保护自身系统。
需要注意的是,在某些情况下,漏桶算法不能够有效地使用网络资源,因为漏桶的漏出速率是固定的,所以即使网络中没有发生拥塞,漏桶算法也不能使某一个单独的数据流达到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。而令牌桶算法则能够满足这些具有突发特性的流量。通常,漏桶算法与令牌桶算法结合起来为网络流量提供更高效的控制。
springboot 令牌桶算法实现
项目架构:多模块springboot项目
代码:
描述:通过Goole的Guava包中RateLimiter令牌桶算法,实现对部分接口的灵活限流
RateLimiter常用方法:
修饰符和类型 | 方法 | 描述 |
---|---|---|
double | **acquire() ** | 从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求 |
double | acquire(int permits) | 从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求 |
static RateLimiter | **create(double permitsPerSecond) ** | 根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询) |
static RateLimiter | create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) | 根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和) |
double | **getRate() ** | 返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数 |
void | **setRate(double permitsPerSecond) ** | 更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。 |
String | toString() | 返回对象的字符表现形式 |
boolean | **tryAcquire() ** | 从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话 |
boolean | tryAcquire(int permits) | 从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话 |
boolean | **tryAcquire(int permits, long timeout, TimeUnit unit) ** | 从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待) |
boolean | **tryAcquire(long timeout, TimeUnit unit) ** | 从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) |
具体实现:
-
导
Guava
包//父模块pom <google.guava.version>28.2-jre</google.guava.version> <!--guava RateLimiter限流--> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${google.guava.version}</version> </dependency> //子模块 <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency>
-
自定义限流注解
自定义注解,方便接口的使用以及AOP切面获取默认值
import java.lang.annotation.*; /** * 限流注解 * @author Administrator */ @Inherited @Documented @Target(ElementType.METHOD)//作用方法上 @Retention(RetentionPolicy.RUNTIME) public @interface RateLimitNote { // 默认每秒放入桶中的token double limitNum() default 20; //策略名 要求唯一不重复 String name() default ""; }
-
AOP注解切点
import com.google.common.util.concurrent.RateLimiter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.Signature; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.concurrent.ConcurrentHashMap; @Aspect @Component @Order(3)//多个aop执行顺序 也可以实现Ordered接口 返回顺序值 public class RateLimitAspect { private static Logger logger = LogManager.getLogger(RateLimitAspect.class); //日志对象 private ConcurrentHashMap<String, RateLimiter> RATE_LIMITER = new ConcurrentHashMap<>(); private RateLimiter rateLimiter; @Pointcut("@annotation(com.zy.website.utils.annotation.RateLimitNote)")//注解切点 public void serviceLimit() { } @Around("serviceLimit()") public Object around(ProceedingJoinPoint point) throws Throwable { Object obj = null; //获取拦截的方法名 Signature sig = point.getSignature(); //获取拦截的方法名 MethodSignature msig = (MethodSignature) sig; //返回被织入增加处理目标对象 Object target = point.getTarget(); //为了获取注解信息 Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes()); //获取注解信息 RateLimitNote annotation = currentMethod.getAnnotation(RateLimitNote.class); double limitNum = annotation.limitNum(); //获取注解每秒加入桶中的token String functionName = msig.getName(); // 注解所在方法名区分不同的限流策略 if (RATE_LIMITER.containsKey(functionName)) { rateLimiter = RATE_LIMITER.get(functionName); } else { RATE_LIMITER.put(functionName, RateLimiter.create(limitNum)); rateLimiter = RATE_LIMITER.get(functionName); } if (rateLimiter.tryAcquire()) { return point.proceed(); } else { logger.warn("请求繁忙 【限流接口:{}】", functionName); ApiReturn apiReturn = new ApiReturn();//自定义返回类 本测试使用统一的返回对象 apiReturn.setApiReturnCode(ApiReturnCode.PLEASE_AGAIN_LATER); return apiReturn; } } }
-
测试Controller层
分别对不同接口开启关闭限流,QPS不同,策略不同,不同接口可以执行同一个策略,压测
/** * 令牌限流测试 RateLimiterController * * @author * @since 2022-07-18 */ @RequestMapping("/ratelimiter") @RestController public class RateLimiterController { private static Logger logger = LogManager.getLogger(RateLimiterController.class); /** * 开启限流【10QPS,策略一】 * * @return com.zy.website.facade.ApiReturn * @author */ @GetMapping("/open") @RateLimitNote(limitNum = 10, name = "openRateLimiter1") public ApiReturn openRateLimiter1() { ApiReturn apiReturn = new ApiReturn(); logger.info("【openRateLimiter1 限流执行了....编写业务....】"); apiReturn.setApiReturnCode(ApiReturnCode.SUCCESSFUL); return apiReturn; } /** * 开启限流【10QPS,策略二】 * * @return com.zy.website.facade.ApiReturn * @author */ @GetMapping("/open2") @RateLimitNote(limitNum = 5, name = "openRateLimiter2") public ApiReturn openRateLimiter2() { ApiReturn apiReturn = new ApiReturn(); logger.info("【openRateLimiter2 限流执行了....编写业务....】"); apiReturn.setApiReturnCode(ApiReturnCode.SUCCESSFUL); return apiReturn; } /** * 未开启限流 * * @return com.zy.website.facade.ApiReturn * @author */ @GetMapping("/close") public ApiReturn closeRateLimiter() { ApiReturn apiReturn = new ApiReturn(); logger.info("【closeRateLimiter 接口未开启限流....编写业务....】"); apiReturn.setApiReturnCode(ApiReturnCode.SUCCESSFUL); return apiReturn; } }
-
压测结果
压测策略一:【QPS 50 】
实际场景
接口被第三方系统大数据量,多渠道高并发调用,为不影响改系统下的其它业务进行限流措施,通过控制线程池创建线程数和RateLimiter令牌数来平衡QPS
标签:令牌,RateLimiter,并发,算法,限流,漏桶,ApiReturn 来源: https://www.cnblogs.com/MrYuChen-Blog/p/16490937.html