其他分享
首页 > 其他分享> > flink双流JOIN原理

flink双流JOIN原理

作者:互联网


JOIN简介 谈flink双流JOIN之前,我们先谈一下大家最熟悉的mysql表join,我们知道表join有如下几种,具体区别就不在介绍了。那么流的join和表的join有什么区别呢?本文我们介绍一下。 CROSS JOIN - 交叉连接,计算笛卡儿积; INNER JOIN - 内连接,返回满足条件的记录; OUTER JOIN LEFT - 返回左表所有行,右表不存在补NULL; RIGHT - 返回右表所有行,左边不存在补NULL; FULL -  返回左表和右表的并集,不存在一边补NULL; SELF JOIN - 自连接,将表查询时候命名不同的别名
CROSS JOIN 交叉连接,计算笛卡儿积;
INNER JOIN 内连接,返回满足条件的记录
LEFT JOIN 返回左表所有行,右表不存在补NULL;
RIGHT JOIN 返回右表所有行,左边不存在补NULL;
FULL JOIN 返回左表和右表的并集,不存在一边补NULL;
SELF JOIN  自连接,将表查询时候命名不同的别名
支持的join类型     Apache Flink目前支持INNER JOIN和LEFT OUTER JOIN(SELF 可以转换为普通的INNER和OUTER)。在语义上面Apache Flink严格遵守标准SQL的语义
Apache Flink CROSS INNER OUTER SELF ON WHERE
N Y Y Y 可选 可选
双流JOIN操作注意事项     想要实现流的join我们要考虑数据的延迟,也就是不同流数据到达算子时间不一致的问题。这时候需要用到flink的水印,窗口,EventTime等概念,另外需要注意 Flink 对多表关联是直接顺序链接的,因此需要注意先进行结果集小的关联。同时 flink提供了两种流join的算子,Join和coGroup。具体区别参考上篇博客:flink实战--双流join之Join和coGroup的区别和应用_阿华田的博客-CSDN博客_flink join和cogroup,这篇博客中详细介绍了Join和coGroup的区别,以实现双流Join的案例。 双流JOIN与传统数据库表JOIN的区别     传统数据库表的JOIN是两张静态表的数据联接,在流上面是动态表,双流JOIN的数据不断流入与传统数据库表的JOIN有如下3个核心区别:     1、左右两边的数据集合无穷 - 传统数据库左右两个表的数据集合是有限的,双流JOIN的数据会源源不断的流入。     2、JOIN的结果不断产生/更新 - 传统数据库表JOIN是一次执行产生最终结果后退出,双流JOIN会持续不断的产生新的结果。     3、查询计算的双边驱动 - 双流JOIN由于左右两边的流的速度不一样,会导致左边数据到来的时候右边数据还没有到来,或者右边数据到来的时候左边数据没有到来,所以在实现中要将左右两边的流数据进行保存,以保证JOIN的语义。 数据Shuffle     分布式流计算所有数据会进行Shuffle,怎么才能保障左右两边流的要JOIN的数据会在相同的节点进行处理呢?在双流JOIN的场景,我们会利用JOIN中ON的联接key进行partition,确保两个流相同的联接key会在同一个节点处理,这个在flink的源码中有说明。

数据的保存     不论是INNER JOIN还是OUTER JOIN 都需要对左右两边的流的数据进行保存,JOIN算子会开辟左右两个State进行数据存储,左右两边的数据到来时候,进行如下操作:     1、LeftEvent到来存储到LState,RightEvent到来的时候存储到RState;         2、LeftEvent会去RightState进行JOIN,并发出所有JOIN之后的Event到下游;     3、RightEvent会去LeftState进行JOIN,并发出所有JOIN之后的Event到下游。

简单场景介绍实现原理 INNER JOIN 实现     JOIN有很多复杂的场景,我们先以最简单的场景进行实现原理的介绍,比如:最直接的两个进行INNER JOIN,比如查询产品库存和订单数量,库存变化事件流和订单事件流进行INNER JOIN,JION条件是产品ID,具体如下:

    双流JOIN两边事件都会存储到State里面,如上,事件流按照标号先后流入到join节点,我们假设右边流比较快,先流入了3个事件,3个事件会存储到state中,但因为左边还没有数据,所有右边前3个事件流入时候,没有join结果流出,当左边第一个事件序号为4的流入时候,先存储左边state,再与右边已经流入的3个事件进行join,join的结果如图 三行结果会流入到下游节点sink。当第5号事件流入时候,也会和左边第4号事件进行join,流出一条jion结果到下游节点。这里关于INNER JOIN的语义和大家强调两点:     1、INNER JOIN只有符合JOIN条件时候才会有JOIN结果流出到下游,比如右边最先来的1,2,3个事件,流入时候没有任何输出,因为左边还没有可以JOIN的事件;     2、INNER JOIN两边的数据不论如何乱序,都能够保证和传统数据库语义一致,因为我们保存了左右两个流的所有事件到state中。 LEFT OUTER JOIN 实现     LEFT OUTER JOIN 可以简写 LEFT JOIN,语义上和INNER JOIN的区别是不论右流是否有JOIN的事件,左流的事件都需要流入下游节点,但右流没有可以JION的事件时候,右边的事件补NULL。同样我们以最简单的场景说明LEFT JOIN的实现,比如查询产品库存和订单数量,库存变化事件流和订单事件流进行LEFT JOIN,JION条件是产品ID,具体如下:

下图也是表达LEFT JOIN的语义,只是展现方式不同:

上图主要关注点是当左边先流入1,2事件时候,右边没有可以join的事件时候会向下游发送左边事件并补NULL向下游发出,当右边第一个相同的Join key到来的时候会将左边先来的事件发出的带有NULL的事件撤回(对应上面command的-记录,+代表正向记录,-代表撤回记录)。这里强调三点:     1、左流的事件当右边没有JOIN的事件时候,将右边事件列补NULL后流向下游;* 当右边事件流入发现左边已经有可以JOIN的key的时候,并且是第一个可以JOIN上的右边事件(比如上面的3事件是第一个可以和左边JOIN key P001进行JOIN的事件)需要撤回左边下发的NULL记录,并下发JOIN完整(带有右边事件列)的事件到下游。后续来的4,5,6,8等待后续P001的事件是不会产生撤回记录的。     2、在Apache Flink系统内部事件类型分为正向事件标记为“+”和撤回事件标记为“-”。 RIGHT OUTER JOIN 和 FULL OUTER  JOINRIGHT JOIN内部实现与LEFT JOIN类似, FULL JOIN和LEFT JOIN的区别是左右两边都会产生补NULL和撤回的操作。对于State的使用都是相似的,这里不再重复说明了。 Flink维表JOIN     维表 JOIN 语法 由于维表是一张不断变化的表(静态表只是动态表的一种特例)。那如何 JOIN 一张不断变化的表呢?如果用传统的 JOIN 语法 SELECT * FROM T JOIN dim_table on T.id = dim_table.id    来表达维表 JOIN,是不完整的(对于双流join 来说,只要其中一起关联的流表发生变化,就会进行最新的关联)。因为维表是一直在更新变化的,如果用这个语法那么关联上的是哪个时刻的维表呢?我们是不知道的,结果是不确定的。所以 Flink SQL 的维表 JOIN 语法引入了 SQL:2011 Temporal Table 的标准语法,用来声明关联的是维表哪个时刻的快照。维表 JOIN 语法/示例如下。     假设我们有一个 Orders 订单数据流,希望根据产品 ID 补全流上的产品维度信息,所以需要跟 Products 维度表进行关联。Orders 和 Products 的 DDL 声明语句如下: CREATE TABLE Orders (   orderId VARCHAR,          -- 订单 id   productId VARCHAR,        -- 产品 id   units INT,                -- 购买数量   orderTime TIMESTAMP       -- 下单时间 ) with (   type = ''tt'',              -- tt 日志流   ... ) CREATE TABLE Products (   productId VARCHAR,        -- 产品 id   name VARCHAR,             -- 产品名称   unitPrice DOUBLE          -- 单价   PERIOD FOR SYSTEM_TIME,   -- 这是一张随系统时间而变化的表,用来声明维表   PRIMARY KEY (productId)   -- 维表必须声明主键 ) with (   type = ''alihbase'',        -- HBase 数据源   ... ) 如上声明了一张来自 TT 的 Orders 订单数据流,和一张存储于 HBase 中的 Products 产品维表。为了补齐订单流的产品信息,需要 JOIN 维表,这里可以分为 JOIN 当前表和 JOIN 历史表。 JOIN 当前维表 SELECT * FROM Orders AS o [LEFT] JOIN Products FOR SYSTEM_TIME AS OF PROCTIME() AS p ON o.productId = p.productId Flink SQL 支持 LEFT JOIN 和 INNER JOIN 的维表关联。如上语法所示的,维表 JOIN 语法与传统的 JOIN 语法并无二异。只是 Products 维表后面需要跟上  FOR SYSTEM_TIME AS OF PROCTIME() 的关键字,其含义是每条到达的数据所关联上的是到达时刻的维表快照,也就是说,当数据到达时,我们会根据数据上的 key 去查询远程数据库,拿到匹配的结果后关联输出。这里的 PROCTIME 即 processing time。使用 JOIN 当前维表功能需要注意的是,如果维表插入了一条数据能匹配上之前左表的数据时,JOIN的结果流,不会发出更新的数据以弥补之前的未匹配。JOIN行为只发生在处理时间(processing time),即使维表中的数据都被删了,之前JOIN流已经发出的关联上的数据也不会被撤回或改变。

JOIN 历史维表 SELECT * FROM Orders AS o [LEFT] JOIN Products FOR SYSTEM_TIME AS OF o.orderTime AS p ON o.productId = p.productId 有时候想关联上的维度数据,并不是当前时刻的值,而是某个历史时刻的值。比如,产品的价格一直在发生变化,订单流希望补全的是下单时的价格,而不是当前的价格,那就是 JOIN 历史维表。语法上只需要将上文的  PROCTIME()改成o.orderTime 即可。含义是关联上的是下单时刻的 Products 维表。

Flink 在获取维度数据时,会根据左流的时间去查对应时刻的快照数据。因此 JOIN 历史维表需要外部存储支持多版本存储,如 HBase,或者存储的数据中带有多版本信息。注:JOIN 历史维表功能目前暂未开放维表优化在实际使用的过程中,会遇到许多性能问题。为了解决这些性能问题,我们做了大量的优化,性能得到大幅提升,在双11的洪峰下表现特别淡定。我们的优化主要是为了解决两方面的问题: 1. 提高吞吐。维表的IO请求严重阻塞了数据流的计算处理。 2. 降低维表数据库读压力。如 HBase 也只能承受单机每秒 20 万的读请求。

 

标签:join,双流,INNER,flink,事件,维表,JOIN,LEFT
来源: https://blog.csdn.net/qq_24505127/article/details/122772573