Spark源码-2.3 Join物理实现-I
作者:互联网
- BroadcastHashJoinExec
- ShuffledHashJoinExec
- SortMergeJoinExec
StreamedTable与BuildTable(BufferedTable)
SparkSql将join的两张表按流式表(StreamedTable)和构建表(BuildTable/BufferedTable)区分。尽管join实现可能不同,但通常而言,构建表被作为查找表数据结构,流式表作为顺序遍历的数据结构。通常通过一条条迭代流式表中数据,并在构建表中查找与当前流式表数据join键值相同的数据来实现两表的join。
构建表为Hash表数据结构
HashedRelation
BroadcastHashJoinExec
和ShuffledHashJoinExec
两种join实现将构建表数据构建为HashedRelation
,用于查找与流式表匹配的数据。HashedRelation
的两个实现:
LongHashedRelation
- 用于join键值只有一列的情况
- hash表实现
LongToUnsafeRowMap
UnsafeHashedRelation
- 用于join键值有多列的情况
- hash表实现
BytesToBytesMap
两种hash表实现将数据以字节的方式存储,都继承MemoryConsumer
,是适用于spark内存管理体系的appendOnly的hash表实现。使得join数据查找高效快速。
BroadcastHashJoinExec
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
streamedPlan.execute().mapPartitions { streamedIter =>
val hashed = broadcastRelation.value.asReadOnlyCopy()
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
join(streamedIter, hashed, numOutputRows)
}
}
从doExecute
方法可以看出,流式表在每个partition内将该partition数据的迭代器和广播到每个executor且已处理为HashedRelation
的构建表进行join计算。
ShuffledHashJoinExec
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
val hashed = buildHashedRelation(buildIter)
join(streamIter, hashed, numOutputRows)
}
}
在ShuffledHashJoinExec计算前,左右表的数据经过Exchange会以join键值按相同的分区方式进行shuffle。在doExecute
方法中,将两表的rdd经过zipPartitions
方法结合,可以使得shuffle后相同键值的数据在一起进行计算。其中构建表的分区被构建为HashedRelation
与流式表分区的数据进行join计算。
HashJoin
由于上面两中join都是基于hash的,两种物理计划都继承特质HashJoin
,二者对流式表数据和构建为HashedRelation
的构建表数据的join实现其实都是调用HashJoin
的join方法。
protected def join(
streamedIter: Iterator[InternalRow],
hashed: HashedRelation,
numOutputRows: SQLMetric): Iterator[InternalRow] = {
val joinedIter = joinType match {
case _: InnerLike =>
innerJoin(streamedIter, hashed)
case LeftOuter | RightOuter =>
outerJoin(streamedIter, hashed)
case LeftSemi =>
semiJoin(streamedIter, hashed)
case LeftAnti =>
antiJoin(streamedIter, hashed)
case j: ExistenceJoin =>
existenceJoin(streamedIter, hashed)
case x =>
throw new IllegalArgumentException(
s"BroadcastHashJoin should not take $x as the JoinType")
}
val resultProj = createResultProjection
joinedIter.map { r =>
numOutputRows += 1
resultProj(r)
}
}
不同的join类型以不同的方法实现;以inner join为例说明,其他join方式类似:
private def innerJoin(
streamIter: Iterator[InternalRow],
hashedRelation: HashedRelation): Iterator[InternalRow] = {
val joinRow = new JoinedRow
val joinKeys = streamSideKeyGenerator()
streamIter.flatMap { srow =>
joinRow.withLeft(srow)
val matches = hashedRelation.get(joinKeys(srow))
if (matches != null) {
matches.map(joinRow.withRight(_)).filter(boundCondition)
} else {
Seq.empty
}
}
}
可以看到,对流式表数据的迭代器顺序遍历,每取出一条流式数据,就根据其join键值在构建表中查询能够join上的数据,然后根据join时其他条件进行过滤计算join结果(视join类型返回join结果或空)。由于相等join键值的查找基于hash表,所以这两种实现被称为hash join。
构建表为有序数据结构
SortExec
在SortMergeJoinExec
计算前,左右表的数据经过Exchange会以join键值按相同的分区方式进行shuffle,然后与ShuffledHashJoinExec
类似,在doExecute
方法中,将两表的rdd经过zipPartitions
方法结合,可以使得shuffle后相同键值的数据在一起进行计算。
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val spillThreshold = getSpillThreshold
val inMemoryThreshold = getInMemoryThreshold
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
...
}
}
实际上,在Exchange之后还会插入SortExec
物理计划对左右表rdd数据按join键值进行分区内排序;所以,SortMergeJoinExec
将左右表rdd的分区zip在一起后,左右表分区已经是按join键值有序的。SortExec
的doExecute
方法:
protected override def doExecute(): RDD[InternalRow] = {
val peakMemory = longMetric("peakMemory")
val spillSize = longMetric("spillSize")
val sortTime = longMetric("sortTime")
child.execute().mapPartitionsInternal { iter =>
val sorter = createSorter()
...
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
...
sortedIterator
}
}
SortExec
的主要工作仅仅将分区内数据排序,而其使用的排序类为UnsafeExternalRowSorter
,其原理后续研究。
SortMergeJoinExec
SortMergeJoinExec对不同join类型的实现代码较长。但其核心流程与思想相同。由于流式表与构建表被zip到一起的分区数据join键值相同的一定在一起,且两个分区数据已经有序,两边只需按顺序遍历即可进行join操作。以下列示例数据为例:
seq | streamedTable | seq | buildTable |
---|---|---|---|
0 | 1 | 0 | 1 |
1 | 2 | 1 | 2 |
2 | 3 | 2 | 2 |
3 | 3 |
以*表示当前遍历到的行:
streamdTable遍历到第0行1,发现buildTable第0行也为1,从而0取出与streamTable当前行join。
seq | streamedTable | seq | buildTable |
---|---|---|---|
0* | 1 | 0* | 1 |
1 | 2 | 1 | 2 |
2 | 3 | 2 | 2 |
3 | 3 |
streamdTable遍历到第1行2,发现buildTable第1、2行也为2,从而1、2取出与streamTable当前行join。
seq | streamedTable | seq | buildTable |
---|---|---|---|
0 | 1 | 0 | 1 |
1* | 2 | 1* | 2 |
2 | 3 | 2* | 2 |
3 | 3 |
streamdTable遍历到第3行3,发现buildTable第3行也为3,从而3取出与streamTable当前行join。
seq | streamedTable | seq | buildTable |
---|---|---|---|
0 | 1 | 0 | 1 |
1 | 2 | 1 | 2 |
2* | 3 | 2 | 2 |
3* | 3 |
可以看出,由于数据有序,对构建表中数据的查找只需按顺序遍历,然后将键值相同的左右表数据合并(join)在一起,SortMerge这个名字变得非常直观。
参考文章
- Spark源码阅读(三十二): SparkSQL之Join
- 每个 Spark 工程师都应该知道的五种 Join 策略
- Spark SQL 中 Broadcast Join 一定比 Shuffle Join 快?那你就错了。
标签:join,val,hashed,流式,源码,键值,2.3,Join,数据 来源: https://blog.csdn.net/weixin_42265234/article/details/115741280