使用logstash过滤出特定格式的日志
作者:互联网
1.一个例子
- 项目日志生成在某个路径,如/var/log/project,里面有warn,info,error目录,分别对应不同级别的日志,需要采集这些日志。
- 需要采集特定格式的日志,如:
[2018-11-24 08:33:43,253][ERROR][http-nio-8080-exec-4][com.hh.test.logs.LogsApplication][code:200,msg:测试录入错误日志,param:{}]
- 筛选采集到的日志,生成自定义字段,如 date, level, thread, class, msg
filter {
if "nova" in [tags]{
grok {
# 筛选过滤
match => {
"message" => "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\[(?<level>[A-Z]{4,5})\]\[(?<thread>[A-Za-z0-9/-]{4,40})\]\[(?<class>[A-Za-z0-9/.]{4,40})\]\[(?<msg>.*)"
}
mutate {
remove_field => [
"message",
]
}
# 不匹配正则则删除,匹配正则用=~
if [level] !~ "(ERROR|WARN|INFO)" {
# 删除日志
drop {}
}
}
}
备注:grok里边有定义好的现场模板可以用,但是更多的是自定义模板,规则是这样的,小括号里边包含一个key和value,例子:(?
value),比如以上的信息,第一个定义的key是date,表示方法为:? 前边一个问号,然后用<>把key包含在里边去。value就是纯正则了。这有个在线的调试库,以供参考:http://grokdebug.herokuapp.com/
2.多项匹配(提高性能)
grok {
"match" => {
"message => [
'%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{WORD:word_1} %{WORD:word_2} %{NUMBER:number_1} %{NUMBER:number_2} %{DATA:data}',
'%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{WORD:word_1} %{NUMBER:number_1} %{NUMBER:number_2} %{NUMBER:number_3} %{DATA:data};%{NUMBER:number_4}',
'%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{DATA:data} | %{NUMBER:number}'
]
}
}
logstash 会按照这个定义次序依次尝试匹配,直到匹配成功为止。虽说效果跟用 | 分割写个大大的正则是一样的,但是可阅读性好很多。
3.grok匹配失败(性能基准测试)
当grok匹配失败的时候,插件会为这个事件打个tag,默认是 _grokparsefailure。LogStash允许你把这些处理失败的事件路由到其他地方做后续的处理,例如:
input { # ... }
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{IPV4:ip};%{WORD:environment}\] %{LOGLEVEL:log_level} %{GREEDYDATA:message}" }
}
}
output {
if "_grokparsefailure" in [tags] {
# write events that didn't match to a file
file { "path" => "/tmp/grok_failures.txt" }
} else {
elasticsearch { }
}
}
4.快速失败,设置锚点(提高性能)
^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$
5.数据修改(Mutate)
filter {
mutate {
convert => ["request_time", "float"]
}
}
6.大小写转换:uppercase 和 lowercase
filter {
mutate {
uppercase => [ "fieldname" ]
}
}
标签:grok,process,number,NUMBER,过滤,日志,DATA,logstash 来源: https://www.cnblogs.com/varden/p/15164913.html