SpringCloud gateway源码走读(顺带聊聊响应式)
作者:互联网
1.概述
最近公司海外要搭建一套网关系统,调研相关开源组件,最终选择了springcloud gateway(scg)的网关框架。为了更好的使用,便走读了其核心源代码。本文对其重点源码就行剖析。scg是基于spring webflux实现的。如果 响应式基础不错的话会更容易理解。
2.聊聊响应式(事件循环)
其实对响应式开发一直很感兴趣,如果整个链路都支持响应式api,那么对于cpu的吞吐量会有很大的提升。毕竟更少的线程能负载高的qps,是非常吸引人的事情。说个玩笑话,对于泛娱乐c端业务,完全可以采用此架构,只不过学习成本确实很高,这一点就可以把这个方案否决掉。但是对于网关来讲,webflux还是非常迷人的。至少对我公司业务,网关鉴权通过响应式的rpc call可以大幅提升吞吐量。
两个问题:为什么可以减少线程数?是否可以提高响应速度?
响应式确实不能降低一个请求的处理时间,比如一个请求查询数据库300ms,这个除非在db优化,不然没有人能降低这个时间。响应式也是一个道理。那么响应式有什么用,这也就来到第二个问题。
非响应式的场景
对于一个http请求,接受请求是tomcat线程(默认500个),在处理redis操作的时候,用的redis的连接池,但是redis要经过一次网络io,阻塞的过程中,这个tomcat线程什么都干不了。
响应式
netty的eventloop接收http请求,执行redis操作的时候通过redis响应式API,eventloop会立刻返回(将数据丢给redis客户端线程),redis底层基于epol进行非阻塞IO,redis客户端线程将数据丢给网络然后返回,这个过程都非阻塞的。等redis 处理完成之后,epoll的机制去通知redis线程执行回调时间,完成整个网络。整个执行过程中线程各司其职(完全cpu密集操作),eventloop只负责接收以及传递,redis负责丢网络以及监听返回。当然你可以理解将阻塞交给了操作系统。
3.源码解析
底层通过netty启动的http服务器。所有请求进来之后先由DispatcherHandler的handle方法处理。handle方法会调用所有HandlerMapping的getHandlerInternal方法,选择一个支持该请求的Handler执行后续逻辑。
RoutePredicateHandlerMapping
我们重点关心scg实现的HandlerMapping:
RoutePredicateHandlerMapping
@Bean
public RoutePredicateHandlerMapping routePredicateHandlerMapping(FilteringWebHandler webHandler,
RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) {
return new RoutePredicateHandlerMapping(webHandler, routeLocator, globalCorsProperties, environment);
}
RoutePredicateHandlerMapping的getHandlerInternal会根据配置的路由信息对请求进行匹配,如果有一个配置的路由和请求匹配成功(该请求为我们gateway路由的请求),则将该路由信息绑定到请求,最终返回FilteringWebHandler。执行后续逻辑。
RoutePredicateHandlerMapping#lookupRoute
通过该方法迭代所有路由进行判断。
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes()
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
.onErrorResume(e -> Mono.empty()))
.next()
// TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});
}
DispatcherHandler#invokeHandler
处理路由结束并返回FilteringWebHandler,DispatcherHandler会执行invokeHandler方法。
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {
return Mono.empty(); // CORS rejection
}
if (this.handlerAdapters != null) {
for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
if (handlerAdapter.supports(handler)) {
return handlerAdapter.handle(exchange, handler);
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
该方法遍历所有适配器,执行对应的handle方法。如果是gateway路由的请求,则会由SimpleHandlerAdapter处理。也就是调用SimpleHandlerAdapter的handle方法。该方法其实就是调用FilteringWebHandler的handle方法。
FilteringWebHandler#handle
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
// TODO: needed or cached?
AnnotationAwareOrderComparator.sort(combined);
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
这个方法主要是执行所有过滤器的逻辑。scg有两种过滤器,一个是GlobalFilter,一个是GatewayFilter。在用户上的区别:GatewayFilter是针对某个或者某组路由,而GlobalFilter是所有路由都会执行。
GlobalFilter在FilteringWebHandler初始化的时候调用loadFilters加载。核心其实就是将其包装成GatewayFilter存储到FilteringWebHandler的全局变量中。
private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
return filters.stream().map(filter -> {
GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
if (filter instanceof Ordered) {
int order = ((Ordered) filter).getOrder();
return new OrderedGatewayFilter(gatewayFilter, order);
}
return gatewayFilter;
}).collect(Collectors.toList());
}
在FilteringWebHandler的handle执行的时候,会从路由中取出对应路由配置的GatewayFilter。构造成DefaultGatewayFilterChain(filter链)执行。
后续其实就是执行过滤逻辑。过滤器可以自己实现,gateway也有默认的实现。比如转发代理服务的过滤器NettyRoutingFilter。以及负责负载均衡的过滤器
ReactiveLoadBalancerClientFilter。
ReactiveLoadBalancerClientFilter其实就是根据路由选择对应的负载均衡实现,返回一个可用的server。当然有意义的是LoadBalancerLifecycle。在完成后调用成功或者失败都会回调LoadBalancerLifecycle的对应方法。我们可以做一些监控等等。
因为是业务初期,对于熔断降级限流的事情还没有做。后续会详细介绍
4.总结
scg的代码并不多,只不过大部分是响应式代码,读起来比较吃力,但是如果熟悉之后还是比较简单的。
作者:大远哥
链接:https://juejin.cn/post/7044447160821088263
来源:稀土掘金
标签:return,exchange,走读,route,redis,源码,SpringCloud,handle,路由 来源: https://blog.csdn.net/weixin_60707895/article/details/122225092