其他分享
首页 > 其他分享> > Flum 采集配置

Flum 采集配置

作者:互联网

Flume 采集配置

安装

使用CDH安装

存在的问题

  1. Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.

    增加hdfs的超时时间 tier1.sinks.ods_hdfs_sink.hdfs.callTimeout

  2. ChannelFullException: Space for commit to queue couldn’t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight

    source端的batchSize和sink端的batchSize要一致

    tier1.sources.ods_kafka_source.batchSize=10000
    tier1.sinks.ods_hdfs_sink.hdfs.batchSize=10000
    
  3. buffer oom

    source是内存模式下,要避免存在多个目录既同时存在多个待写入的文件,这样就会引起OOM。每个文件都会申请你所设置的文件滚动大小的内存空间。

  4. hdfs小文件的问题

#当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
tier1.sinks.ods_hdfs_sink.hdfs.rollInterval=3600
#当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节 128M 10倍
tier1.sinks.ods_hdfs_sink.hdfs.rollSize=1310720000
#当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
tier1.sinks.ods_hdfs_sink.hdfs.rollCount=0

我的解决思路是没一个小时进行滚动一次,然后文件的大小在128M后滚动一次,这样会出现在0点的时候还存在未滚动的数据,需要在1点的时候开始作业。

编写interceptor

AbstractInterceptor

package com.tbl.flume.interceptor;

import com.tbl.flume.conf.CommonConf;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * 自定义flume拦截器
 */
public abstract class AbstractInterceptor implements Interceptor {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractInterceptor.class);
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        doIntercept(event);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }

    protected abstract void doIntercept(Event event);

}

TimeJsonObjectEventInterceptor

package com.tbl.flume.interceptor;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tbl.flume.conf.CommonConf;
import com.tbl.flume.sink.AbstractClickhouseSink;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.InterceptorBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

import static com.tbl.flume.conf.CommonConf.*;

public class TimeJsonObjectEventInterceptor extends AbstractInterceptor {
    private String timeField;
    private String prefix;

    public TimeJsonObjectEventInterceptor(Context context) {
        timeField = context.getString(TIME_FIELD);
        prefix = context.getString(TOPIC_PREFIX);
    }

    @Override
    protected void doIntercept(Event event) {
        JSONObject jsonObject = JSON.parseObject(new String(event.getBody(), StandardCharsets.UTF_8));
        String dateTime = jsonObject.getString(timeField);
        String[] ts = dateTime.split(" ");
        event.getHeaders().put(CONSUME_DATE, ts[0]);
        event.getHeaders().put(CONSUME_DATE_TIME, dateTime);
        String topic = event.getHeaders().get(TOPIC);
        String[] topics = topic.split(prefix);
        event.getHeaders().put(TABLE, topics[topics.length - 1]);
    }

    public static class Builder implements Interceptor.Builder {
        private Context context;

        @Override
        public Interceptor build() {
            return new TimeJsonObjectEventInterceptor(context);
        }

        @Override
        public void configure(Context context) {
            this.context = context;
        }
    }
}

配置文件

#ources sinks channels
tier1.sources=ods_kafka_source
tier1.sinks=ods_hdfs_sink ods_clickhouse_sink
tier1.channels=ods_file_channel

#配置关系
tier1.sources.ods_kafka_source.channels=ods_file_channel
tier1.sinks.ods_hdfs_sink.channel=ods_file_channel
tier1.sinks.ods_clickhouse_sink.channel=ods_file_channel

#配置ods_kafka_source
tier1.sources.ods_kafka_source.type=org.apache.flume.source.kafka.KafkaSource
tier1.sources.ods_kafka_source.kafka.bootstrap.servers=cdh210:9092,cdh211:9092,cdh212:9092
tier1.sources.ods_kafka_source.kafka.consumer.group.id=flume
tier1.sources.ods_kafka_source.kafka.topics=topic_wa_dw_0001,topic_wa_wb_0001,topic_wa_source_fj_0001,topic_wa_source_fj_0002,topic_wa_source_fj_1002,topic_wa_source_fj_1001,topic_ga_viid_0001,topic_ga_viid_0002,topic_ga_viid_0003
tier1.sources.ods_kafka_source.batchSize=10000
tier1.sources.ods_kafka_source.batchDurationMillis=5000
tier1.sources.ods_kafka_source.setTopicHeader=true
tier1.sources.ods_kafka_source.topicHeader=topic
tier1.sources.ods_kafka_source.interceptors=timeInterceptor
tier1.sources.ods_kafka_source.interceptors.timeInterceptor.type=com.tbl.flume.interceptor.TimeJsonObjectEventInterceptor$Builder
tier1.sources.ods_kafka_source.interceptors.timeInterceptor.time_field=startTime
tier1.sources.ods_kafka_source.interceptors.timeInterceptor.topic_prefix=topic_


tier1.channels.ods_file_channel.type=memory
tier1.channels.ods_file_channel.capacity=100000
tier1.channels.ods_file_channel.transactionCapacity=10000

#ods_hdfs_sink
tier1.sinks.ods_hdfs_sink.type=hdfs
tier1.sinks.ods_hdfs_sink.hdfs.useLocalTimeStamp=true
tier1.sinks.ods_hdfs_sink.hdfs.path=/flume/capture/%{topic}/%{consume_date}/
tier1.sinks.ods_hdfs_sink.hdfs.filePrefix=ods_
tier1.sinks.ods_hdfs_sink.hdfs.round=false
#当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
tier1.sinks.ods_hdfs_sink.hdfs.rollInterval=3600
tier1.sinks.ods_hdfs_sink.hdfs.threadsPoolSize=10
#当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节 128M 10倍
tier1.sinks.ods_hdfs_sink.hdfs.rollSize=1310720000
tier1.sinks.ods_hdfs_sink.hdfs.hdfs.minBlockReplicas=1
#当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
tier1.sinks.ods_hdfs_sink.hdfs.rollCount=0
tier1.sinks.ods_hdfs_sink.hdfs.writeFormat=Text
tier1.sinks.ods_hdfs_sink.hdfs.codeC=snappy
tier1.sinks.ods_hdfs_sink.hdfs.fileType=CompressedStream
tier1.sinks.ods_hdfs_sink.hdfs.batchSize=10000
tier1.sinks.ods_hdfs_sink.hdfs.callTimeout=180000

tier1.sinks.ods_clickhouse_sink.type=com.tbl.flume.sink.JsonObjectClickhouseSink
tier1.sinks.ods_clickhouse_sink.servers=192.168.2.182:8123
tier1.sinks.ods_clickhouse_sink.user=root
tier1.sinks.ods_clickhouse_sink.password=tbl_db_543
tier1.sinks.ods_clickhouse_sink.database=yth
tier1.sinks.ods_clickhouse_sink.table_prefix=dwd_
tier1.sinks.ods_clickhouse_sink.batchSize=2000
tier1.sinks.ods_clickhouse_sink.max_waite_time=10000

标签:tier1,hdfs,ods,Flum,sinks,source,配置,采集,sink
来源: https://blog.csdn.net/qq_22271479/article/details/120371733