
Hand-writing Spring Cloud | Load balancing (loadbalanced)

Author: Internet


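loadbalanced: load balancing

This article imitates the ideas behind Spring Cloud Ribbon and re-implements them by hand, as a learning reference.

Client-side load balancer

What is client-side load balancing? It means that both the load-balancing algorithm and the remote call run on the client:

  1. The client first fetches the list of servers it needs to reach from the registry.
  2. It applies a load-balancing algorithm (round robin, random, weighted, and so on) to pick one server.
  3. It sends the request to that server.
  4. It returns the response.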
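The four steps above can be sketched in plain Java without any Spring dependency. This is only an illustration under assumptions: the class name ClientSideFlowSketch, the service name demo-service, the two addresses, and the simulated call method are all made up here, and the registry is just an in-memory map.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical illustration of the client-side flow; not Ribbon's implementation. */
final class ClientSideFlowSketch {

    // Stand-in for the registry: serviceName -> server addresses (made-up data).
    static final Map<String, List<String>> REGISTRY = Collections.singletonMap(
            "demo-service", Arrays.asList("http://127.0.0.1:8083", "http://127.0.0.1:8084"));

    /** Step 1: fetch the server list for a service (empty if the service is unknown). */
    static List<String> lookup(String serviceName) {
        return REGISTRY.getOrDefault(serviceName, Collections.emptyList());
    }

    /** Step 2: pick one server, here with the random strategy. */
    static String choose(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalStateException("no server available");
        }
        return servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
    }

    /** Steps 3 and 4: a real client would send an HTTP request; this only simulates it. */
    static String call(String server, String message) {
        return "response from " + server + " for message " + message;
    }

    public static void main(String[] args) {
        List<String> servers = lookup("demo-service");
        String server = choose(servers);
        System.out.println(call(server, "xwf"));
    }
}
```

The rest of the article replaces the in-memory map with DiscoveryClient and the simulated call with RestTemplate.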

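Implementation

Maven dependencies. The Spring Cloud starters below carry no version, so the project presumably imports the spring-cloud-dependencies BOM in its dependencyManagement section, which is not shown here.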

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
 </parent>

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-zookeeper-all</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

1. Basic version

1.1 Getting the server list

Whether the registry is Eureka or ZooKeeper, it stores its entries in the form serviceName -> {server 1 address, server 2 address, ...}.

[Figure: registry data in ZooKeeper]

Spring Cloud provides DiscoveryClient, a common interface that every service discovery client implements:

package org.springframework.cloud.client.discovery;

import java.util.List;
import org.springframework.cloud.client.ServiceInstance;

public interface DiscoveryClient {
	String description();
	List<ServiceInstance> getInstances(String serviceId);
	List<String> getServices();
}

Obtain the DiscoveryClient instance through dependency injection.

Calling discoveryClient.getInstances(serviceName) then returns the server list of that service.

Part of the implementation:

@Autowired
private DiscoveryClient discoveryClient;
...
//Get the instance list of the current application
List<ServiceInstance> serviceInstances = discoveryClient.getInstances(currentServiceName);
...

A @Scheduled task refreshes the server list every 10 seconds.

targetUrlsCache holds every serviceName known to the registry together with its server list.

    private volatile Map<String, Set<String>> targetUrlsCache = new HashMap<>();

    @Scheduled(fixedRate = 10 * 1000) //refresh the cache every 10 seconds
    public void updateTargetUrlsCache() {
        Map<String, Set<String>> newTargetUrlsCache = new HashMap<>();
        discoveryClient.getServices().forEach(serviceName -> {
            //Get the instance list of this service
            List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceName);
            Set<String> newTargetUrls = serviceInstances.stream().map(
                    s -> s.isSecure() ?
                            "https://" + s.getHost() + ":" + s.getPort()
                            : "http://" + s.getHost() + ":" + s.getPort()
            ).collect(Collectors.toSet());
            newTargetUrlsCache.put(serviceName, newTargetUrls);
        });
        this.targetUrlsCache = newTargetUrlsCache;
    }
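
The refresh above builds a complete new map and only then assigns it to the volatile field, so a reader sees either the old snapshot or the new one, never a half-filled map. A minimal sketch of that pattern without Spring follows; the class name SnapshotCacheSketch and the Supplier standing in for the registry are assumptions made for this illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical sketch of the "build a new map, then swap the reference" refresh. */
final class SnapshotCacheSketch {

    // Readers always see one complete snapshot because the reference is swapped in one write.
    private volatile Map<String, Set<String>> cache = Collections.emptyMap();

    // Stand-in for the registry lookup (an assumption; the article uses DiscoveryClient).
    private final Supplier<Map<String, Set<String>>> registry;

    SnapshotCacheSketch(Supplier<Map<String, Set<String>>> registry) {
        this.registry = registry;
    }

    /** Builds a fresh snapshot off to the side and publishes it with a single volatile write. */
    void refresh() {
        Map<String, Set<String>> fresh = new HashMap<>();
        registry.get().forEach((name, urls) ->
                fresh.put(name, Collections.unmodifiableSet(new HashSet<>(urls))));
        this.cache = Collections.unmodifiableMap(fresh);
    }

    /** Returns the cached URLs, or an empty set when the service is unknown. */
    Set<String> urlsOf(String serviceName) {
        return cache.getOrDefault(serviceName, Collections.emptySet());
    }
}
```

Outside Spring, refresh() could be driven by ScheduledExecutorService.scheduleAtFixedRate with a 10-second period, which plays the role of @Scheduled(fixedRate = 10 * 1000).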

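1.2 Load-balancing algorithm and the RestTemplate call

Use the simplest algorithm, a random pick, and then call the remote service with Spring's RestTemplate.

    @GetMapping("/invoke/{serviceName}/say")
    public Object invokeSay(@RequestParam("message") String message, @PathVariable("serviceName") String serviceName) {
        //Server list of the requested service, read from the cache built in 1.1
        List<String> targetUrls = new ArrayList<>(this.targetUrlsCache.get(serviceName));
        //Pick one server at random
        int size = targetUrls.size();
        int index = new Random().nextInt(size);
        String targetUrl = targetUrls.get(index);
        //Send the request to that server with RestTemplate
        return restTemplate.getForObject(targetUrl + "/say?message=" + message, String.class);
    }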
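
The article only implements the random pick. As a rough sketch of the other two strategies mentioned earlier, round robin and weighted selection could look like the following. The class ChooserSketch, its method names, and the weights are assumptions for illustration and do not come from Ribbon.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical alternatives to the random pick; all names are made up for this sketch. */
final class ChooserSketch {

    private final AtomicInteger position = new AtomicInteger();

    /** Round robin: returns servers in list order and wraps around at the end. */
    String roundRobin(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalStateException("no server available");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(position.getAndIncrement(), servers.size());
        return servers.get(index);
    }

    /**
     * Weighted pick: roll must lie in [0, sum of weights); each server owns a
     * slice of that range as wide as its weight.
     */
    static String weighted(List<String> servers, int[] weights, int roll) {
        int upper = 0;
        for (int i = 0; i < servers.size(); i++) {
            upper += weights[i];
            if (roll < upper) {
                return servers.get(i);
            }
        }
        throw new IllegalArgumentException("roll is outside the total weight");
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("http://127.0.0.1:8083", "http://127.0.0.1:8084");
        ChooserSketch chooser = new ChooserSketch();
        for (int i = 0; i < 3; i++) {
            System.out.println(chooser.roundRobin(servers));
        }
        // With weights 3 and 1, rolls 0 to 2 map to the first server and roll 3 to the second.
        System.out.println(weighted(servers, new int[] {3, 1}, 3));
    }
}
```

In real use the roll would come from a random number generator bounded by the total weight, so a server with weight 3 receives about three times the traffic of a server with weight 1.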

1.3 The complete client controller class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import java.util.*;
import java.util.stream.Collectors;

@RestController
public class ClientController{

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private DiscoveryClient discoveryClient;

    private volatile Map<String, Set<String>> targetUrlsCache = new HashMap<>();

    @Scheduled(fixedRate = 10 * 1000) //refresh the cache every 10 seconds
    public void updateTargetUrlsCache() {
        Map<String, Set<String>> newTargetUrlsCache = new HashMap<>();
        discoveryClient.getServices().forEach(serviceName -> {
            //Get the instance list of this service
            List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceName);
            Set<String> newTargetUrls = serviceInstances.stream().map(
                    s -> s.isSecure() ?
                            "https://" + s.getHost() + ":" + s.getPort()
                            : "http://" + s.getHost() + ":" + s.getPort()
            ).collect(Collectors.toSet());
            newTargetUrlsCache.put(serviceName, newTargetUrls);
        });
        this.targetUrlsCache = newTargetUrlsCache;
    }

    @GetMapping("/invoke/{serviceName}/say")
    public Object invokeSay(@RequestParam("message") String message, @PathVariable("serviceName") String serviceName) {
        //Server list of the requested service
        List<String> targetUrls = new ArrayList<>(this.targetUrlsCache.get(serviceName));
        //Pick one server at random
        int size = targetUrls.size();
        int index = new Random().nextInt(size);
        String targetUrl = targetUrls.get(index);
        //Send the request to that server with RestTemplate
        return restTemplate.getForObject(targetUrl + "/say?message=" + message, String.class);
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

}

1.4 Startup class

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableDiscoveryClient
@EnableScheduling
public class SpringCloudClientApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SpringCloudClientApplication.class)
                .web(WebApplicationType.SERVLET)
                .run(args);
    }

}

1.5 Testing

  1. Start ZooKeeper.

  2. Start two server instances, on ports 8083 and 8084.

  3. Start the client.

Enter this address in the browser:

http://127.0.0.1:8082/invoke/spring-cloud-server-application/say?message=xwf

Refresh the request several times.

[Figure: console of server 8083]

[Figure: console of server 8084]

2. Advanced version

Points to change

How to change

2.1 Adding an interceptor to RestTemplate

Custom annotation @CustomedLoadBalance

import org.springframework.beans.factory.annotation.Qualifier;

import java.lang.annotation.*;

@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface CustomedLoadBalance {
}

Add the interceptor

    @Bean
    public ClientHttpRequestInterceptor interceptor() {
        return new LoadBalancedRequestInterceptor();
    }

    @Bean
    @Autowired
    public Object customizer(@CustomedLoadBalance Collection<RestTemplate> restTemplates, ClientHttpRequestInterceptor interceptor) {
        restTemplates.forEach(restTemplate -> {
                    //Add the interceptor (setInterceptors replaces any interceptors already set)
                    restTemplate.setInterceptors(Arrays.asList(interceptor));
                }
        );
        return new Object();
    }

2.2 Implementing the interceptor

The remote call here is made with the JDK's own java.net.URL rather than with RestTemplate.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRequest;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.converter.ByteArrayHttpMessageConverter;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.client.RestTemplate;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.util.*;
import java.util.stream.Collectors;

public class LoadBalancedRequestInterceptor implements ClientHttpRequestInterceptor {

    @Autowired
    private DiscoveryClient discoveryClient;

    private volatile Map<String, Set<String>> targetUrlsCache = new HashMap<>();

    @Scheduled(fixedRate = 10 * 1000) //refresh the cache every 10 seconds
    public void updateTargetUrlsCache() {
        Map<String, Set<String>> newTargetUrlsCache = new HashMap<>();
        discoveryClient.getServices().forEach(serviceName -> {
            //Get the instance list of this service
            List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceName);
            Set<String> newTargetUrls = serviceInstances.stream().map(
                    s -> s.isSecure() ?
                            "https://" + s.getHost() + ":" + s.getPort()
                            : "http://" + s.getHost() + ":" + s.getPort()
            ).collect(Collectors.toSet());
            newTargetUrlsCache.put(serviceName, newTargetUrls);
        });
        this.targetUrlsCache = newTargetUrlsCache;
    }

    @Override
    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        //URI: /${app-name}/${uri}
        URI requestUri = httpRequest.getURI();
        String path = requestUri.getPath();
        //The path starts with "/", so parts[0] is empty, parts[1] is the app name and parts[2] is the uri
        String[] parts = path.split("/");
        String appName = parts[1];
        String uri = parts[2];
        //Server list of the requested service
        List<String> targetUrls = new ArrayList<>(this.targetUrlsCache.get(appName));
        //Pick one server at random
        int size = targetUrls.size();
        int index = new Random().nextInt(size);
        String targetUrl = targetUrls.get(index);
        //Final server address; skip the "?" when the request has no query string
        String query = requestUri.getQuery();
        String actualUrl = targetUrl + "/" + uri + (query == null ? "" : "?" + query);
        //Converting with the default Jackson converter raises an error
        List<HttpMessageConverter<?>> messageConverters = Arrays.asList(
                new ByteArrayHttpMessageConverter(),
                new StringHttpMessageConverter());
        URL url = new URL(actualUrl);
        URLConnection urlConnection = url.openConnection();
        InputStream body = urlConnection.getInputStream();
        HttpHeaders headers = new HttpHeaders();
        //SimpleClientHttpResponse is a custom ClientHttpResponse implementation that this article does not show
        return new SimpleClientHttpResponse(headers,body);
    }
}
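
The interceptor depends on splitting a path of the form /${app-name}/${uri} and rebuilding the real address. The sketch below shows one way to do that with java.net.URI alone, keeping multi-segment paths and leaving out the "?" when there is no query string. The class TargetUrlSketch and its method names are hypothetical and are not part of the article's code.

```java
import java.net.URI;

/** Hypothetical helper showing one way to split "/{app-name}/{uri}" and rebuild the target. */
final class TargetUrlSketch {

    /** Returns the first path segment, for example "demo-service" for "/demo-service/say". */
    static String appName(URI requestUri) {
        String path = stripLeadingSlash(requestUri.getPath());
        int slash = path.indexOf('/');
        return slash < 0 ? path : path.substring(0, slash);
    }

    /** Joins the chosen server with the rest of the path and the query string, if any. */
    static String actualUrl(String targetUrl, URI requestUri) {
        String path = stripLeadingSlash(requestUri.getPath());
        int slash = path.indexOf('/');
        // Everything after the app name, including its leading "/".
        String rest = slash < 0 ? "" : path.substring(slash);
        String query = requestUri.getRawQuery();
        return targetUrl + rest + (query == null ? "" : "?" + query);
    }

    private static String stripLeadingSlash(String path) {
        return path.startsWith("/") ? path.substring(1) : path;
    }

    public static void main(String[] args) {
        URI uri = URI.create("/demo-service/say/hello?message=xwf");
        System.out.println(appName(uri));
        System.out.println(actualUrl("http://127.0.0.1:8083", uri));
    }
}
```

Unlike the parts[2] lookup in the interceptor, this version keeps every path segment after the app name, which matters once the target endpoint is nested deeper than one level.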

3. Final version: all the code

3.1 Client controller class

import com.huajie.spring.cloud.client.annotation.CustomedLoadBalance;
import com.huajie.spring.cloud.client.loadbalance.LoadBalancedRequestInterceptor;
import com.huajie.spring.cloud.client.service.rest.clients.SayingRestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import java.util.*;
import java.util.stream.Collectors;

@RestController
public class ClientController {

    @Autowired
    @CustomedLoadBalance
    private RestTemplate restTemplate;

    @Autowired
    @LoadBalanced
    private RestTemplate lbRestTemplate;

    @GetMapping("/invoke/{serviceName}/say")
    public Object invokeSay(@RequestParam("message") String message, @PathVariable("serviceName") String serviceName) {
        //Send the request with RestTemplate; the interceptor resolves the service name to a real server
        return restTemplate.getForObject("/" + serviceName + "/say?message=" + message, String.class);
    }

    @GetMapping("/loadbalance/invoke/{serviceName}/say")
    public Object lbInvokeSay(@RequestParam("message") String message, @PathVariable("serviceName") String serviceName) {
        return lbRestTemplate.getForObject("http://" + serviceName + "/say?message=" + message, String.class);
    }

    @Bean
    public ClientHttpRequestInterceptor interceptor() {
        return new LoadBalancedRequestInterceptor();
    }

    @Bean
    @LoadBalanced
    public RestTemplate loadBalanceRestTemplate() {
        return new RestTemplate();
    }

    @Bean
    @CustomedLoadBalance
    public RestTemplate customLoadBalanceRestTemplate() {
        return new RestTemplate();
    }

    @Bean
    @Autowired
    public Object customizer(@CustomedLoadBalance Collection<RestTemplate> restTemplates, ClientHttpRequestInterceptor interceptor) {
        restTemplates.forEach(restTemplate -> {
                    //Add the interceptor (setInterceptors replaces any interceptors already set)
                    restTemplate.setInterceptors(Arrays.asList(interceptor));
                }
        );
        return new Object();
    }

}

3.2 Interceptor implementation

Same as the code in 2.2 Implementing the interceptor.

3.3 Startup class

Same as the code in 1.4 Startup class.

3.4 Debugging

The test procedure is the same as in 1.5 Testing.

Stepping through LoadBalancedRequestInterceptor#intercept() in the debugger shows two stages:

  1. Fetch the server list and assemble the URL.
  2. Make the remote call and return the response.

References

腾讯课堂 (Tencent Classroom), 咕泡学院 (Gupao Academy), 小马哥

Source: https://blog.csdn.net/xiewenfeng520/article/details/101062993