其他分享
首页 > 其他分享> > Flink实例(三十一):状态管理(二)自定义键控状态(一)ValueState

Flink实例(三十一):状态管理(二)自定义键控状态(一)ValueState

作者:互联网

 

ValueState[T]保存单个的值,值的类型为T。

实例一

scala version

复制代码
val sensorData: DataStream[SensorReading] = ...
val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)

val alerts: DataStream[(String, Double, Double)] = keyedData
  .flatMap(new TemperatureAlertFunction(1.7))

class TemperatureAlertFunction(val threshold: Double)
  extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    val lastTempDescriptor = new ValueStateDescriptor[Double](
      "lastTemp", classOf[Double])

    lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
  }

  override def flatMap(
    reading: SensorReading,
    out: Collector[(String, Double, Double)]
  ): Unit = {
    val lastTemp = lastTempState.value()
    val tempDiff = (reading.temperature - lastTemp).abs
    if (tempDiff > threshold) {
      out.collect((reading.id, reading.temperature, tempDiff))
    }
    this.lastTempState.update(reading.temperature)
  }
}
复制代码

上面例子中的FlatMapFunction只能访问当前处理的元素所包含的key所对应的状态变量。

不同key对应的keyed state是相互隔离的。

 

标签:自定义,val,Double,Flink,ValueState,键控,SensorReading,reading,String
来源: https://www.cnblogs.com/qiu-hua/p/13794625.html