流量采集任务分配算法
作者:互联网
任务分配算法的核心是轮询;实现上利用 Map 这一数据结构:以 KEY(例如国家代码)区分对象,VALUE 是一个 List,再向该 List 中追加元素,例如:
Map<String, List<String>> old_map = new HashMap();
old_map.get(same.getIsoCode()).add(nodeSame.getCountryIp());
这种"Map 的 value 为 List"的写法对任务分配非常有帮助。
/*
* Zenlayer.com Inc.
* Copyright (c) 2014-2019 All Rights Reserved.
*/
package com.zenlayer.ad.nodetool;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.record.City;
import com.maxmind.geoip2.record.Continent;
import com.maxmind.geoip2.record.Country;
import com.maxmind.geoip2.record.Location;
import com.maxmind.geoip2.record.Postal;
import com.maxmind.geoip2.record.Subdivision;
import com.zenlayer.ad.jedis.RedisClient;
import com.zenlayer.ad.mapper.SnmpMapper;
import com.zenlayer.util.DataTransformation;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
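/**
 * Assigns switch IPs (SNMP collection tasks) to the collector nodes.
 *
 * <p>Strategy 1: region-based assignment. The region of every IP is looked up
 * automatically from the IP itself (the scheme used for BMC switches).
 *
 * <p>Strategy 2: distribute the IPs evenly across all live nodes.
 *
 * @author jack.li
 * @date 2019-10-30 18:07:30
 * @version $ Id: NodeToolService.java, v 0.1 admin Exp $
 */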
@Component("nodeToolService")
public class NodeToolService {
// Redis client: read for the registered collector nodes ("collet:node") and
// written with the per-node task lists.
@Resource
RedisClient redisclient;
// Mapper that supplies the switch rows (each row carries an "ip" entry).
@Autowired
private SnmpMapper snmpMapper;
// Produces the IDC part of the assignment (see getIdcNodeService usage in getSnmpAll).
@Autowired
NodeIdcService nodeIdcService;
// Default Los Angeles node: kept as a backup collector for switches that failed
// elsewhere. It is removed from the regular node list and receives the
// private-network IP list instead.
public static String laxIp = "68.8.8.194";
/**
 * Runs the region-based scheduling pass and publishes the result to Redis.
 *
 * <p>Steps: classify the switch IPs and discover the collector nodes
 * ({@link #AllIplistTask}), compute the BMC assignment ({@link #nodeIPtask}) and the
 * IDC assignment ({@code nodeIdcService}), merge both per node, and store one JSON
 * array per node IP. The private-network IP list, when non-empty, is stored under
 * {@link #laxIp}.
 *
 * @return {@code true} when the pass completed, {@code false} when an
 *         {@link IOException} interrupted it
 * @throws IOException      declared for callers; I/O failures raised inside the pass are
 *                          caught and reported as {@code false}
 * @throws GeoIp2Exception  declared for callers; kept for interface compatibility
 */
public boolean getSnmpAll() throws IOException, GeoIp2Exception {
    // Private-network switch IPs: never geo-resolved, handed to the backup node.
    List<Map<String, Object>> innerAllIpList = new ArrayList<>();
    // Public switch IPs: geo-resolved and distributed across the collector nodes.
    List<Map<String, Object>> publicIp = new ArrayList<>();
    // Registered collector nodes that receive switch IPs.
    List<CountryModel> nodelist = new ArrayList<>();
    // Collector nodes located in China (a list, because there may be several).
    List<String> cn_ip_list = new ArrayList<>();
    try {
        AllIplistTask(publicIp, innerAllIpList, nodelist, cn_ip_list);
        if (!nodelist.isEmpty()) {
            // The BMC assignment only exists when there is at least one public IP;
            // it stays null otherwise and must not be dereferenced.
            Map<String, List<Map<String, Object>>> bmclist = null;
            if (!publicIp.isEmpty()) {
                bmclist = nodeIPtask(publicIp, innerAllIpList, nodelist, cn_ip_list);
            }
            Map<String, List<Map<String, Object>>> idclist =
                    nodeIdcService.getIdcNodeService(nodelist, innerAllIpList);

            Map<String, List<Map<String, Object>>> snmpAll = new HashMap<>();
            for (CountryModel node : nodelist) {
                snmpAll.put(node.getCountryIp(), new ArrayList<>());
            }
            mergeAssignments(snmpAll, bmclist);
            mergeAssignments(snmpAll, idclist);

            for (Map.Entry<String, List<Map<String, Object>>> entry : snmpAll.entrySet()) {
                redisclient.set(entry.getKey(),
                        DataTransformation.parseListForMapsToJsonArrayStr(entry.getValue()).toJSONString(), 0);
            }
        }
        // Private-network addresses all go to the Los Angeles backup collector.
        if (!innerAllIpList.isEmpty()) {
            redisclient.set(laxIp, DataTransformation.parseListForMapsToJsonArrayStr(innerAllIpList).toJSONString(), 0);
        }
        System.out.println("====================================:SNMP任务调度结束:====================================:");
    } catch (IOException e) {
        e.printStackTrace();
        return false;
    }
    return true;
}

/**
 * Appends every task list of {@code source} to the list stored under the same key in
 * {@code target}. A {@code null} source or {@code null} task list is ignored; a key that
 * {@code target} does not know yet gets a fresh list instead of causing an NPE.
 */
private static void mergeAssignments(Map<String, List<Map<String, Object>>> target,
                                     Map<String, List<Map<String, Object>>> source) {
    if (source == null) {
        return;
    }
    for (Map.Entry<String, List<Map<String, Object>>> entry : source.entrySet()) {
        if (entry.getValue() != null) {
            target.computeIfAbsent(entry.getKey(), key -> new ArrayList<>()).addAll(entry.getValue());
        }
    }
}
/**
 * Classifies the switch IPs and discovers the collector nodes, filling the lists
 * supplied by the caller.
 *
 * <ul>
 *   <li>Rows from {@code snmpMapper.getSnmpList()} with a null/empty {@code "ip"} are
 *       dropped; private-network IPs are currently ignored (not added to
 *       {@code innerAllIpList}); every other row is appended to {@code publicIps}.</li>
 *   <li>Each entry of {@code publicIps} is enriched in place with {@code countryName},
 *       {@code isoCode}, {@code countryIp} and {@code code} (continent code). Entries whose
 *       lookup fails stay in the list without those keys.</li>
 *   <li>Collector IPs are read from the Redis set {@code "collet:node"}; {@link #laxIp} is
 *       excluded because it only serves as a backup node. Each resolvable node is appended
 *       to {@code nodelists}; nodes whose country is {@code CN} are also appended to
 *       {@code cn_ip_lists}.</li>
 * </ul>
 *
 * @param publicIps      out: {@code List<Map<String, Object>>} of public switch rows
 * @param innerAllIpList out: private-network rows (currently left untouched)
 * @param nodelists      out: {@code List<CountryModel>} of collector nodes
 * @param cn_ip_lists    out: {@code List<String>} of collector IPs located in China
 * @throws IOException if the GeoIP database cannot be opened or closed
 */
@SuppressWarnings({"unchecked", "rawtypes"}) // raw parameter types are part of the existing signature
public void AllIplistTask(List publicIps, List innerAllIpList, List nodelists, List cn_ip_lists) throws IOException {
    List<Map<String, Object>> publicIp = publicIps;
    List<CountryModel> nodelist = nodelists;
    List<String> cn_ip_list = cn_ip_lists;

    File database = getFileGeo();
    // DatabaseReader is Closeable; try-with-resources releases the database file on every exit path.
    try (DatabaseReader reader = new DatabaseReader.Builder(database).build()) {
        List<Map<String, Object>> rows = new ArrayList(snmpMapper.getSnmpList());
        for (Map<String, Object> row : rows) {
            Object ip = row.get("ip");
            // Skip rows without an IP.
            if (ip == null || "".equals(ip)) {
                continue;
            }
            // Private-network addresses are deliberately left out for now.
            if (!internalIp(ip.toString())) {
                publicIp.add(row);
            }
        }

        // Resolve the location of every public IP; a failed lookup must not stop the pass.
        for (Map<String, Object> row : publicIp) {
            try {
                InetAddress ipAddress = InetAddress.getByName(row.get("ip").toString());
                CityResponse response = reader.city(ipAddress);
                Continent continent = response.getContinent();
                Country country = response.getCountry();
                row.put("countryName", country.getNames().get("zh-CN"));
                row.put("isoCode", country.getIsoCode());
                row.put("countryIp", row.get("ip"));
                row.put("code", continent.getCode());
            } catch (IOException | GeoIp2Exception e) {
                e.printStackTrace();
            }
        }

        System.out.println("innerAllIpList: " + innerAllIpList.size());
        System.out.println("publicIp: " + publicIp.size());
        System.out.println("===================打印需要分配给采集服务器IP地理位置=====================");

        List<String> registered = redisclient.smembers("collet:node", 0);
        // Check for null BEFORE touching the list (the original removed laxIp first).
        if (registered == null) {
            return;
        }
        // Work on a copy so the list returned by the Redis client is never mutated.
        List<String> list_ip = new ArrayList<>(registered);
        // The Los Angeles node gets no regular tasks; it is only a backup node.
        list_ip.remove(laxIp);

        for (String nodeip : list_ip) {
            if (StringUtils.isBlank(nodeip)) {
                continue;
            }
            try {
                InetAddress ipAddress = InetAddress.getByName(nodeip);
                CityResponse response = reader.city(ipAddress);
                if (response == null) {
                    System.out.println("************************* response is null: " + response + " IP: " + nodeip);
                    continue;
                }
                Continent continent = response.getContinent();
                Country country = response.getCountry();
                CountryModel model = new CountryModel();
                model.setName(country.getNames().get("zh-CN"));
                model.setIsoCode(country.getIsoCode());
                model.setCountryIp(nodeip);
                model.setCode(continent.getCode());
                nodelist.add(model);
                // Constant-first comparison: the ISO code may be null for unknown countries.
                if ("CN".equals(country.getIsoCode())) {
                    cn_ip_list.add(nodeip);
                }
                System.out.println(model.getName() + ", EN: " + model.getIsoCode() + " IP: " + model.getCountryIp());
            } catch (IOException | GeoIp2Exception e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Distributes the public switch IPs across the collector nodes by country.
 *
 * <ol>
 *   <li>Every node first receives all IPs whose {@code "isoCode"} equals the node's ISO code.</li>
 *   <li>When several nodes share an ISO code, that country's IPs are re-dealt round-robin
 *       across those nodes so each IP ends up on exactly one of them.</li>
 *   <li>IPs whose country has no node (or whose {@code "isoCode"} is missing because the
 *       lookup failed) are dealt round-robin across all nodes.</li>
 * </ol>
 *
 * @param publicIp   public switch rows; {@code "isoCode"} is read from each row
 * @param innerIp    private-network rows; currently unused (their distribution is disabled)
 * @param nodelist   collector nodes
 * @param cn_ip_list collector IPs located in China; currently unused
 * @return map from collector IP to the rows assigned to it; empty when there are no nodes
 */
public Map<String, List<Map<String, Object>>> nodeIPtask(List<Map<String, Object>> publicIp, List<Map<String, Object>> innerIp,
        List<CountryModel> nodelist, List<String> cn_ip_list) {
    Map<String, List<Map<String, Object>>> listAllMap = new HashMap<>();
    // Without nodes there is nothing to assign to (the original failed with an index error here).
    if (nodelist == null || nodelist.isEmpty()) {
        return listAllMap;
    }

    // Step 1: each node starts with every IP located in its own country.
    for (CountryModel node : nodelist) {
        List<Map<String, Object>> sameCountry = new ArrayList<>();
        String nodeIso = node.getIsoCode();
        if (nodeIso != null) {
            for (Map<String, Object> row : publicIp) {
                if (nodeIso.equals(row.get("isoCode"))) {
                    sameCountry.add(row);
                }
            }
        }
        listAllMap.put(node.getCountryIp(), sameCountry);
    }

    // Step 2: per ISO code, collect the IPs of nodes that share the country with another node.
    // A country with a single node keeps an empty list. With 3+ nodes the list contains
    // repeats, which still yields an even round-robin.
    List<String> old_ip = new ArrayList<>();
    Map<String, List<String>> old_map = new HashMap<>();
    for (CountryModel same : nodelist) {
        String iso = same.getIsoCode();
        if (iso == null) {
            continue;
        }
        List<String> peers = old_map.computeIfAbsent(iso, key -> new ArrayList<>());
        for (CountryModel nodeSame : nodelist) {
            if (iso.equals(nodeSame.getIsoCode()) && !same.getCountryIp().equals(nodeSame.getCountryIp())) {
                old_ip.add(nodeSame.getCountryIp());
                peers.add(nodeSame.getCountryIp());
            }
        }
    }

    // Step 3: set aside the (identical) country-wide lists of those nodes and reset them,
    // so the IPs can be dealt out again without duplication.
    Map<String, List<Map<String, Object>>> sameListIsocode = new HashMap<>();
    for (Map.Entry<String, List<Map<String, Object>>> entry : listAllMap.entrySet()) {
        if (old_ip.contains(entry.getKey())) {
            sameListIsocode.put(entry.getKey(), entry.getValue());
            entry.setValue(new ArrayList<>());
        }
    }

    // Step 4: deal each shared country's IPs round-robin over the nodes of that country.
    for (List<String> peers : old_map.values()) {
        if (peers.isEmpty()) {
            continue;
        }
        List<Map<String, Object>> shared = sameListIsocode.get(peers.get(0));
        if (shared == null) {
            continue;
        }
        for (int j = 0; j < shared.size(); j++) {
            listAllMap.get(peers.get(j % peers.size())).add(shared.get(j));
        }
    }

    // Step 5: IPs from countries without a node (e.g. Japan, Taiwan), plus rows that never
    // got an isoCode, are dealt round-robin over all nodes.
    Set<String> nodeIsoCodes = old_map.keySet();
    List<Map<String, Object>> publicTaskList = new ArrayList<>();
    for (Map<String, Object> row : publicIp) {
        Object iso = row.get("isoCode");
        if (iso == null || !nodeIsoCodes.contains(iso)) {
            publicTaskList.add(row);
        }
    }
    for (int y = 0; y < publicTaskList.size(); y++) {
        CountryModel target = nodelist.get(y % nodelist.size());
        listAllMap.get(target.getCountryIp()).add(publicTaskList.get(y));
    }

    // Note: distributing the private-network IPs (innerIp) to the non-CN nodes is
    // intentionally disabled; those IPs are handled by the backup node instead.
    return listAllMap;
}
/**
 * Manual smoke test for the GeoIP2 City database: looks up one fixed IP and prints the
 * continent, country, subdivision, city, postal code and coordinates to stdout.
 *
 * @throws IOException     if the database cannot be opened or read
 * @throws GeoIp2Exception if the lookup fails (e.g. the address is not in the database)
 */
public static void getLiteIPinit() throws IOException, GeoIp2Exception {
    File database = getFileGeo();
    // The reader holds the database file open, so close it once the lookup is done.
    try (DatabaseReader reader = new DatabaseReader.Builder(database).build()) {
        InetAddress ipAddress = InetAddress.getByName("18.8.6.22");
        CityResponse response = reader.city(ipAddress);

        Continent continent = response.getContinent();
        System.out.println(continent.getCode());                 // continent code
        System.out.println(continent.getName());                 // continent name (English)
        System.out.println(continent.getNames().get("zh-CN"));   // continent name (Simplified Chinese)

        Country country = response.getCountry();
        System.out.println(country.getIsoCode());                // ISO country code, e.g. 'US'
        System.out.println(country.getName());                   // country name, e.g. 'United States'
        System.out.println(country.getNames().get("zh-CN"));     // country name in Chinese, e.g. '美国'

        Subdivision subdivision = response.getMostSpecificSubdivision();
        System.out.println(subdivision.getName());               // subdivision name, e.g. 'Minnesota'
        System.out.println(subdivision.getIsoCode());            // subdivision code, e.g. 'MN'

        City city = response.getCity();
        System.out.println(city.getName());                      // city name, e.g. 'Minneapolis'

        Postal postal = response.getPostal();
        System.out.println(postal.getCode());                    // postal code, e.g. '55455'

        Location location = response.getLocation();
        System.out.println(location.getLatitude());              // latitude, e.g. 44.9733
        System.out.println(location.getLongitude());             // longitude, e.g. -93.2323
    }
}
/**
 * Tells whether {@code ip} is a private-network (site-local) address.
 *
 * <p>For IPv4 this is exactly the RFC 1918 ranges: {@code 10.0.0.0/8},
 * {@code 172.16.0.0/12} and {@code 192.168.0.0/16}. The previous hand-written switch fell
 * through from the 172 case into the 192 case and therefore also reported
 * {@code 172.168.x.x} as private; delegating to the JDK removes that bug. For IPv6 the
 * JDK's site-local definition applies.
 *
 * @param ip an IP literal; a host name would trigger a DNS lookup, so pass literals only
 * @return {@code true} for a private-network address; {@code false} otherwise, including
 *         when {@code ip} cannot be resolved (previously this case crashed on an empty
 *         byte array)
 */
public static boolean internalIp(String ip) {
    try {
        return InetAddress.getByName(ip).isSiteLocalAddress();
    } catch (UnknownHostException e) {
        e.printStackTrace();
        return false;
    }
}
/**
 * Second strategy: spreads all switch IPs (public first, then private-network) evenly,
 * round-robin, over every live collector node.
 *
 * <p>Publishing the result to Redis is currently disabled; only the size of each node's
 * task list is printed.
 *
 * @throws IOException if the GeoIP database cannot be opened
 */
public void avgIptask() throws IOException {
    // Private-network IPs: not geo-resolved.
    List<Map<String, Object>> innerAllIpList = new ArrayList<>();
    // Public IPs: geo-resolved by AllIplistTask.
    List<Map<String, Object>> publicIp = new ArrayList<>();
    // Registered collector nodes that receive switch IPs.
    List<CountryModel> nodelist = new ArrayList<>();
    // Collector nodes located in China.
    List<String> cn_ip_list = new ArrayList<>();
    AllIplistTask(publicIp, innerAllIpList, nodelist, cn_ip_list);

    Map<String, List<Map<String, Object>>> listAllMap = new HashMap<>();
    for (CountryModel node : nodelist) {
        listAllMap.put(node.getCountryIp(), new ArrayList<>());
    }

    // Merge both IP lists into one.
    List<Map<String, Object>> allIplist = new ArrayList<>(publicIp);
    allIplist.addAll(innerAllIpList);

    int nodeSize = nodelist.size();
    if (nodeSize == 0) {
        // No live node: nothing can be assigned (the original failed with an index error here).
        if (!allIplist.isEmpty()) {
            System.err.println("avgIptask: no collector node available for " + allIplist.size() + " IPs");
        }
        return;
    }
    for (int t = 0; t < allIplist.size(); t++) {
        String nodeIp = nodelist.get(t % nodeSize).getCountryIp();
        listAllMap.get(nodeIp).add(allIplist.get(t));
    }

    // Storing each list under its collector IP in Redis is switched off for now;
    // print the per-node task count instead.
    for (List<Map<String, Object>> maps : listAllMap.values()) {
        System.out.println(DataTransformation.parseListForMapsToJsonArrayStr(maps).size());
    }
}
/**
 * Locates the {@code GeoLite2-City.mmdb} database file.
 *
 * <p>The classpath copy is preferred (local runs). When the file is not on the classpath,
 * which is the case the original comment describes for packaged/container deployments,
 * the copy in the working directory ({@code user.dir}) is used instead of failing with a
 * {@link NullPointerException}.
 *
 * @return the database file; its existence is not checked here
 */
public static File getFileGeo() {
    java.net.URL resource = Thread.currentThread().getContextClassLoader().getResource("GeoLite2-City.mmdb");
    if (resource != null) {
        return new File(resource.getPath());
    }
    return new File(System.getProperty("user.dir") + "/GeoLite2-City.mmdb");
}
/**
 * Manual entry point: runs the GeoIP smoke test {@link #getLiteIPinit()}.
 *
 * <p>History: an earlier experiment resolved regions with the local QQWry ("pure IP")
 * database instead of GeoIP2; that is why an {@link IOException} is declared. Sample
 * collector nodes used while testing the region lists:
 * <pre>
 *   China,        CN, IP: 13
 *   South Africa, ZA, IP: 126.60
 *   USA,          US, IP: 107.60
 *   Germany,      DE, IP: 107.40
 *   Singapore,    SG, IP: 12.60
 *   Russia,       RU, IP: 128.60
 *   USA,          US, IP: 107.49
 * </pre>
 *
 * @param args unused
 * @throws IOException if the GeoIP database cannot be read
 */
public static void main(String[] args) throws IOException {
    try {
        getLiteIPinit();
    } catch (GeoIp2Exception lookupFailure) {
        lookupFailure.printStackTrace();
    }
    // To try the private-network check by hand, call internalIp("127.0.0.1") here
    // and print the result.
}
}
大树168
发布了146 篇原创文章 · 获赞 28 · 访问量 16万+
私信
关注
标签:get,IP,任务分配,List,System,采集,算法,ip,new 来源: https://blog.csdn.net/limingcai168/article/details/104503391