Hand-writing Spring Cloud | Load balancing (loadbalanced)
Author: Internet
loadbalanced
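Load balancing
This article imitates the ideas behind Spring Cloud Ribbon and is meant as a learning reference.
Client-side load balancer
What is client-side load balancing? It means that both the load-balancing algorithm and the call itself are carried out on the client:
- The client first fetches from the registry the list of servers for the service it wants to call
- It uses a load-balancing algorithm (round-robin, random, weighted, and so on) to pick one of those servers
- It sends the request to that server
- It outputs the response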
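The four steps above can be sketched in plain Java. This is only a minimal illustration and not how Ribbon is implemented: the in-memory `registry` map stands in for the real registry, and the class name `ClientSideLoadBalancerSketch` and its methods are hypothetical names introduced for this example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical illustration of client-side load balancing; not Ribbon code.
class ClientSideLoadBalancerSketch {

    // Stand-in for the registry: serviceName -> server addresses (assumed data).
    private final Map<String, List<String>> registry = new HashMap<>();
    private final Random random = new Random();

    public void register(String serviceName, String... addresses) {
        registry.put(serviceName, Arrays.asList(addresses));
    }

    // Step 1: fetch the server list of the service from the registry.
    public List<String> serversOf(String serviceName) {
        return registry.getOrDefault(serviceName, Collections.<String>emptyList());
    }

    // Step 2: pick one server, here with the random algorithm.
    public String choose(String serviceName) {
        List<String> servers = serversOf(serviceName);
        if (servers.isEmpty()) {
            throw new IllegalStateException("No server available for " + serviceName);
        }
        return servers.get(random.nextInt(servers.size()));
    }

    // Steps 3 and 4 would send the request to this URL and return the response.
    public String buildRequestUrl(String serviceName, String pathAndQuery) {
        return choose(serviceName) + pathAndQuery;
    }
}
```

Because the choice happens inside the client process, no separate proxy sits between the caller and the servers; the cost is that every client has to keep its own copy of the server list up to date.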
Code implementation
- pom dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.3.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-zookeeper-all</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.12</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
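- ZooKeeper version: 3.4.14
- This article implements only the client code (the server just needs to expose a say() endpoint)
- The server can be the simplest possible controller that registers the service with ZooKeeper
- My local server also includes a circuit-breaker implementation; to keep the code from getting complicated, only the following fragment is shown for reference
...
@GetMapping("/say")
public String say(@RequestParam("message") String message) throws InterruptedException {
    // Simulate a random processing time of 0-199 ms
    int value = random.nextInt(200);
    System.out.println("say() cost " + value + "ms");
    Thread.sleep(value);
    System.out.println("Received message - say: " + message);
    return "Hello," + message;
}
...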
1. Basic version
1.1 Getting the server list
Whether the registry is Eureka or ZooKeeper, it stores entries of the form serviceName -> {server 1 address, server 2 address, ...}.
Spring Cloud provides DiscoveryClient, a unified interface for service discovery clients:
package org.springframework.cloud.client.discovery;
import java.util.List;
import org.springframework.cloud.client.ServiceInstance;
public interface DiscoveryClient {
    String description();
    List<ServiceInstance> getInstances(String serviceId);
    List<String> getServices();
}
Use dependency injection to obtain a DiscoveryClient instance.
Calling discoveryClient.getInstances(serviceName) then returns the server list.
Part of the implementation:
@Autowired
private DiscoveryClient discoveryClient;
...
// Get the list of machines for the current application
List<ServiceInstance> serviceInstances = discoveryClient.getInstances(currentServiceName);
...
The server list is refreshed every 10 seconds by a task scheduled with @Scheduled.
targetUrlsCache stores every serviceName in the registry together with its server list.
private volatile Map<String, Set<String>> targetUrlsCache = new HashMap<>();

@Scheduled(fixedRate = 10 * 1000) // Refresh the cache every 10 seconds
public void updateTargetUrlsCache() {
    Map<String, Set<String>> newTargetUrlsCache = new HashMap<>();
    discoveryClient.getServices().forEach(serviceName -> {
        // Get the list of machines for this service
        List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceName);
        Set<String> newTargetUrls = serviceInstances.stream().map(
                s -> s.isSecure() ?
                        "https://" + s.getHost() + ":" + s.getPort()
                        : "http://" + s.getHost() + ":" + s.getPort()
        ).collect(Collectors.toSet());
        newTargetUrlsCache.put(serviceName, newTargetUrls);
    });
    // Swap in the fully built map with a single volatile write
    this.targetUrlsCache = newTargetUrlsCache;
}
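The refresh method builds a complete new map and only then assigns it to the volatile field, so request threads never read a half-filled cache. A minimal plain-Java sketch of that pattern follows. The class `TargetUrlsCacheSketch` and its methods are hypothetical and leave out Spring and DiscoveryClient entirely; the caller is assumed to pass in the URLs it has already collected.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the "build a new map, then swap the reference" pattern.
class TargetUrlsCacheSketch {

    // Readers see either the old snapshot or the new one, never a partially filled map.
    private volatile Map<String, Set<String>> targetUrlsCache = Collections.emptyMap();

    // Same URL rule as the article's code: https for secure instances, http otherwise.
    public static String toBaseUrl(boolean secure, String host, int port) {
        return (secure ? "https://" : "http://") + host + ":" + port;
    }

    // latest: serviceName -> base URLs, for example as collected from a discovery client.
    public void refresh(Map<String, Set<String>> latest) {
        Map<String, Set<String>> snapshot = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : latest.entrySet()) {
            Set<String> copy = new LinkedHashSet<String>(e.getValue());
            snapshot.put(e.getKey(), Collections.unmodifiableSet(copy));
        }
        // A single volatile write publishes the fully built snapshot.
        this.targetUrlsCache = Collections.unmodifiableMap(snapshot);
    }

    // Returns an empty set instead of null for unknown services.
    public Set<String> urlsOf(String serviceName) {
        Set<String> urls = targetUrlsCache.get(serviceName);
        return urls == null ? Collections.<String>emptySet() : urls;
    }
}
```

Returning an empty set for an unknown service lets callers check isEmpty() instead of guarding against null before picking a server.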
1.2 Load-balancing algorithm and RestTemplate call
This uses the simplest algorithm, random selection, and then calls the remote service with Spring's RestTemplate.
@GetMapping("/invoke/say")
public Object invokeSay(@RequestParam("message") String message) {
    // Server list; the complete class in 1.3 reads it from targetUrlsCache.get(serviceName) instead
    List<String> targetUrls = new ArrayList<>(this.targetUrls);
    // Pick one server at random
    int size = targetUrls.size();
    int index = new Random().nextInt(size);
    String targetUrl = targetUrls.get(index);
    // Send the request to the server with RestTemplate
    return restTemplate.getForObject(targetUrl + "/say?message=" + message, String.class);
}
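Random selection is only one of the algorithms mentioned earlier. As a point of comparison, round-robin selection could look roughly like the sketch below. It is not part of the original code; the class name `RoundRobinChooser` is hypothetical, and it also rejects an empty server list, which `Random.nextInt(0)` would otherwise answer with an IllegalArgumentException.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser; an alternative to the random pick above.
class RoundRobinChooser {

    // Shared counter so that concurrent requests still rotate through the servers.
    private final AtomicInteger counter = new AtomicInteger();

    public String choose(List<String> targetUrls) {
        if (targetUrls == null || targetUrls.isEmpty()) {
            throw new IllegalStateException("No server available");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), targetUrls.size());
        return targetUrls.get(index);
    }
}
```

Note that the article's cache stores each server list as a Set, so the order of the list built from it is not guaranteed to stay the same across refreshes; round-robin spreads requests most evenly when the list order is stable.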
1.3 Complete client controller class
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import java.util.*;
import java.util.stream.Collectors;
@RestController
public class ClientController {

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private DiscoveryClient discoveryClient;

    private volatile Map<String, Set<String>> targetUrlsCache = new HashMap<>();

    @Scheduled(fixedRate = 10 * 1000) // Refresh the cache every 10 seconds
    public void updateTargetUrlsCache() {
        Map<String, Set<String>> newTargetUrlsCache = new HashMap<>();
        discoveryClient.getServices().forEach(serviceName -> {
            // Get the list of machines for this service
            List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceName);
            Set<String> newTargetUrls = serviceInstances.stream().map(
                    s -> s.isSecure() ?
                            "https://" + s.getHost() + ":" + s.getPort()
                            : "http://" + s.getHost() + ":" + s.getPort()
            ).collect(Collectors.toSet());
            newTargetUrlsCache.put(serviceName, newTargetUrls);
        });
        this.targetUrlsCache = newTargetUrlsCache;
    }

    @GetMapping("/invoke/{serviceName}/say")
    public Object invokeSay(@RequestParam("message") String message, @PathVariable("serviceName") String serviceName) {
        // Server list (empty if the service is unknown or the cache has not been filled yet)
        List<String> targetUrls = new ArrayList<>(this.targetUrlsCache.getOrDefault(serviceName, Collections.emptySet()));
        if (targetUrls.isEmpty()) {
            throw new IllegalStateException("No server available for " + serviceName);
        }
        // Pick one server at random
        int size = targetUrls.size();
        int index = new Random().nextInt(size);
        String targetUrl = targetUrls.get(index);
        // Send the request to the server with RestTemplate
        return restTemplate.getForObject(targetUrl + "/say?message=" + message, String.class);
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
1.4 Launcher class
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableDiscoveryClient
@EnableScheduling
public class SpringCloudClientApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(SpringCloudClientApplication.class)
                .web(WebApplicationType.SERVLET)
                .run(args);
    }
}
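1.5 Testing
- Start ZooKeeper
- Start two servers, on ports 8083 and 8084
- Start the client
Enter this address in the browser:
http://127.0.0.1:8082/invoke/spring-cloud-server-application/say?message=xwf
Refresh the request a few times.
Server 8083 console output
Server 8084 console output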
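2. Advanced version
What to improve
- Writing the random algorithm in every service endpoint is clearly unreasonable
- Refreshing the registry's server list should not live in the controller either
How to improve
- Intercept RestTemplate calls with an interceptor
- Put the load-balancing logic in the interceptor
- Model the implementation on Spring Cloud's @LoadBalanced annotation
2.1 Adding an interceptor to RestTemplate
Custom annotation @CustomedLoadBalance
- Use the @Qualifier meta-annotation so that RestTemplate beans can be injected by category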
import org.springframework.beans.factory.annotation.Qualifier;
import java.lang.annotation.*;
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface CustomedLoadBalance {
}
Adding the interceptor
- Use @Autowired + @CustomedLoadBalance to obtain every RestTemplate object annotated with @CustomedLoadBalance
- Loop over those RestTemplate objects and add a ClientHttpRequestInterceptor to each
@Bean
public ClientHttpRequestInterceptor interceptor() {
    return new LoadBalancedRequestInterceptor();
}

@Bean
@Autowired
public Object customizer(@CustomedLoadBalance Collection<RestTemplate> restTemplates, ClientHttpRequestInterceptor interceptor) {
    restTemplates.forEach(restTemplate -> {
        // Add the interceptor (this replaces any interceptors already set on the RestTemplate)
        restTemplate.setInterceptors(Arrays.asList(interceptor));
    });
    return new Object();
}
2.2 Implementing the interceptor
The remote call here does not go through RestTemplate; it uses Java's native URL class instead.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRequest;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.converter.ByteArrayHttpMessageConverter;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.util.*;
import java.util.stream.Collectors;
public class LoadBalancedRequestInterceptor implements ClientHttpRequestInterceptor {

    @Autowired
    private DiscoveryClient discoveryClient;

    private volatile Map<String, Set<String>> targetUrlsCache = new HashMap<>();

    @Scheduled(fixedRate = 10 * 1000) // Refresh the cache every 10 seconds
    public void updateTargetUrlsCache() {
        Map<String, Set<String>> newTargetUrlsCache = new HashMap<>();
        discoveryClient.getServices().forEach(serviceName -> {
            // Get the list of machines for this service
            List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceName);
            Set<String> newTargetUrls = serviceInstances.stream().map(
                    s -> s.isSecure() ?
                            "https://" + s.getHost() + ":" + s.getPort()
                            : "http://" + s.getHost() + ":" + s.getPort()
            ).collect(Collectors.toSet());
            newTargetUrlsCache.put(serviceName, newTargetUrls);
        });
        this.targetUrlsCache = newTargetUrlsCache;
    }

    @Override
    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        // URI: /${app-name}/${uri}
        URI requestUri = httpRequest.getURI();
        String path = requestUri.getPath();
        // "/app/say".split("/", 3) -> ["", "app", "say"]; the limit keeps nested paths such as "a/b" intact.
        // (The original substring(1, 1) check always produced "" and never matched, so it is dropped.)
        String[] parts = path.split("/", 3);
        String appName = parts[1];
        String uri = parts[2];
        // Server list (empty if the service is unknown or the cache has not been filled yet)
        List<String> targetUrls = new ArrayList<>(this.targetUrlsCache.getOrDefault(appName, Collections.emptySet()));
        if (targetUrls.isEmpty()) {
            throw new IOException("No server available for " + appName);
        }
        // Pick one server at random
        int index = new Random().nextInt(targetUrls.size());
        String targetUrl = targetUrls.get(index);
        // Final server address; the raw query keeps its encoding, and a missing query is not appended as "null"
        String query = requestUri.getRawQuery();
        String actualUrl = targetUrl + "/" + uri + (query == null ? "" : "?" + query);
        // Converting with the default Jackson converter throws an error (this list is not used further below)
        List<HttpMessageConverter<?>> messageConverters = Arrays.asList(
                new ByteArrayHttpMessageConverter(),
                new StringHttpMessageConverter());
        URL url = new URL(actualUrl);
        URLConnection urlConnection = url.openConnection();
        InputStream body = urlConnection.getInputStream();
        HttpHeaders headers = new HttpHeaders();
        // SimpleClientHttpResponse is a custom ClientHttpResponse implementation that this article does not list
        return new SimpleClientHttpResponse(headers, body);
    }
}
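The path handling inside intercept() can be checked in isolation. The sketch below is a hypothetical helper (the class `RequestUriRewriter` and its methods are not part of the original code) that uses only java.net.URI. It assumes the request has the form /{app-name}/{uri}?query, keeps nested paths intact, and leaves out the query string when there is none.

```java
import java.net.URI;

// Hypothetical helper: maps "/{app-name}/{uri}?query" onto a chosen server address.
class RequestUriRewriter {

    // Splits the raw path into ["", app-name, rest]; the limit of 3 keeps "a/b" in one piece.
    private static String[] split(URI requestUri) {
        String[] parts = requestUri.getRawPath().split("/", 3);
        if (parts.length < 3 || parts[1].isEmpty()) {
            throw new IllegalArgumentException("Expected /{app-name}/{uri}: " + requestUri);
        }
        return parts;
    }

    public static String appName(URI requestUri) {
        return split(requestUri)[1];
    }

    public static String actualUrl(String targetUrl, URI requestUri) {
        String rest = split(requestUri)[2];
        // getRawQuery() preserves percent-encoding and is null when there is no query.
        String query = requestUri.getRawQuery();
        return targetUrl + "/" + rest + (query == null ? "" : "?" + query);
    }
}
```

For example, with the test URL from section 1.5 and the server on port 8083, actualUrl yields http://127.0.0.1:8083/say?message=xwf.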
3. Final version: full code
3.1 Client controller class
@LoadBalanced is Spring Cloud's original implementation; the lbInvokeSay endpoint is there to test the original functionality.
import com.huajie.spring.cloud.client.annotation.CustomedLoadBalance;
import com.huajie.spring.cloud.client.loadbalance.LoadBalancedRequestInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import java.util.*;
import java.util.stream.Collectors;
@RestController
public class ClientController {

    @Autowired
    @CustomedLoadBalance
    private RestTemplate restTemplate;

    @Autowired
    @LoadBalanced
    private RestTemplate lbRestTemplate;

    @GetMapping("/invoke/{serviceName}/say")
    public Object invokeSay(@RequestParam("message") String message, @PathVariable("serviceName") String serviceName) {
        // The custom interceptor resolves /{serviceName}/... to a real server and sends the request
        return restTemplate.getForObject("/" + serviceName + "/say?message=" + message, String.class);
    }

    @GetMapping("/loadbalance/invoke/{serviceName}/say")
    public Object lbInvokeSay(@RequestParam("message") String message, @PathVariable("serviceName") String serviceName) {
        return lbRestTemplate.getForObject("http://" + serviceName + "/say?message=" + message, String.class);
    }

    @Bean
    public ClientHttpRequestInterceptor interceptor() {
        return new LoadBalancedRequestInterceptor();
    }

    @Bean
    @LoadBalanced
    public RestTemplate loadBalanceRestTemplate() {
        return new RestTemplate();
    }

    @Bean
    @CustomedLoadBalance
    public RestTemplate customLoadBalanceRestTemplate() {
        return new RestTemplate();
    }

    @Bean
    @Autowired
    public Object customizer(@CustomedLoadBalance Collection<RestTemplate> restTemplates, ClientHttpRequestInterceptor interceptor) {
        restTemplates.forEach(restTemplate -> {
            // Add the interceptor
            restTemplate.setInterceptors(Arrays.asList(interceptor));
        });
        return new Object();
    }
}
3.2 Interceptor implementation
Same as the code in 2.2 Implementing the interceptor.
3.3 Launcher class
Same as the code in 1.4 Launcher class.
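3.4 Debugging
The test procedure is the same as in 1.5 Testing.
Debugging the LoadBalancedRequestInterceptor#intercept() method proceeds as follows:
- Fetching the service list and assembling the url
- Remote access and response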
References
Tencent Classroom, Gupao Academy, Xiao Ma Ge
Source: https://blog.csdn.net/xiewenfeng520/article/details/101062993