其他分享
首页 > 其他分享> > ProcessFunction API(底层API)

ProcessFunction API(底层API)

作者:互联网

我们之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如MapFunction这样的map转换算子就无法访问时间戳或者当前事件的事件时间。

​ 基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,FlinkSQL就是使用Process Function实现的

Flink提供了8个Process Function:

9.1 KeyedProcessFunction

​ 这个是相对比较常用的ProcessFunction,根据名字就可以知道是用在keyedStream上的。

​ KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口,所以都有open()close()getRuntimeContext()等方法。而KeyedProcessFunction<K, I, O>还额外提供了两个方法:

测试代码

设置一个获取数据后第5s给出提示信息的定时器

package processfunction;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author : Ashiamd email: ashiamd@foxmail.com
 * @date : 2021/2/3 12:30 AM
 */
public class ProcessTest1_KeyedProcessFunction {
  public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // socket文本流
    DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

    // 转换成SensorReading类型
    DataStream<SensorReading> dataStream = inputStream.map(line -> {
      String[] fields = line.split(",");
      return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    });

    // 测试KeyedProcessFunction,先分组然后自定义处理
    dataStream.keyBy("id")
      .process( new MyProcess() )
      .print();

    env.execute();
  }

  // 实现自定义的处理函数
  public static class MyProcess extends KeyedProcessFunction<Tuple, SensorReading, Integer> {
    ValueState<Long> tsTimerState;

    @Override
    public void open(Configuration parameters) throws Exception {
      tsTimerState =  getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts-timer", Long.class));
    }

    @Override
    public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
      out.collect(value.getId().length());

      // context
      // Timestamp of the element currently being processed or timestamp of a firing timer.
      ctx.timestamp();
      // Get key of the element being processed.
      ctx.getCurrentKey();
      //            ctx.output();
      ctx.timerService().currentProcessingTime();
      ctx.timerService().currentWatermark();
      // 在5处理时间的5秒延迟后触发
      ctx.timerService().registerProcessingTimeTimer( ctx.timerService().currentProcessingTime() + 5000L);
      tsTimerState.update(ctx.timerService().currentProcessingTime() + 1000L);
      //            ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
      // 删除指定时间触发的定时器
      //            ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
      System.out.println(timestamp + " 定时器触发");
      ctx.getCurrentKey();
      //            ctx.output();
      ctx.timeDomain();
    }

    @Override
    public void close() throws Exception {
      tsTimerState.clear();
    }
  }
}

9.2 TimerService和定时器(Timers)

​ Context 和OnTimerContext 所持有的TimerService 对象拥有以下方法:

​ 当定时器timer 触发时,会执行回调函数onTimer()。注意定时器timer 只能在keyed streams 上面使用。

测试代码

下面举个例子说明KeyedProcessFunction 如何操作KeyedStream。

需求:监控温度传感器的温度值,如果温度值在10 秒钟之内(processing time)连续上升,则报警。

9.3 侧输出流(SideOutput)


测试代码

场景:温度>=30放入高温流输出,反之放入低温流输出

9.4 CoProcessFunction

标签:ProcessFunction,flink,ctx,API,org,apache,import,sensor,底层
来源: https://blog.csdn.net/qq_28403781/article/details/121098314