StreamSets实战之路(十五)-实战篇- 数据采集与处理
作者:互联网
主要通过一个数据采集与处理的案例来介绍Streamsets(3.13.0)的使用,主要将使用Edge数据流收集streamsets系统的日志和主机性能指标,通过收集数据流收集类数据并进行简单处理,发送至kafka中,性能指标数据入库数据流和日志数据入库数据流分别从kafka中消费数据,并将两类数据进行简单处理加载到数据库中。
学习目的:使用edge和streamset的数据互动,使用streamset进行分布式异步数据处理。
数据流图:
最终数据流的效果图:
需要配置5个数据流,两个edge采集数据流,一个数据收集数据流,两个数据处理与入库数据流
前期准备:
(1)需要在数据采集的节点上部署安装Edge(不会使用的同学可以参照前面文章)。
(2)一个现成kafka集群,并创建一个两个topic,kafka集群主要为了让数据流达到分布式异步处理的能力。
(3)一个现成的ES集群。
构建步骤:
1.首先构建日志数据采集器数据流
配置edge数据流发布的地址(该主机上一定要安装部署了edge)
配一下文件采集文件和数据格式,数据格式我们直接按文本传输
使用destination 类http client配置一下远程数据收集器的地址和APP ID
2.性能指标数据采集器数据流
配置edge数据流发布的地址(该主机上一定要安装部署了edge)
配置一下系统指标采集插件,采集哪些数据和采集的频率,这里我们采集host、cpu、内存、磁盘等,采集频率为两秒
使用destination 类http client同样配置一下数据收集器远程的url和APP ID
3.数据收集与处理数据流
使用origin 类http sevice组件,配置数据收集器的端口、最大并发量以及APP ID
使用Http 路由插件,将接受到的数据路由不到不同分支,这里配置日志和性能指标数据路由。
使用日志解析插件对收集到的日志数据进行解析,这里我们选择Log4j解析器,选择使用自定义日志格式,这里的格式按照streamset的格式:
%d{ISO8601} [user:%X{s-user}] [pipeline:%X{s-entity}] [runner:%X{s-runner}] [thread:%t] [stage:%X{s-stage}] %-5p %c{1} - %m%n
使用kafka生成插件将两类数据输出到不同的topic中。
4.日志数据入库数据流
配置kafka地址和日志数据的topic和消费组
5.性能指标数据入库数据流
配置kafka地址和性能指标数据的topic和消费组
Streamsets实战之路正在更新中,尽情期待!!!
此文章为博主原创,转载请标明出处和原始链接,谢谢。
标签:实战,实战篇,kafka,采集,edge,数据流,StreamSets,日志,数据 来源: https://blog.csdn.net/zwzfgr/article/details/114041281