其他分享
首页 > 其他分享> > Flink实例(二十九):自定义时间和窗口的操作符(十)窗口操作符(三)清理器(EVICTORS)

Flink实例(二十九):自定义时间和窗口的操作符(十)窗口操作符(三)清理器(EVICTORS)

作者:互联网

evictor可以在window function求值之前或者之后移除窗口中的元素。

我们看一下Evictor的接口定义:

public interface Evictor<T, W extends Window>
    extends Serializable {
  void evictBefore(
    Iterable<TimestampedValue<T>> elements,
    int size,
    W window,
    EvictorContext evictorContext);

  void evictAfter(
    Iterable<TimestampedValue<T>> elements,
    int size,
    W window,
    EvictorContext evictorContext);

  interface EvictorContext {

    long getCurrentProcessingTime();

    long getCurrentWatermark();
  }
}

  evictBefore()和evictAfter()分别在window function计算之前或者之后调用。

  Iterable迭代器包含了窗口所有的元素,size为窗口中元素的数量,window object和EvictorContext可以访问当前处理时间和水位线。

  可以对Iterator调用remove()方法来移除窗口中的元素。

  evictor也经常被用在GlobalWindow上,用来清除部分元素,而不是将窗口中的元素全部清空。

-----------------------

驱逐器能够在触发器触发之后,窗口函数使用之前或之后从窗口中清除元素。
  evictBefore()在窗口函数之前使用。而 evictAfter() 在窗口函数之后使用。在使用窗口函数之前被逐出的元素将不被处理。

Flink带有三种内置驱逐器:
  CountEvictor:在窗口维护用户指定数量的元素,如果多于用户指定的数量,从窗口缓冲区的开头丢弃多余的元素。
  DeltaEvictor:使用 DeltaFunction 和一个阈值,来计算窗口缓冲区中的最后一个元素与其余每个元素之间的差值,并删除差值大于或等于阈值的元素。
  TimeEvictor:以毫秒为单位的时间间隔(interval)作为参数,对于给定的窗口,找到元素中的最大的时间戳max_ts,并删除时间戳小于max_ts - interval的所有元素。

默认情况下,所有内置的驱逐器在窗口函数之前使用。指定驱逐器可以避免预聚合(pre-aggregation),因为窗口内所有元素必须在窗口计算之前传递给驱逐器。

Flink 不保证窗口内元素的顺序。这意味着虽然驱逐器可以从窗口开头移除元素,但这些元素不一定是先到的还是后到的。

实例一

class MyEvictor() extends Evictor[MyTime, TimeWindow] {
  override def evictBefore(iterable: lang.Iterable[TimestampedValue[MyTime]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
    val ite: util.Iterator[TimestampedValue[MyTime]] = iterable.iterator()
    while (ite.hasNext) {
      val elment: TimestampedValue[MyTime] = ite.next()
      //指定事件事件获取到的就是事件时间
      println("驱逐器获取到的时间:" + elment.getTimestamp)
      //模拟去掉非法参数数据
      if (elment.getValue.timestamp <= 0) {
        ite.remove()
      }
    }
  }

  override def evictAfter(iterable: lang.Iterable[TimestampedValue[MyTime]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {

  }
}

 

标签:驱逐,窗口,自定义,元素,Evictor,window,操作符,EvictorContext
来源: https://www.cnblogs.com/qiu-hua/p/13782408.html