Flume自定义拦截器
作者:互联网
需求:一个topic包含很多个表信息,需要自动根据json字符串中的字段来写入到hive不同的表对应的路径中。
发送到Kafka中的数据原本最外层原本没有pkDay和project,只有data和name。因为担心data里面会空值,所以根同事商量,让他们在最外层添加了project和pkDay字段。
pkDay字段用于表的自动分区,proejct和name合起来用于自动拼接hive表的名称为 ods_project_name
{ "data": { "blockNo": 12371220, "chain": "Ethereum", "currentTimestamp": 1620177708, "fee": 3000, "project": "test1" }, "name": "view", "pkDay": "2021-05-05", "project": "test1" }
自定义Flume拦截器开发
public class BlockExtractorInterceptor implements Interceptor { Logger log = LoggerFactory.getLogger(this.getClass()); JsonParser parser = null; @Override public void initialize() { parser = new JsonParser(); } @Override public Event intercept(Event event) { //获取数据body byte[] body = event.getBody(); Map<String,String> headerMap = event.getHeaders(); String str = new String(body, Charsets.UTF_8); JsonElement element = parser.parse(str); try{ JsonObject root = element.getAsJsonObject(); log.debug(str); JsonObject dataObj = root.get("data").getAsJsonObject(); String name = root.get("name").getAsString().toLowerCase(); String project = "none"; if(Objects.nonNull(root.get("project"))){ project = root.get("project").getAsString().toLowerCase(); } String pk_day = root.get("pkDay").getAsString(); String pk_year = pk_day.substring(0,4); String pk_month = pk_day.substring(0,7); log.debug("name ======>" + name); //直接取JSON中的data值输出到body中 event.setBody(dataObj.toString().getBytes(Charsets.UTF_8)); //设置Header headerMap.put("pk_year",pk_year); headerMap.put("pk_month",pk_month); headerMap.put("pk_day",pk_day); headerMap.put("name",name); headerMap.put("project",project); }catch (Exception e){ log.error(str); log.error(e.getMessage()); //异常信息单独写入 if(null!= str){ headerMap.put("error","error"); event.setBody(str.getBytes(Charsets.UTF_8)); } e.printStackTrace(); } return event; } @Override public List<Event> intercept(List<Event> events) { for (Event event : events) { intercept(event); } return events; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new BlockExtractorInterceptor(); } @Override public void configure(Context context) { } } }
我 这里判断了如果project为空时,直接丢到none里面,那么结果表名为 ods_none_name。
幸亏提前预判了他们会出现空的project,结果接收时,果然他们给了一些是project为空的。然后让他们修改数据了。
event 事件中可以获取到两个属性,一个是 event.getHeaders(),一个是event.getBody()。
拿到数据后用data的值覆盖了body中的值。header中的值存进去pk_year,pk_month,pk_day,以及name和project。这个后面flume配置文件中使用。
注意一个细节,解析json串时候,使用的是gson,刚好flume的lib目录下也有gson包,所以不需要额外添加gson的包。之前使用fastjson发现了一些坑。
打包编译
编译好,把生成的jar包放到flume的lib目录下。
flume配置
test.sinks.k1.type = hdfs test.sinks.k1.hdfs.path = hdfs://ns1//user/hive/warehouse/ods.db/ods_%{project}_%{name}/pk_year=%{pk_year}/pk_month=%{pk_month}/pk_day=%{pk_day} test.sinks.k1.hdfs.filePrefix = %{project}_%{name} test.sinks.k1.hdfs.fileSufix = .log test.sinks.k1.hdfs.useLocalTimeStamp = true test.sinks.k1.hdfs.batchSize = 500 test.sinks.k1.hdfs.fileType = DataStream test.sinks.k1.hdfs.writeFormat = Text test.sinks.k1.hdfs.rollSize = 2147483648 test.sinks.k1.hdfs.rollInterval = 0 test.sinks.k1.hdfs.rollCount = 0 test.sinks.k1.hdfs.idleTimeout = 120 test.sinks.k1.hdfs.minBlockReplicas = 1 test.channels.c1.type = file test.channels.c1.checkpointDir = /home/hadoop/bigdata/flume_job/chkDir/project_%{project} test.channels.c1.dataDirs = /home/hadoop/bigdata/flume_job/dataDir/project_%{project} test.sources.s1.channels = c1 test.sinks.k1.channel = c1
测试结果如下:
标记红色部分:test.sinks.k1.hdfs.path和test.sinks.k1.hdfs.filePrefix 可以使用header中放入的字段
标记蓝色部分:test.channels.c1.checkpointDir和test.channels.c1.dataDirs 不能使用header中放入的字段。
我们使用红色部分的设置已经可以满足,根据字符串中不同的字段设置不同的hdfs存放路径了。
其实官方有个正则提取的拦截器也可以,不过正则提取的规则比较麻烦,就怕存在重复的多层嵌套曾在重复的字段时可能会出错,我对正则也不太熟悉吧。
总之能够解决我的需求问题就行。
标签:Flume,hdfs,拦截器,sinks,自定义,project,k1,test,pk 来源: https://www.cnblogs.com/30go/p/16217225.html