ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink Table Api 之表函数使用

2022-03-20 10:02:32  阅读:257  来源: 互联网

标签:flink Flink tableEnv 之表 Api org apache import id


表函数(Table Functions)

  • 用户定义的表函数,也可以将0、1或多个标量值作为输入参数;与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值;
  • 为了定义一个表函数,必须扩展 org.apache.flink.table.functions 中的基类TableFunction 并实现(一个或多个)求值方法;
  • 表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval
比如,通过自定义表函数,对读取数据中的某些字段可以根据业务需求做一些特殊的处理 下面看一个具体的代码案例
package com.congge.table.api.udf;


import com.congge.source.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;


public class TestTableFunction {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String path = "E:\\code-self\\flink_study\\src\\main\\resources\\sensor.txt";

        // 1. 读取数据
        DataStreamSource<String> inputStream = env.readTextFile(path);

        // 2. 转换成POJO
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 3. 将流转换成表
        Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");

        // 4. 自定义表函数,实现将id拆分,并输出(word, length)
        // table API
        Split split = new Split("_");

        // 需要在环境中注册UDF
        tableEnv.registerFunction("split", split);
        Table resultTable = sensorTable
                .joinLateral("split(id) as (word, length)")
                .select("id, ts, word, length");

        // SQL写法
        tableEnv.createTemporaryView("sensor", sensorTable);
        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length " +
                " from sensor, lateral table(split(id)) as splitid(word, length)");

        // 打印输出
        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");

        env.execute();
    }

    // 实现自定义TableFunction
    public static class Split extends TableFunction<Tuple2<String, Integer>>{
        // 定义属性,分隔符
        private String separator = ",";

        public Split(String separator) {
            this.separator = separator;
        }

        // 必须实现一个eval方法,没有返回值
        public void eval( String str ){
            for( String s: str.split(separator) ){
                collect(new Tuple2<>(s, s.length()));
            }
        }
    }

}

本例的需求是,通过 Flink Table Api读取原始文件数据,然后通过自定义表函数,将读取到的id字段数据通过类似侧写的方式进行输出,统计id字段长度,运行上面的代码,观察控制台打印效果;

 

标签:flink,Flink,tableEnv,之表,Api,org,apache,import,id
来源: https://blog.csdn.net/congge_study/article/details/123608099

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有