其他分享
首页 > 其他分享> > flink sink 批量下层数据时 ,数据"丢失"?

flink sink 批量下层数据时 ,数据"丢失"?

作者:互联网

flink作为目前很火的大数据框架,在实时计算和离线计算场景下有大量应用

图片来源{https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/}

A parallel dataflow

从图中可以看出,flink在处理数据中有三大块:source operator sink

在工作时负责sink这块的数据下层工作,因为下层源有很多,有redis,hbase这些。在实现hbase相关的sink时,我优先考虑使用批量进行sink下层,这样可以提高下层数据时的效率问题。

最开始的编写hbase相关的sink代码( 工作相关业务代码已屏蔽)如下:

public class BatchHbaseSink extends RichSinkFunction<JSONObject> {

    private static final Logger logger = LoggerFactory.getLogger(BatchHbaseSink.class);

    private Connection connection;
    private long lastInvokeTime;
    private final List<Put> puts = new ArrayList();
    /**
     * 每次处理最多500条数据
     */
    private final int maxSize = 500;
    /**
     * 最长延迟5秒
     */
    private final long delayTime = 5000L;


    public BatchHbaseSink() {
        System.out.println("new batch =========== " + Thread.currentThread().getName());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = HBaseUtil.getConnection(
                this.config.getZookeeperQuorum(),
                this.config.getClientPort()
        );
        lastInvokeTime = System.currentTimeMillis();
    }


    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        try {
						.....省略
      
            Put put = new Put(keySuffix + keySuffix).getBytes());
            put.addColumn(family.getBytes(), qualifier.getBytes(), Bytes.toBytes(val.toString()));
            puts.add(put);
            final long currentTime = System.currentTimeMillis();
            final long l = currentTime - lastInvokeTime;
            if (puts.size() >= maxSize || l >= delayTime) {
                Table table = connection.getTable(TableName.valueOf(tableName));
                // 数据提交
                table.put(puts);
                puts.clear();
                lastInvokeTime = currentTime;
                table.close();
            }
        } catch (Exception e) {
            logger.error("error:", e);
        }
    }


    @Override
    public void close() throws IOException {
        if (connection != null) {
            connection.close();
        }
    }
}

  

 

我开始的想法很简单,每次等500条数据一次性写入hbase,同时设置延时时间为5秒,防止数据积压

但是当我发送20条需要处理的数据时,发现每次hbase每次写入的数据都不满足20条,运行多次后都有同样的问题,我重新review了代码逻辑,发现逻辑处理方面没有什么问题,我开始把代码改成,来一条数据发送一条数据代码如下

 
public class BatchHbaseSink extends RichSinkFunction<JSONObject> {
​
    private static final Logger logger = LoggerFactory.getLogger(BatchHbaseSink.class);
​
    private Connection connection;
​
    public BatchHbaseSink() {
        System.out.println("new batch =========== " + Thread.currentThread().getName());
    }
​
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = HBaseUtil.getConnection(
                this.config.getZookeeperQuorum(),
                this.config.getClientPort()
        );
    }
​
​
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        try {
            .....省略
      
            Put put = new Put(keySuffix + keySuffix).getBytes());
            put.addColumn(family.getBytes(), qualifier.getBytes(), Bytes.toBytes(val.toString()));
            Table table = connection.getTable(TableName.valueOf(tableName));
            // 数据提交
            table.put(puts);
            puts.clear();
            lastInvokeTime = currentTime;
            table.close();
         }
        } catch (Exception e) {
            logger.error("error:", e);
        }
    }
​
​
    @Override
    public void close() throws IOException {
        if (connection != null) {
            connection.close();
        }
    }
}

  



这样处理下层数据,数据不会发生‘’丢失‘’的情况,所以我第一反应是不是缓存的问题导致的,缓存出问题的场景大多数情况是出现线程安全的问题,所以我怀疑是不是线程安全问题,我在程序中添加了线程打印日志,看看是不是多线程处理数据。

System.out.println("---------" + Thread.currentThread().getName() + "---------");

处理数据过程中打印了如下日志:

---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (2/8)---------
---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (4/8)---------
---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (8/8)---------
---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (7/8)---------
---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (1/8)---------
---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (6/8)---------
---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (5/8)---------

可以看出,sink过程中有8个线程做处理,所以此时的我觉得已经找到问题的根因了,代码是多线程执行的,存在线程安全问题,所以我给批量处理的数据做了如下处理,将缓存改成线程安全的集合,同时将字段lastInvokeTime修饰为volatile,至此我以为我已经解决了该问题,但是结果仍然是发送的20条数据只落了部分数据到hbase。

这时我有点穷驴技穷了,我重新review了代码,发现一处代码逻辑漏洞

首先我开始设置批量处理数据时,只有当数据超过500条或者超过时间限制5秒钟我就会将数据都刷入hbase中,问题出现在了超时时间限制。我发送的数据是一次性的发送20条,当不满足500条的时候不会发送,但是触发超时5秒也会,但是这个超时只有在下次消费数据时才会触发这个逻辑,所以我需要定时任务去定时的将缓存中积压的数据刷到hbase,防止source一直没有数据导致缓存中的数据无法下层甚至丢失

知道原因,并且有解决办法就是采用定时任务来,我添加了如下代码:

private final List<Put> puts = Collections.synchronizedList(Lists.newArrayList());
private final  static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
//定时刷新缓存数据到hbase
public void initFlush() {
        scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
            try {
                if (CollectionUtils.isNotEmpty(puts) && flag) {
                    Table table = connection.getTable(TableName.valueOf(hbaseStorage.getTable()));
                    table.put(puts);
                    puts.clear();
                    table.close();
                }
            } catch (Exception e) {
                logger.error("刷新缓存错误:", e);
            }
       }, 30, 5, TimeUnit.SECONDS);
 }

  

 

问题三:添加定时任务,定时刷缓存,可是数据仍然没有下层到hbase

 

想法是好的,现实很残酷,运行下来,下层到hbase的数据仍然缺失。我开始怀疑人生了,我debug了代码,发现缓存中的数据为空

如图:

 

 

这时候只能去看底层flink的源码了,我通过跟踪堆栈,发现了执行逻辑,sink的invoke方法的执行,在streamSink类中被调用:

@Override
  public void processElement(StreamRecord<IN> element) throws Exception {
    sinkContext.element = element;
    userFunction.invoke(element.getValue(), sinkContext);
  }

  

我debug到了这边:

如下:

 

 

 

 

 

 

可以看出我上图中用黑色笔圈出的部分,每个线程执行的hbasesink对象都是不一样,也就是说每个线程都有自己的hbasesink对象。问题迎刃而解了,我设置的全局线程池去刷新缓存,是没法针对每个线程中的hbasesink中的缓存操作的,(线程局部变量对其它线程是不可见的)。所以我的做法很简单,我将全局刷新的线程,变为线程局部处理线程,也就是sink线程的子线程,变化逻辑非常简单:

 

private final List<Put> puts = Collections.synchronizedList(Lists.newArrayList());
​
//标志位
private volatile boolean flag = true;
​
//定时刷新缓存数据到hbase
public void initFlush() {
  final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
            try {
                if (CollectionUtils.isNotEmpty(puts) && flag) {
                    Table table = connection.getTable(TableName.valueOf(hbaseStorage.getTable()));
                    table.put(puts);
                    puts.clear();
                    table.close();
                }
            } catch (Exception e) {
                logger.error("刷新缓存错误:", e);
            }
       }, 30, 5, TimeUnit.SECONDS);
 }

  

如上,我将创建线程的步骤放到了方法里,而非全局变量。

 

问题四:hbasesink是我自己new的创建的对象,为啥每个sink线程有自己的呢?

 

标签:puts,flink,线程,sink,Unnamed,数据,public,Sink
来源: https://www.cnblogs.com/clovejava/p/15068068.html