zabbix 线路质量监控自定义python ping 模块(socket+deque版),批量监控线路质量并自定义阈值联动mtr保存线路故障日志
作者:互联网
前段时间使用Mysql实现了这个功能,缺点是占用太多系统资源,且脚本繁重
后续使用socket+deque实现低能耗与轻量,也可用通过开放互联网API来做分布式监控
#!/usr/bin/env python3 #-*-coding:utf-8-*- from collections import deque import itertools,time import queue,json import argparse,sys,re,os,subprocess import time,socket import threading from functools import reduce ipqli=[] #避免系统权限问题没有使用ping3 class Ping: def __init__(self,ip,flag,inver=1,count=20): ip = tuple(ip) self.sip,self.tip=ip self.inver=inver self.count=count self.flag=flag restime_name = 'restime_deque'+''.join(ip).replace('.','') pkloss_name = 'pkloss_deque'+''.join(ip).replace('.','') locals()[restime_name] = deque(maxlen=60) locals()[pkloss_name] = deque(maxlen=60) self.restime_deque = locals()[restime_name] self.pkloss_deque = locals()[pkloss_name] self.ret_restime_deque = globals()[restime_name] self.ret_pkloss_deque = globals()[pkloss_name] self.compile= r'(?<=time=)\d+\.?\d+(?= ms)' def fastping(self): cmd = 'ping -i %s -c 1 -W 1 -I %s %s'%(self.inver,self.sip,self.tip) ret = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].decode('utf8') try: value=re.findall(self.compile, ret,re.S)[0] self.restime_deque.append(value) self.pkloss_deque.append(0) except: self.pkloss_deque.append(1) self.restime_deque.append(0) def ping(self): index = 0 res_count=0 while index<self.count: start_time = time.time() cmd = 'ping -i %s -c 1 -W 1 -I %s %s'%(self.inver,self.sip,self.tip) ret = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].decode('utf8') try: value=re.findall(self.compile, ret,re.S)[0] self.restime_deque.append(value) self.pkloss_deque.append(0) res_count+=1 except: self.pkloss_deque.append(1) self.restime_deque.append(0) index+=1 if not self.flag == len(ipqli): break usetime = time.time()-start_time sleep_time = self.inver - usetime if usetime<self.inver else self.inver time.sleep(sleep_time) return index,res_count def ping_value(self): start_time = time.time() count = self.count rescount = self.count if len(self.ret_restime_deque)<2 or len(self.ret_pkloss_deque)<2: fastli=[] for x in range(self.count): t = threading.Thread(target=self.fastping) t.start() fastli.append(t) for th in fastli: th.join() else: count,rescount = self.ping() rescount=count if rescount==0 else rescount use_time = round(time.time()-start_time,4) li = [self.restime_deque.pop() for x in range(count)] pkli = [self.pkloss_deque.pop() for x in range(count)] restime = reduce(lambda x ,y :round(float(x)+float(y),2), li)/rescount pkloss= reduce(lambda x ,y :int(x)+int(y), pkli)/count*100 return (restime,pkloss,use_time) #server端代码 class Server(): def __init__(self,sock): global ipqli self.ipqli=ipqli self.thli=[] self.sock=sock self.basedir = os.path.dirname(os.path.realpath(sys.argv[0])) #server程序启动入口 @classmethod def start(cls): s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) address = ('127.0.0.1',6589) s.bind(address) s.listen(100) obj = cls(s) ping_server=threading.Thread(target=obj.server) ping_server.start() obj.thli.append(ping_server) create_t = threading.Thread(target=obj.create) create_t.start() obj.thli.append(create_t) for t in obj.thli: t.join() def server(self): while True: conn,addr = self.sock.accept() data=conn.recv(1024) data = data.decode('utf-8') data = json.loads(data) ip,item = data restime_ipq = 'restime_deque'+''.join(ip).replace('.','') pkloss_ipq = 'pkloss_deque'+''.join(ip).replace('.','') if ip not in self.ipqli: globals()[restime_ipq] = deque(maxlen=10) globals()[pkloss_ipq] = deque(maxlen=10) self.ipqli.append(ip) self.sendvalue(conn,ip,item) conn.close() def create(self): create_list =[] while True: if self.ipqli: leng = len(self.ipqli) for ip in self.ipqli: t=threading.Thread(target=self.makevalue,args=(ip,leng)) t.start() create_list.append(t) for t in create_list: t.join() def makevalue(self,ip,leng): restime_name = 'restime_deque'+''.join(ip).replace('.','') pkloss_name = 'pkloss_deque'+''.join(ip).replace('.','') restime_ipq = globals()[restime_name] pkloss_ipq = globals()[pkloss_name] obj = Ping(ip,leng) while leng==len(self.ipqli): time.sleep(0.1) restime,pkloss,use_time=obj.ping_value() restime_ipq.append((restime,use_time)) pkloss_ipq.append((pkloss,use_time)) def sendvalue(self,conn,ip,item): fromat_ip=''.join(ip).replace('.','') _,tip=ip restime_name = 'restime_deque'+fromat_ip pkloss_name = 'pkloss_deque'+fromat_ip restime_ipq = globals()[restime_name] pkloss_ipq = globals()[pkloss_name] mtr_dir = self.basedir+'/mtr_log/'+tip+'-'+time.strftime('%Y-%m-%d',time.localtime()) + '.log' mtr_cmd = self.basedir + '/mtr.sh'+' '+tip+' '+mtr_dir if item =='restime': while True: if len(restime_ipq)>1: ret,use_time = restime_ipq.pop() hisret,_=restime_ipq[-1] if ret - hisret >20: subprocess.Popen(mtr_cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) break elif item =='pkloss': while True: if len(pkloss_ipq) >1: ret,use_time = pkloss_ipq.pop() if 100> ret >20: subprocess.Popen(mtr_cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) break conn.sendall(str(ret).encode()) #IP格式检查模块 class Ipcheck(): def __init__(self,sip,tip,item): self.sip =sip self.tip=tip self.item=item def check(self): if self.item not in ['restime','pkloss']: return False elif not self.checkipformat(): return False else: return True def check_fun(self,ip): return int(ip)<256 def checkipformat(self): try: tiplist = self.tip.split('.') tipformat = re.findall(r'^\d+\.\d+\.\d+\.\d+$', self.tip) if self.sip: siplist = self.sip.split('.') sipformat = re.findall(r'^\d+\.\d+\.\d+\.\d+$', self.sip) else: siplist=[1,1,1,1] sipformat=True if not tipformat or not sipformat: raise check_ipli = tiplist+siplist return self.checkiplength(check_ipli) except: return False def checkiplength(self,check_ipli): if list(itertools.filterfalse(self.check_fun, check_ipli)): return False else: return True #server端后台启动函数 def run(): base_dir = os.path.dirname(os.path.realpath(sys.argv[0])) cmd = 'python3 %s/newpingd.py -S server'%base_dir subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) #客户端代码 def socket_client(ip,item): try: s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('127.0.0.1',6589)) data = [ip,item] data = json.dumps(data) s.sendall(data.encode()) ret = s.recv(1024) print(ret.decode()) except: print('server will start') sys.exit(0) if __name__ == '__main__': parser = argparse.ArgumentParser(description='icmp for monitor') parser.add_argument('-S',action = 'store',dest='server') parser.add_argument('-t',action = 'store',dest='tip') parser.add_argument('-s',action = 'store',dest='sip') parser.add_argument('-I',action='store',dest='item') args= parser.parse_args() server_status_cmd = "ps -ef | grep 'newpingd.py -S server' | grep -v grep | cut -c 9-16" server_status = subprocess.Popen(server_status_cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0] if not server_status: run() if args.server: Server.start() sys.exit(0) tip = args.tip sip = args.sip item = args.item ip=(sip,tip) check = Ipcheck(sip, tip, item) if not check.check(): print('ip or item error') print('-------example-----------') print('------- darkping -s 10.0.0.2 -t 10.0.0.1 -I restime -----------') sys.exit(0) socket_client(ip,item)
mtr.sh
#!/usr/bin/env bash IP=$1 dir=$2 mtr -r -n -c 30 -w -b $IP >> $2
效果
Mysql 版
https://www.cnblogs.com/darkchen/p/15524856.html
标签:deque,pkloss,name,自定义,ip,self,线路,监控,restime 来源: https://www.cnblogs.com/darkchen/p/15524856.html