flink SQL 外关联
作者:互联网
最近flink 因为 其吞吐量 ,exactly once 特性 比较热门 ,尤其是 flink SQL 的易于管理 和 复用的特点 ,都使得大数据团队最近更加喜欢选择flink 进行数据处理 分析等,其他的优势就不一一对比了,下面记录一下我用flink sql遇见的一个小问题。
flink SQL的底层解析用的是apache calcite , hive SQL 也用的calcite解析,因此 flinkSQL 的大致原理和我们常见的sql差不多。outer join的话 ,就是查出有结果的话 就返回结果,没有结果的话将没有结果的字段用null 补齐。
针对官方给出的flinkSQL的例子 进行演示
两个数据流分别是:
val orderA: DataStream[Order] = env.fromCollection(Seq(
Order(1L, "beer", 3),
Order(1L, "diaper", 4),
Order(1L, "rubber", 2)))
val orderB: DataStream[Order] = env.fromCollection(Seq(
Order(2L, "pen", 3),
Order(1L, "rubber", 3),
Order(4L, "beer", 1)))
注册成表:
// convert DataStream to Table
var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
// register DataStream as Table
tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
查询:
val result = tEnv.sqlQuery(
s"SELECT a.`user`,OrderB.product,a.amount FROM $tableA AS a " +
s"FULL OUTER JOIN OrderB ON a.`user` = OrderB.`user`")
结果如下:
1> (true,null,pen,null)
2> (true,1,null,2)
4> (true,null,beer,null)
2> (true,1,null,4)
2> (false,1,null,2)
2> (true,1,rubber,2)
2> (false,1,null,4)
2> (true,1,rubber,4)
2> (true,1,rubber,3)
但是如果指定结果输出类型是 case class的话 只会输出join 结果 ,因为 case class 不支持null 类型 ,这里 我指定为row 作为输出类型:
result.toRetractStream[Row].print()
标签:flink,关联,rubber,user,SQL,null,true,Order 来源: https://blog.csdn.net/weixin_39419040/article/details/89361854