其他分享
首页 > 其他分享> > [ Project ] Editing Flume.conf

[ Project ] Editing Flume.conf

作者:互联网

文件编写

简单了解

一、介绍

  这一步主要是利用 flume 采集 HDFS 上的源数据并流向 kafka

二、简单要点

  1.Flume 是什么?
  简单了解一下。

  2.为什么 flumeKafka 要联合使用 ?
  简单了解一下。

  3.建议用谷歌浏览器打开 Flume 官方网站(地址:flume.apache.org),如果需要可在站内将网页转为中文。

  4.首先是查看用户指南,找到 Spooling Directory Source ,这主要是说把 文件夹 放到 目录 中来索取数据,说明这边监控的是文件夹,不监控文件,所以在编辑脚本时,目录写到文件夹即可,不需要写到文件。

三、操作步骤

(一)按图操作

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

(二)修改 sources

找到 Spooling Directory Source

a1.channels = ch-1										# 设置名称
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir =/var/log/apache/flumeSpool	# 需要flume处理的文件目录
a1.sources.src-1.fileHeader = true						# 是否添加存储绝对路径文件名的标头
a1.channels = c1
a1.sources = s1

a1.sources.s1.type = spooldir
a1.sources.s1.channels = ch-1
a1.sources.s1.spoolDir =/opt/data/event_attendees
a1.sources.s1.deserializer.maxLineLength=120000

(三)使用拦截器去除表头

  1. 先找到正则拦截器

在这里插入图片描述

  1. 编写代码
    注:因为选择了正则拦截器,那 type 势必为 regex_filter

在这里插入图片描述

a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type = regex_filter
a1.sources.s1.interceptors.i1.regex = \s*event.*
a1.sources.s1.interceptors.i1.excludeEvents=true

(四)修改 channels

找到 File Channels

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint	# 这是检查点文件夹
a1.channels.c1.dataDirs = /mnt/flume/data				# 这是数据通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flumekfk/checkpoint
a1.channels.c1.dataDirs = /opt/flumekfk/data

如果担心在传输的过程中日志会丢掉,就要加上一个备份检查点。

在这里插入图片描述

(五)修改 sinks

找到 Kafka Sink

a1.sinks.k1.channel  =  c1 
a1.sinks.k1.type  =  org.apache.flume.sink.kafka.KafkaSink 
a1.sinks.k1.kafka.topic  =  mytopic 
a1.sinks.k1.kafka.bootstrap.servers  = 本地主机:9092 
a1.sinks.k1.kafka.flumeBatchSize  =  20 					# 一批的尺寸是20条
a1.sinks.k1.kafka.producer.acks  =  1 
a1.sinks.k1.kafka.producer.linger.ms  =  1 					# 保留缓存时间
a1.sinks.k1.kafka .producer.compression.type  = snappy		# 压缩方式
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = event_attendees_raw
a1.sinks.k1.kafka.bootstrap.servers = 192.168.59.200:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 10
a1.sinks.k1.kafka.producer.batch.size=524288

四、完整配置代码

a1.channels = c1
a1.sources = s1
a1.sinks = k1

a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /opt/data/event_attendees
a1.sources.s1.deserializer.maxLineLength=120000

a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type = regex_filter
a1.sources.s1.interceptors.i1.regex = \s*event.*
a1.sources.s1.interceptors.i1.excludeEvents=true

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flumekfk/checkpoint
a1.channels.c1.dataDirs = /opt/flumekfk/data


a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = event_attendees_raw
a1.sinks.k1.kafka.bootstrap.servers = 192.168.59.200:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 10
a1.sinks.k1.kafka.producer.batch.size=524288

五、执行命令

flume-ng agent -n a1 -c /opt/software/hadoop/flume160/conf -f /opt/flumecfg/event_attendees.conf -Dflume.root.logger=info,console
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.59.200:9092 --topic event_attendees_raw --time -1
kafka-console-consumer.sh --bootstrap-server 192.168.59.200:9092 --topic event_attendees_raw --from-beginning

标签:Flume,a1,sinks,kafka,Project,sources,k1,Editing,channels
来源: https://blog.csdn.net/weixin_51184877/article/details/116310683