Importing Local Data into Kafka with Flume
Create the topic
[root@hadoop1 kafka]# kafka-topics.sh --zookeeper hadoop1:2181 --create --topic users --partitions 1 --replication-factor 1
Created topic "users".
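Before wiring up Flume, it is worth confirming the topic really exists. A quick sanity check, assuming the same ZooKeeper address as above:
[root@hadoop1 kafka]# kafka-topics.sh --zookeeper hadoop1:2181 --describe --topic users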
Flume setup
Create the required directories
[root@hadoop1 jobkb09]# mkdir dataSourceFile
[root@hadoop1 jobkb09]# cd dataSourceFile
[root@hadoop1 dataSourceFile]# mkdir users
[root@hadoop1 dataSourceFile]# cd ..
[root@hadoop1 jobkb09]# mkdir dataChannelFile
[root@hadoop1 jobkb09]# cd dataChannelFile
[root@hadoop1 dataChannelFile]# mkdir users
[root@hadoop1 dataChannelFile]# cd ..
[root@hadoop1 jobkb09]# mkdir checkpointFile
[root@hadoop1 jobkb09]# cd checkpointFile
[root@hadoop1 checkpointFile]# mkdir users
[root@hadoop1 checkpointFile]# cd ..
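If you prefer, the three mkdir/cd round trips above can be collapsed into a single equivalent command with mkdir -p:
[root@hadoop1 jobkb09]# mkdir -p dataSourceFile/users dataChannelFile/users checkpointFile/users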
[root@hadoop1 jobkb09]# vi users-flume-kafka.conf
The contents of users-flume-kafka.conf are as follows:
# Agent components
users.sources=usersSource
users.channels=usersChannel
users.sinks=usersSink

# Source: spool a local directory and read files line by line
users.sources.usersSource.type=spooldir
users.sources.usersSource.spoolDir=/opt/flume/conf/jobkb09/dataSourceFile/users
users.sources.usersSource.deserializer=LINE
users.sources.usersSource.deserializer.maxLineLength=1000
# Interceptor: drop the CSV header row (the line starting with user_id)
users.sources.usersSource.interceptors=head_filter
users.sources.usersSource.interceptors.head_filter.type=regex_filter
users.sources.usersSource.interceptors.head_filter.regex=^user_id
users.sources.usersSource.interceptors.head_filter.excludeEvents=true
# Only pick up files named users_YYYY-MM-DD.csv
users.sources.usersSource.includePattern=users_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv

# Channel: file-backed, with separate checkpoint and data directories
users.channels.usersChannel.type=file
users.channels.usersChannel.checkpointDir=/opt/flume/conf/jobkb09/checkpointFile/users
users.channels.usersChannel.dataDirs=/opt/flume/conf/jobkb09/dataChannelFile/users

# Sink: publish events to the users topic in Kafka
users.sinks.usersSink.type=org.apache.flume.sink.kafka.KafkaSink
users.sinks.usersSink.batchSize=640
users.sinks.usersSink.brokerList=192.168.153.10:9092
users.sinks.usersSink.topic=users

# Bind source and sink to the channel
users.sources.usersSource.channels=usersChannel
users.sinks.usersSink.channel=usersChannel
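Note that the spooldir source only picks up files whose names match includePattern, so the CSV must carry a users_YYYY-MM-DD.csv name when it is dropped into the spooling directory. A minimal sketch, assuming the raw file lives at /opt/data/users.csv (a hypothetical path):
[root@hadoop1 jobkb09]# cp /opt/data/users.csv dataSourceFile/users/users_2020-12-08.csv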
Run the commands
[root@hadoop1 flume]# wc -l conf/jobkb09/dataSourceFile/users/users_2020-12-08.csv
38210 conf/jobkb09/dataSourceFile/users/users_2020-12-08.csv
[root@hadoop1 flume]# flume-ng agent -n users -c conf -f conf/jobkb09/users-flume-kafka.conf -Dflume.root.logger=INFO,console
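Once the agent has fully consumed the file, the spooldir source renames it with the .COMPLETED suffix (Flume's default fileSuffix), so listing the spooling directory tells you ingestion is done:
[root@hadoop1 flume]# ls conf/jobkb09/dataSourceFile/users/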
Verify with Kafka commands
// The count is one less than 38210 because the Flume interceptor dropped the header (column-name) row
[root@hadoop1 flume]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.153.10:9092 --topic users --time -1 --offsets 1
users:0:38209
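38,210 lines minus the dropped header row is exactly 38,209 events, matching the offset above. To spot-check a few records without replaying the whole topic, you can also cap the consumer with --max-messages (the same command as below, just limited to 5 records):
[root@hadoop1 flume]# kafka-console-consumer.sh --bootstrap-server 192.168.153.10:9092 --topic users --from-beginning --max-messages 5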
[root@hadoop1 flume]# kafka-console-consumer.sh --bootstrap-server 192.168.153.10:9092 --topic users --from-beginning
3197468391,id_ID,1993,male,2012-10-02T06:40:55.524Z,Medan Indonesia,480
3537982273,id_ID,1992,male,2012-09-29T18:03:12.111Z,Medan Indonesia,420
823183725,en_US,1975,male,2012-10-06T03:14:07.149Z,Stratford Ontario,-240
1872223848,en_US,1991,female,2012-11-04T08:59:43.783Z,Tehran Iran,210
3429017717,id_ID,1995,female,2012-09-10T16:06:53.132Z,,420
627175141,ka_GE,1973,female,2012-11-01T09:59:17.590Z,Tbilisi Georgia,240
2752000443,id_ID,1994,male,2012-10-03T05:22:17.637Z,Medan Indonesia,420
3473687777,id_ID,1965,female,2012-10-03T12:19:29.975Z,Medan Indonesia,420
2966052962,id_ID,1979,male,2012-10-31T10:11:57.668Z,Medan Indonesia,420
264876277,id_ID,1988,female,2012-10-02T07:28:09.555Z,Medan Indonesia,420
1534483818,en_US,1992,male,2012-09-25T13:38:04.083Z,Medan Indonesia,420
2648135297,en_US,1996,female,2012-10-30T05:09:45.592Z,Phnom Penh,420
............................................