其他分享
首页 > 其他分享> > 使用并发 ssh 连接来提升捞日志脚本执行效率

使用并发 ssh 连接来提升捞日志脚本执行效率

作者:互联网

问题背景

公司有个简单粗暴的日志服务,它部署在多台机器实例上,收集的日志记录在每台机器本地硬盘,写一个小时自动切换日志文件,硬盘空间写满了自动回卷,大约可以保存两三天的历史数据。为什么说它粗暴呢?原来它不提供任何查询日志的接口,想要获取日志唯一的办法就是直接查日志文件:

最后将这些文件拼接在一起作为最终结果。有个前辈写过一个脚本,不过比较简单,基本就是一个 while 循环里串行查询每台实例。获取一次日志需要将近 1 个小时,严重拖慢了开发人员的节奏。作为一个资深 coder,时间是最富贵的财富,婶可忍叔不可忍,于是决定对脚本作一番改造以提升查询效率。

ssh 远程脚本

在开始改造前,先看下原脚本的执行逻辑:

#!/bin/sh

date=$1
type=$2

# 1st, prepare the grep script
echo "#! /bin/sh" > bin/work_${type}.sh
echo "grep type=${type} /home/log/update.log.${date} >/tmp/work_${type}.log.${date}" >> bin/work_${type}.sh

# 2nd, send the script to each machine and run it
for i in $(get_instance_by_service xxxxx.xxx-xxxx-xxxxxxx-xxxxxx-xxxxxxx-xxxxxx-xxxxxxx.xxx.xxx | sort | uniq)
do
    echo $i
    scp -o "StrictHostKeyChecking no"  -o "PasswordAuthentication=no" -o "ConnectTimeout=3" bin/work_${type}.sh $i:/tmp/
    ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $i "nohup sh /tmp/work_${type}.sh >/dev/null 2>err.log &"
done

# wait 'grep' a while
sleep 30
rm bin/work_${type}.sh

# 3rd, get log from each machine
for i in $(get_instance_by_service xxxxx.xxx-xxxx-xxxxxxx-xxxxxx-xxxxxxx-xxxxxx-xxxxxxx.xxx.xxx | sort | uniq)
do
    ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $i "cat /tmp/work_${type}.log.${date}" >> data/work_${type}.log.${date}
    # delete to free spaces
    ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $i "rm /tmp/work_${type}.log.${date} &"
done

针对上面的脚本,做个简单说明:

另外说明一下 ssh 与 scp 的几个参数:

总结一下远程 ssh 执行脚本和 scp 的语法:

其中 scp 也可以将远程文件复制到本地 ,不过这里需要将数据追加到已有文件, 所以使用了 ssh+cat 的实现方式。其实 ssh 的那些选项都可以省略,因为机器之间已经预先建立好了证书信任关系,这里只是 in case。重点说明一下 ssh 执行位于远程机器的脚本时需要注意的点:

其中比较重要的是第 1 条和第 3 条,nohup 亲测可省略。关于这方面的验证,可以查看文末链接。大家记住这个结论即可,后面会用到。

性能瓶颈

为了验证脚本执行慢确实是由 ssh 串行引起的,这里做了一个实验,执行以下简单的脚本,并记录整体耗时:

$ time ssh xxxx-xxxxxxxx-xxx-xx-xxx.xxxx 'pwd'
/home/rd

real	0m2.145s
user	0m0.026s
sys	0m0.008s

ssh 的第一个参数是实例机器名,由 get_instance_by_service 返回的实例列表中随便选取一个;ssh 第二个参数是要远程执行的命令,为了测量 ssh 时间这里使用了 pwd 命令,它的耗时基本可以忽略。

time 输出证明一次 ssh 交互大概在 2 秒左右,参考上一节中的脚本,可以得出以下脚本耗时公式:

total=(ssh_time + scp_time) * instance_count + 30 + ssh_time * 2 * instance_count
     = ssh_time * 4 * instance_count + 30

这里出于计算方便考虑,忽略 scp 过程中文件传输的时间 (单个文件都比较小),将它的耗时约等于 ssh 耗时,经过推导得到了第二个等式。令 ssh_time = 2,instance_count = 500,那么执行一次脚本就需要 4030 ≈ 1.12 小时,看来光消耗在连接上的时间就超过 1 小时了,怪不得这么慢呢!

预读取实例列表

读取实例列表其实比较快,统计了一下也就在几十毫秒之间:

$ time get_instance_by_service xxxxx.xxx-xxxx-xxxxxxx-xxxxxx-xxxxxxx-xxxxxx-xxxxxxx.xxx.xxx
……
real	0m0.011s
user	0m0.003s
sys	0m0.006s

但当遇到连不通的实例时,一下就要耗掉 3 秒,统计了一下,500 多台实例中只有 300 多台可以连通 (amazing~),相当于每次有 200 * 4 * 3 = 2400 秒,大约 40 分钟的时间是浪费在无效的机器上了。当然可以将等待时间缩小到 1 秒,时间就可以降到 10 分钟。但我连一秒也不愿意浪费,何必傻等这 10 分钟呢?通过提前检查哪些机器是可以连通的,可以节约这 40 分钟,具体的做法就是运行 check_instance.sh 这个脚本:

#!/bin/sh

ret=0
start_time=$(date +%s)
if [ -f instance.txt ]; then 
    mv instance.txt instance_old.txt
fi

for host in `get_instance_by_service xxxxx.xxx-xxxx-xxxxxxx-xxxxxx-xxxxxxx-xxxxxx-xxxxxxx.xxx.xxx | sort | uniq`
do
    echo "check $host"
    ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $host "pwd"
    ret=$?
    if [ $ret -eq 0 ]; then 
        # only record success instance
        echo "$host" >> instance.txt
    else 
        echo "error $ret"
    fi
done

end_time=$(date +%s)
cost_time=$(($end_time-$start_time))
echo "check done, total time spend: $(($cost_time/60))m $(($cost_time%60))s "

简单说明一下:

注意,这个脚本仍然是串行的,下面是一次实际的运行耗时:

check done, total time spend: 60m 43s 

印证了之前的计算结果,光 ssh 连接就得耗费一个多小时。好在这个脚本很长时间才运行一次,耗时久还能接受。

预复制过滤脚本

第二块耗时就是上传 grep 过滤脚本到实例机器,平均消耗 ssh_time * instance_count = 600 秒 (有效实例按 300 计算)。照搬前面的思路,如果将这个脚本做成通用的并提前上传到各台实例,那么这一步的耗时也可以省下来了。来看下通用的过滤脚本 fetch_log.sh:

#!/bin/sh

if [ $# -ne 1 -a $# -ne 2 ]; then
    echo "Usage: fetch_log.sh keyword [date]"
    echo "       keyword: anything you think a keyword, support regular expression that grep accepts"
    echo "       date: 2021020308, if no date provide using log file currently writing"
    exit 1
fi

keyword=$1
# using keyword md5 as filename part to avoid file confliction
# and prevent keyword contain invalid character for path names...
keymd5=$(echo $keyword | md5sum | awk '{print $1}')

if [ $# -eq 1 ]; then 
    # if no date provide, using current log file 
    # one can only filter logs that not currently writing ago, 
    # now we can filter them by not providing date parameter.
    grep -a "${keyword}" /home/log/update.log >/tmp/work.${keymd5}.log
else 
    date=$2

    # using keyword md5 as filename part to avoid file confliction
    grep -a "${keyword}" /home/log/update.log.$date >/tmp/work.${date}.${keymd5}.log
fi

 简单说明一下:

干活的脚本有了,下面就来看一下负责上传脚本的 upload_fetch_log.sh:

#!/bin/sh

ret=0
start_time=$(date +%s)

if [ ! -f instance.txt ]; then 
    echo "generate reliable instance list first.."
    sh check_instance.sh
fi

# upload the fetch_log.sh to each machine
for host in `cat instance.txt`
do
    echo "uploading $host"
    scp -o "StrictHostKeyChecking no"  -o "PasswordAuthentication=no" -o "ConnectTimeout=3" fetch_log.sh $host:/tmp/
    ret=$?
    if [ $ret -ne 0 ]; then 
        echo "error $ret"
    fi
done

end_time=$(date +%s)
cost_time=$(($end_time-$start_time))
echo "upload done, total time spend: $(($cost_time/60))m $(($cost_time%60))s "

做个简单说明:

有两点需要注意

下面是一次实际的运行耗时:

upload done, total time spend: 10m 14s

看起来比 check_instance.sh 快多了,主要是只需要对可连通的机器执行 ssh 连接,速度会快很多。

这样一套组合拳下来,新的脚本不光节省了执行时间,还得到了更强大的过滤表达式、更及时的日志捞取,比之前好用不止一点点。

ssh 并发

上面做了足够多的铺垫,可以开始本文的重头戏了 —— ssh 连接的并发执行。其实聪明的读者已经看出来了,上面一顿忙活,也只是解决了 1/4 的耗时问题,还有三大耗时在这儿摆着:

而且这已经是在远程机异步执行了,如果同步执行那就更慢了。所以问题的关键不是 grep 慢,而是启动远程 grep 慢 (之前有过统计,大概是 2 秒左右)。如何节约这个时间呢?第一个想到的方案是并行执行 ssh,将启动 ssh 的过程也后台化 (&),这样 2 秒内就可以并行启动多个连接了,如果一次能将 300 台实例全部启动,时间就可以直接缩短到 2 秒,是不是很厉害!

当然了,考虑到并发连接上限、对日志服务的冲击等因素,最好不要一次启动那么多连接,如果一次能启动 10 个并发连接,那么 300 台实例需要 60 (300 / 10 * 2 ) 秒,也相当快了。不过各个批次之间,需要有一个等待操作,以保证开启下个批次前上个批次的脚本都执行完毕了,这就增大了复杂性。

不过对于第一步 (过滤) 而言,还没有回传文件的问题,相对来说简单一点,来看一下 exec_fetch_log.sh 脚本:

#!/bin/sh

if [ $# -ne 1 -a $# -ne 2 ]; then
    echo "Usage: exec_fetch_log.sh keyword [date]"
    echo "       keyword: anything you think a keyword,support regular expression that grep accepts"
    echo "       date: 2021020308, if no date provide using log file currently writing"
    exit 1
fi

if [ ! -f instance.txt ]; then 
    echo "generate reliable instance list first.."
    sh check_instance.sh
fi

n=0
batch_size=10
batch_num=0
batch_no=0
keyword=$1
# using keyword md5 as filename part to avoid file confliction
# and prevent keyword contain invalid character for path names...
keymd5=$(echo "$keyword" | md5sum | awk '{print $1}')
date=""
if [ $# -eq 1 ]; then 
    date=`date "+%Y%m%d%H"`
else
    date=$2
fi

echo -e "=======================================================================\nkeyword=$keyword; date=$date; keymd5=$keymd5" | tee -a error.log
start_time=$(date +%s)
echo "start grep from each machine"
for host in `cat instance.txt`
do
    batch_no=$(($n % $batch_size))
    batch_num=$(($n / $batch_size))
    if [ $batch_no -eq 0 -a $n -gt 0 ]; then 
        echo "wait batch $batch_num"
        wait
    fi

    echo "$(($n+1)): $host"
    if [ $# -eq 1 ]; then 
        ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $host "nohup sh /tmp/fetch_log.sh \"$keyword\" >/dev/null 2>err.log &" >> error.log 2>&1 &
    else 
        ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $host "nohup sh /tmp/fetch_log.sh \"$keyword\" \"$date\" >/dev/null 2>err.log &" >> error.log 2>&1  &
    fi
    n=$(($n+1))
done

echo "wait last batch $(($batch_num+1))"
wait

……

做个简单说明:

ssh 并发的关键是批次控制,每个异步执行的 ssh 都将成为一个单独的子进程,通过 wait 等待子进程就可以完成批次的等待,不过这有一个前题 —— 并发脚本没有其它独立运行的子进程,换句话说,就是不能同时有其它异步执行的任务。当然了,也可以通过在数组中记录子进程的 pid 并挨个 wait 它们来实现,不过那样开发成本就太高了,这里没有采取,感兴趣的可以看下 man wait:

       wait [n ...]
              Wait  for  each  specified  process  and return its termination status.  Each n may be a process ID or a job
              specification; if a job spec is given, all processes in that job’s pipeline are waited for.   If  n  is  not
              given, all currently active child processes are waited for, and the return status is zero.  If n specifies a
              non-existent process or job, the return status is 127.  Otherwise, the return status is the exit  status  of
              the last process or job waited for.

老脚本的 ssh 异步其实只进行到了第一步 —— 远程执行,而 ssh 连接本身还是同步的,新脚本最大的改进是连 ssh 本身也异步了,并提供了并发数量控制,可以实现更极致的并发能力。如果有些人不在乎并发量,可以直接一个循环异步启动所有 ssh 连接,那样代码更简单。

并发数量也会随机器数量增多而增长,不过这里没有将这个参数暴露在外面,主要是防止一些人为了快而不择手段,对现在运行的日志服务造成影响 (当然了,只能防一些小白误操作)。

文件合并

有了上面的基础,再处理剩下的两大耗时操作也就不难了,与执行过滤和删除结果不同,回传结果要求脚本执行完成后将数据保存在本地,之前顺序执行时一个追加操作就能搞定的事情现在变复杂了,批量并行后如何处理同时返回的多个文件块成为一个问题。首先不能再简单的追加了,因为多进程追加有可能导致数据混乱,保险的方式是每个子进程写一个临时文件,最后再将它们合并起来,继续看主脚本 exec_fetch_log:

n=0
batch_num=0
dir=""
olddir=""
if [ ! -d data ]; then 
    mkdir data
fi

echo "fetch result logs from each machine"
for host in `cat instance.txt`
do
    batch_no=$(($n % $batch_size))
    batch_num=$(($n / $batch_size))
    dir="data/$date.$batch_num"
    if [ ! -d "$dir" ]; then 
        mkdir "$dir"
        echo "create data dir: $dir"
    fi
    if [ $batch_no -eq 0 -a $n -gt 0 ]; then 
        # splice previous batch files
        if [ $batch_num -gt 1 ]; then 
            # has previous batch
            echo "batch end, try to splice previous batch files..."
            olddir="data/$date.$(($batch_num-2))"
            for file in $olddir/work.*.log
            do
                echo "handle $file: $(stat -c \"%s\" $file)"
                cat "$file" >> data/work.$date.log
            done 

            rm -rf "$olddir"
            echo "delete dir: $olddir"
        fi

        echo "wait batch $batch_num"
        wait
    fi

    echo "$(($n+1)): $host"
    if [ $# -eq 1 ]; then 
        scp -o "StrictHostKeyChecking no"  -o "PasswordAuthentication=no" -o "ConnectTimeout=3" "$host:/tmp/work.$keymd5.log" "$dir/work.$host.log" >> error.log 2>&1 &
    else
        scp -o "StrictHostKeyChecking no"  -o "PasswordAuthentication=no" -o "ConnectTimeout=3" "$host:/tmp/work.$date.$keymd5.log" "$dir/work.$host.log" >> error.log 2>&1 &
    fi
    n=$(($n+1))
done

# splice previous batch files
echo "batch end, try to splice last batch files..."
olddir="data/$date.$(($batch_num-1))"
for file in $olddir/work.*.log
do
    echo "handle $file: $(stat -c \"%s\" $file)"
    cat "$file" >> data/work.$date.log
done 

rm -rf "$olddir"
echo "delete dir: $olddir"
echo "wait last batch $(($batch_num+1))"
wait

if [ $batch_no -ne 0 ]; then 
    olddir="data/$date.$batch_num"
    for file in $olddir/work.*.log
    do
        echo "handle $file: $(stat -c \"%s\" $file)"
        cat "$file" >> data/work.$date.log
    done 

    rm -rf "$olddir"
    echo "delete dir: $olddir"
fi

做个简单说明:

其实在 wait 后合并是比较符合一般人思维习惯的,此时子进程都退出了,正好就把数据块合并了事,这样在 for 循环结束后就只需要一次 wait 和合并就可以了,代码看上去更清爽。为什么没有这样做呢?并发,还是并发!当一批子进程 wait 成功时,先去启动下一批的 ssh 连接,在 ssh 连接干活的空隙 (2 秒) 去合并数据块绰绰有余,等合并完了再回来一个 wait 可能还得等上个 1 秒多,这样是不是就省下了数据块合并的时间呢?哈哈,这才是真正的时间管理大师好伐

标签:xxxx,log,xxxxxxxx,batch,并发,ssh,date,日志
来源: https://www.cnblogs.com/goodcitizen/p/improve_efficient_of_shell_script_for_fetching_log_b