其他分享
首页 > 其他分享> > Flink键控状态AggregatingState开发实例

Flink键控状态AggregatingState开发实例

作者:互联网

一、键控状态说明

参考官网说明,几个键控状态介绍如下:

注意点:

状态后端目前有三种状态:
MemoryStateBackend:内存级别,一般测试环境使用
FsStateBackend:本地状态在JobManager内存, Checkpoint存储在文件系统中,可应用于生成
RocksDBStateBackend:将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储),使用超大状态作业,对读写状态性能要求不高的作业

状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。RichFunction 中 RuntimeContext 提供如下方法:

二、开发实例代码

基于流数据获取每个ID的平均水位

  1. 结果展示
    在这里插入图片描述
  2. 代码部分
package com.test;
import bean.WaterSensor2;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.lang.reflect.Type;
/**
 * @author: Rango
 * @create: 2021-05-07 18:53
 * @description:
 **/
public class WaterMarkAvg {
    public static void main(String[] args) throws Exception {
         //前面常规操作,建立环境建立连接装换数据 分组
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> hadoop102 = env.socketTextStream("hadoop102", 9999);
        SingleOutputStreamOperator<WaterSensor2> mapDS = hadoop102.map(new MapFunction<String, WaterSensor2>() {
            @Override
            public WaterSensor2 map(String value) throws Exception {
                String[] split = value.split(",");
                return new WaterSensor2(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
            }
        });
        KeyedStream<WaterSensor2, String> keyedStream = mapDS.keyBy(WaterSensor2::getId);
       //主要处理过程,ACC部分使用Tuple2来实现
        SingleOutputStreamOperator<WaterSensor2> streamOperator = keyedStream.process(
                new KeyedProcessFunction<String, WaterSensor2, WaterSensor2>() {
                    //<IN,OUT>
                    private AggregatingState<Double, Double> aggregatingState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        aggregatingState = getRuntimeContext()
                                .getAggregatingState(new AggregatingStateDescriptor<Double, Tuple2<Double, Integer>, Double>(
                                        "agg-state", new AggregateFunction<Double, Tuple2<Double, Integer>, Double>() {
                                    @Override
                                    public Tuple2<Double, Integer> createAccumulator() {
                                        return Tuple2.of(0.0, 0);
                                    }
                                    @Override
                                    public Tuple2<Double, Integer> add(Double value, Tuple2<Double, Integer> accumulator) {
                                        return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                                    }
                                    @Override
                                    public Double getResult(Tuple2<Double, Integer> accumulator) {
                                        return accumulator.f0 / accumulator.f1;
                                    }
                                    @Override
                                    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
                                        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                                    }
                                }, Types.TUPLE(Types.DOUBLE, Types.INT)));
                    }
                    @Override
                    public void processElement(WaterSensor2 value, Context ctx, Collector<WaterSensor2> out) throws Exception {
                        aggregatingState.add(value.getVc());
                        out.collect(new WaterSensor2(value.getId(), value.getTs(), aggregatingState.get()));
                    }});
        streamOperator.print();
        env.execute();
    }}

补充:AggregatingStateDescriptor<IN, ACC, OUT>,中间的ACC使用Tuple2作为累加器实现比较麻烦,可以使用自定义一个类来实现累加

//自定义一个bean类来作为累加器使用,使用lombok简化编写
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AvgVc {
    Double vc;
    Integer count;
}

主类实现部分可以修改如下:

public void open(Configuration parameters) throws Exception {
	aggState = getRuntimeContext().getAggregatingState(
		new AggregatingStateDescriptor<Double, AvgVc, Double>("state-agg",
			new AggregateFunction<Double, AvgVc, Double>() {
				@Override
				public AvgVc createAccumulator() {
					return new AvgVc(0.0, 0);
				}
				@Override
				public AvgVc add(Double value, AvgVc accumulator) {
					return new AvgVc(accumulator.getVc() + value,
							accumulator.getCount() + 1);
				}
				@Override
				public Double getResult(AvgVc accumulator) {
					return accumulator.getVc() / accumulator.getCount();
				}
				@Override
				public AvgVc merge(AvgVc a, AvgVc b) {
					return new AvgVc(a.getVc() + b.getVc(), a.getCount() + b.getCount());
				}
			}, AvgVc.class)
	);
}

学习交流,有任何问题还请随时评论指出交流。

标签:Flink,flink,键控,Tuple2,AggregatingState,org,apache,import,public
来源: https://blog.csdn.net/Rango_lhl/article/details/116525052