其他分享
首页 > 其他分享> > 音频延时测算脚本

音频延时测算脚本

作者:互联网

目录

前言

	每次版本迭代需要测试音频的传输耗时,或者比较唇音不同步的时候也需要计算耗时。纯以人工计算显得很麻烦,因此写了该脚本根据日志信息计算传输耗时,并绘制散点图。
	主要思路就是通过读取终端配置信息,telnet连接上去,通过命令开启日志打印,对日志信息使用正则过滤,计算出传输的时间信息,计算差值。	

配置文件

[RemoteInfo]
host = 127.0.0.1
port = xxxx
username =  
password =   

音频延时测算

# encoding=utf-8
# 终端自动升级脚本
# --杨靖杰,2020/12/5
# 环境: python2

import re
import telnetlib
import time
import ConfigParser
import os
import matplotlib
import matplotlib.pyplot as plt
import numpy as np

file_path = 'file_created_by_running\\'
def path_check(file_path):
    if not os.path.exists(file_path):
        os.makedirs(file_path)

def do_telnet(config, commands, filename):
    '''Telnet远程登录,并根据预置命令抓取打印保存'''

    filename1 = file_path + filename
    ff = open(filename1, 'w')
    # 连接Telnet服务器
    tn = telnetlib.Telnet(config[0], int(config[1]), timeout=10)
    # tn.set_debuglevel(2)
    # 输入登录用户名
    tn.read_until('Username:')
    # Windows程序 使用'\n\r'代表换行
    tn.write(config[2] + '\n\r')
    # 输入登录密码
    tn.read_until('Password:')
    tn.write(config[3] + '\n\r')

    # 登录完毕后执行命令
    for command in commands:
        tn.write(command + '\n\r')
    # 执行完毕后,终止Telnet连接(或输入exit退出)
    # tn.close()  # tn.write('exit\n')
    count = 0
    # 不应该实时读取解析
    # 写入文件进行分析
    while True:
        time.sleep(1)
        if count == 10:
            break

        count = count + 1
        # cmd编码格式gbk
        raw_file = tn.read_very_eager().decode('gbk').strip()
        ff.write(raw_file.encode('gbk'))
    ff.close()
    tn.write('xxxx' + '\n\r')
    tn.close()

def file_anyls(filename):
    '''根据传入的日志文件,过滤出hook1和hook4的timestamp以及对应的时间'''

    ff = open(file_path + filename, 'r')
    filename1 = filename + '_' + 'hook1'
    ff1 = open(file_path + filename1, 'w')
    filename2 = filename + '_' + 'hook4'
    ff2 = open(file_path + filename2, 'w')
    tt = ff.readlines()
    pat1 = re.compile(', ts[=].*?[,]')
    pat2 = re.compile('layer_ts[=].*?[ ]')
    pat3 = re.compile('capture_ts[=].*?[,]')
    pat4 = re.compile('now[=].*?[ ]')
    for lines in tt:
        if 'hook[1][RTP]' in lines:
            line1 = lines
            str1 = re.findall(pat1, line1)
            str2 = re.findall(pat2, line1)
            str3 = str1[0].strip(', ts=').strip(',') + ' ' + str2[0].strip('layer_ts=').strip(',') + '\n'
            ff1.write(str3)
        if 'hook[4][Frame]' in lines:
            line4 = lines.strip()
            str4 = re.findall(pat3, line4)
            str5 = re.findall(pat4, line4)
            str6 = str4[0].strip('capture_ts=').strip(',') + ' ' + str5[0].strip('now=').strip(',') + '\n'
            ff2.write(str6)
    ff.close()
    ff1.close()
    ff2.close()
    filenamelist = [filename, filename1, filename2]
    return filenamelist

def compare_join(filenamelist):
    '''实现类似linux join功能'''
    
    ff1 = open(file_path + filenamelist[1], 'r')
    ff2 = open(file_path + filenamelist[2], 'r')
    filename3 = 'compare_join'
    ff3 = open(file_path + filename3, 'w')
    tt1 = ff1.readlines()
    tt2 = ff2.readlines()
    for line2 in tt2:
        k2 = line2.strip('\n\r').split()
        for line1 in tt1:
            k1 = line1.strip('\n\r').split()
            if len(k2) != 0 and len(k1) != 0 and k1[0] == k2[0]:
                line3 = line1.strip('\n\r') + k2[1] + '\n'
                ff3.write(line3)
                break
    ff1.close()
    ff2.close()
    ff3.close()
    return filename3

def createList(filename):
    '''根据传入的过滤好的文件,计算第二列和第三列的差值,并返回一个list'''

    numlist = []
    count = 0
    ff = open(file_path + filename, 'r')
    tt = ff.readlines()
    ff.close()
    ff1 = open(file_path + 'numlist', 'w')
    for line in tt:
        count = count + 1
        if count == 1:
            continue
        en = line.strip().split(' ')
        if len(en) == 3:
            dd = int(en[2], 10) - int(en[1], 10)
            numlist.append(dd)
            ff1.write(str(dd) + '\n')

    ff1.close()
    return numlist

def numAlys(numlist):
    '''计算传入list的max, min, avg的值'''

    print "max:", max(numlist), "min:", min(numlist), "avg:", (sum(numlist)/len(numlist))

def generate_statistical_graph(numlist):
    '''绘制传入list的散点图'''

    y = numlist
    x = np.arange(1, len(numlist) + 1)

    plt.figure()
    plt.scatter(x,y,label='mnet_delay_1_4')
    plt.legend()
    plt.savefig(file_path + 'scatter.png')
    plt.show()


if __name__ == '__main__':
    # 配置选项
    cf = ConfigParser.ConfigParser()
    cf.read("Config.ini")
    Host = cf.get('RemoteInfo', 'host')  # Telnet服务器IP
    Port = cf.get('RemoteInfo', 'port')
    Username = cf.get('RemoteInfo', 'username')  # 登录用户名
    Password = cf.get('RemoteInfo', 'password')  # 登录密码
    config = [Host, Port, Username, Password]
    commands = ['commands']
    filename = 'mylog'
    path_check(file_path)
    do_telnet(config, commands, filename)
    filenamelist = file_anyls(filename)
    filename = compare_join(filenamelist)
    list1 = createList(filename)
    numAlys(list1)
    generate_statistical_graph(list1)

标签:延时,音频,filename,tn,测算,file,strip,path,numlist
来源: https://blog.csdn.net/qq_37647354/article/details/117016802