编程语言
首页 > 编程语言> > Spark源码-2.3 Join物理实现-I

Spark源码-2.3 Join物理实现-I

作者:互联网

StreamedTable与BuildTable(BufferedTable)

SparkSql将join的两张表按流式表(StreamedTable)和构建表(BuildTable/BufferedTable)区分。尽管join实现可能不同,但通常而言,构建表被作为查找表数据结构,流式表作为顺序遍历的数据结构。通常通过一条条迭代流式表中数据,并在构建表中查找与当前流式表数据join键值相同的数据来实现两表的join。

构建表为Hash表数据结构

HashedRelation

BroadcastHashJoinExecShuffledHashJoinExec两种join实现将构建表数据构建为HashedRelation,用于查找与流式表匹配的数据。HashedRelation的两个实现:

两种hash表实现将数据以字节的方式存储,都继承MemoryConsumer,是适用于spark内存管理体系的appendOnly的hash表实现。使得join数据查找高效快速。

BroadcastHashJoinExec

protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")

    val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
    streamedPlan.execute().mapPartitions { streamedIter =>
      val hashed = broadcastRelation.value.asReadOnlyCopy()
      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
      join(streamedIter, hashed, numOutputRows)
    }
  }

doExecute方法可以看出,流式表在每个partition内将该partition数据的迭代器和广播到每个executor且已处理为HashedRelation的构建表进行join计算。

ShuffledHashJoinExec

protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
      val hashed = buildHashedRelation(buildIter)
      join(streamIter, hashed, numOutputRows)
    }
  }

在ShuffledHashJoinExec计算前,左右表的数据经过Exchange会以join键值按相同的分区方式进行shuffle。在doExecute方法中,将两表的rdd经过zipPartitions方法结合,可以使得shuffle后相同键值的数据在一起进行计算。其中构建表的分区被构建为HashedRelation与流式表分区的数据进行join计算。

HashJoin

由于上面两中join都是基于hash的,两种物理计划都继承特质HashJoin,二者对流式表数据和构建为HashedRelation的构建表数据的join实现其实都是调用HashJoin的join方法。

protected def join(
      streamedIter: Iterator[InternalRow],
      hashed: HashedRelation,
      numOutputRows: SQLMetric): Iterator[InternalRow] = {

    val joinedIter = joinType match {
      case _: InnerLike =>
        innerJoin(streamedIter, hashed)
      case LeftOuter | RightOuter =>
        outerJoin(streamedIter, hashed)
      case LeftSemi =>
        semiJoin(streamedIter, hashed)
      case LeftAnti =>
        antiJoin(streamedIter, hashed)
      case j: ExistenceJoin =>
        existenceJoin(streamedIter, hashed)
      case x =>
        throw new IllegalArgumentException(
          s"BroadcastHashJoin should not take $x as the JoinType")
    }

    val resultProj = createResultProjection
    joinedIter.map { r =>
      numOutputRows += 1
      resultProj(r)
    }
  }

不同的join类型以不同的方法实现;以inner join为例说明,其他join方式类似:

private def innerJoin(
      streamIter: Iterator[InternalRow],
      hashedRelation: HashedRelation): Iterator[InternalRow] = {
    val joinRow = new JoinedRow
    val joinKeys = streamSideKeyGenerator()
    streamIter.flatMap { srow =>
      joinRow.withLeft(srow)
      val matches = hashedRelation.get(joinKeys(srow))
      if (matches != null) {
        matches.map(joinRow.withRight(_)).filter(boundCondition)
      } else {
        Seq.empty
      }
    }
  }

可以看到,对流式表数据的迭代器顺序遍历,每取出一条流式数据,就根据其join键值在构建表中查询能够join上的数据,然后根据join时其他条件进行过滤计算join结果(视join类型返回join结果或空)。由于相等join键值的查找基于hash表,所以这两种实现被称为hash join。

构建表为有序数据结构

SortExec

SortMergeJoinExec计算前,左右表的数据经过Exchange会以join键值按相同的分区方式进行shuffle,然后与ShuffledHashJoinExec类似,在doExecute方法中,将两表的rdd经过zipPartitions方法结合,可以使得shuffle后相同键值的数据在一起进行计算。

protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    val spillThreshold = getSpillThreshold
    val inMemoryThreshold = getInMemoryThreshold
    left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) => 
    ...
  }
}

实际上,在Exchange之后还会插入SortExec物理计划对左右表rdd数据按join键值进行分区内排序;所以,SortMergeJoinExec将左右表rdd的分区zip在一起后,左右表分区已经是按join键值有序的。SortExecdoExecute方法:

protected override def doExecute(): RDD[InternalRow] = {
    val peakMemory = longMetric("peakMemory")
    val spillSize = longMetric("spillSize")
    val sortTime = longMetric("sortTime")

    child.execute().mapPartitionsInternal { iter =>
      val sorter = createSorter()
      ...
      val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
      ...
      sortedIterator
    }
  }

SortExec的主要工作仅仅将分区内数据排序,而其使用的排序类为UnsafeExternalRowSorter,其原理后续研究。

SortMergeJoinExec

SortMergeJoinExec对不同join类型的实现代码较长。但其核心流程与思想相同。由于流式表与构建表被zip到一起的分区数据join键值相同的一定在一起,且两个分区数据已经有序,两边只需按顺序遍历即可进行join操作。以下列示例数据为例:

seqstreamedTableseqbuildTable
0101
1212
2322
33

以*表示当前遍历到的行:
streamdTable遍历到第0行1,发现buildTable第0行也为1,从而0取出与streamTable当前行join。

seqstreamedTableseqbuildTable
0*10*1
1212
2322
33

streamdTable遍历到第1行2,发现buildTable第1、2行也为2,从而1、2取出与streamTable当前行join。

seqstreamedTableseqbuildTable
0101
1*21*2
232*2
33

streamdTable遍历到第3行3,发现buildTable第3行也为3,从而3取出与streamTable当前行join。

seqstreamedTableseqbuildTable
0101
1212
2*322
3*3

可以看出,由于数据有序,对构建表中数据的查找只需按顺序遍历,然后将键值相同的左右表数据合并(join)在一起,SortMerge这个名字变得非常直观。

参考文章

标签:join,val,hashed,流式,源码,键值,2.3,Join,数据
来源: https://blog.csdn.net/weixin_42265234/article/details/115741280