数据库
首页 > 数据库> > Flink基础(67):FLINK SQL(44) 自定义函数(三)自定义聚合函数(UDAF)

Flink基础(67):FLINK SQL(44) 自定义函数(三)自定义聚合函数(UDAF)

作者:互联网

本文为您介绍如何为实时计算Flink版自定义聚合函数(UDAF)搭建开发环境、编写业务代码及上线。

  注意 阿里云实时计算Flink版共享模式暂不支持自定义函数,仅独享模式支持自定义函数。

定义

自定义聚合函数(UDAF)可以将多条记录聚合成1条记录。

UDAF抽象类内部方法

  说明 虽然UDAF可以使用Java或Scala实现,但是建议您使用Java,因为Scala的数据类型有时会造成不必要的性能损失。 AggregateFunction的核心接口方法,如下所示:
/*
* @param <T> UDAF的输出结果的类型。
* @param <ACC> UDAF的accumulator的类型。accumulator是UDAF计算中用来存放计算中间结果的数据类型。您可以需要根据需要自行设计每个UDAF的accumulator。
*/
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
/*
* 初始化AggregateFunction的accumulator。
* 系统在进行第一个aggregate计算之前,调用一次此方法。
*/
public ACC createAccumulator();
/*
* 系统在每次aggregate计算完成后,调用此方法。
*/
public T getValue(ACC accumulator);
}

 

搭建开发环境

搭建开发环境请参见环境搭建

编写业务逻辑代码

以Java为例,举例代码如下。  
import org.apache.flink.table.functions.AggregateFunction;

public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    //定义存放count UDAF状态的accumulator的数据的结构。
    public static class CountAccum {
        public long total;
    }

    //初始化count UDAF的accumulator。
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    //getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    //accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。
    public void accumulate(CountAccum accumulator, Object iValue) {
        accumulator.total++;
    }

    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
         for (CountAccum other : its) {
            accumulator.total += other.total;
         }
    }
}

 

说明 AggregateFunction的子类支持open和close方法作为可选方法,请参见 自定义标量函数(UDF)或 自定义表值函数(UDTF)的写法。

注册使用

  1. 登录实时计算控制台
  2. 在顶部菜单中,单击开发。
  3. 在左侧的导航栏中,单击资源引用。
  4. 在资源引用页签的左上角,单击新建资源。
  5. 在上传资源页面,输入资源配置信息。
     
    参数名称说明
    上传方式 实时计算控制台上仅支持本地上传。   说明 本地上传JAR包的大小上限为300 MB。如果JAR包大小超过300 MB,请在集群绑定的OSS控制台上,或通过OpenAPI的方式上传JAR包。
    资源选择 单击选择资源,选择需要引用的资源。
    资源名称 输入资源名称。
    资源备注 输入资源备注信息。
    资源类型 选择引用资源类型,JAR、DICTIONARY或PYTHON。
  6. 在资源列表中,将鼠标悬停在目标资源右侧的更多上。
  7. 在下拉列表中,单击引用。
  8. 在作业编辑窗口,输入自定义函数声明,示例如下。  
    CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';

上线和启动

自定义聚合函数(UDAF)的上线和启动步骤, 请参见上线启动

示例

-- UDAF计算count
CREATE FUNCTION countUdaf AS 'com.hjc.test.blink.sql.udx.CountUdaf';

create table sls_stream(
   a int,
   b bigint,
   c varchar
) with (
   type='sls',
   endPoint='yourEndpoint',
   accessKeyId='yourAccessId',
   accessKeySecret='yourAccessSecret',
   startTime='2017-07-04 00:00:00',
   project='<yourPorjectName>',
   logStore='stream-test2',
   consumerGroup='consumerGroupTest3'
);

create table rds_output(
   len1 bigint,
   len2 bigint
) with (
   type='rds',
   url='yourDatabaseURL',
   tableName='<yourDatabaseTableName>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);

insert into rds_output
select
   count(a),
   countUdaf(a)
from sls_stream;

 

 

 

 

 

 

 

 

 

标签:函数,自定义,accumulator,UDAF,67,accumulate,方法,public
来源: https://www.cnblogs.com/qiu-hua/p/15111631.html