编程语言
首页 > 编程语言> > Spark2.x精通:源码剖析SortShuffleWriter具体实现

Spark2.x精通:源码剖析SortShuffleWriter具体实现

作者:互联网

 一、概述


    接之前两篇ShuffleWrite文章,这里讲解Spark Shuffle Write的第三种实现SortShuffleWriter,在ShuffleWrite阶段,如果不满足UnsafeShuffleWriter、BypassMergeSortShuffleWriter两种条件,最后代码执行SortShuffleWriter,这里来看看他的具体实现:


二、具体实现

    这里直接看Write()函数,代码如下:

 /** Write a bunch of records to this task's output */  override def write(records: Iterator[Product2[K, V]]): Unit = {    // 根据是否在map端进行数据合并初始化ExternalSorter    //ExternalSorter初始化对应参数的含义    // aggregator:在RDD shuffle时,map/reduce-side使用的aggregator    // partitioner:对shuffle的输出,使用哪种partitioner对数据做分区,比如hashPartitioner或者rangePartitioner    // ordering:根据哪个key做排序    // serializer:使用哪种序列化,如果没有显示指定,默认使用spark.serializer参数值    sorter = if (dep.mapSideCombine) {      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")      new ExternalSorter[K, V, C](        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)    } else {      // 如果没有map-side聚合,那么创建sorter对象时候,aggregator和ordering将不传入对应的值      new ExternalSorter[K, V, V](        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)    }    //通过insertAll方法先写数据到buffer    sorter.insertAll(records)
   // 构造最终的输出文件实例,其中文件名为(reduceId为0):    // "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)    //在输出文件名后加上uuid用于标识文件正在写入,结束后重命名    val tmp = Utils.tempFileWith(output)
   try {      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
     //将排序后的record写入输出文件      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)      //生成index文件,也就是每个reduce通过该index文件得知它哪些是属于它的数据      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)      //构造MapStatus返回结果,里面含有ShuffleWriter输出结果的位置信息      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)    } finally {      if (tmp.exists() && !tmp.delete()) {        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")      }    }  }      

    

    其中ExternalSorter是SortShuffleWriter一个排序类,这个类用于对一些(K, V)类型的key-value对进行排序,如果需要就进行merge,生的结果是一些(K, C)类型的key-combiner对。combiner就是对同样key的value进行合并的结果。它首先使用一个Partitioner来把key分到不同的partition,然后,如果有必要的话,就把每个partition内部的key按照一个特定的Comparator来进行排序。它可以输出只一个分区了的文件,其中不同的partition位于这个文件的不同区域(在字节层面上每个分区是连续的),这样就适用于shuffle时对数据的抓取。


2.这里接着看上面代码第14行的 sorter.insertAll(records)函数,里面其实干了很多事情,代码如下:

 def insertAll(records: Iterator[Product2[K, V]]): Unit = {       //这里获取Map是否聚合标识    val shouldCombine = aggregator.isDefined    //根据是否进行Map端聚合,来决定使用map还是buffer,    // 如果需要通过key做map-side聚合,则使用PartitionedAppendOnlyMap;    // 如果不需要,则使用PartitionedPairBuffer    if (shouldCombine) {      // 使用AppendOnlyMap优先在内存中进行combine      // 获取aggregator的mergeValue函数,用于merge新的值到聚合记录      val mergeValue = aggregator.get.mergeValue       // 获取aggregator的createCombiner函数,用于创建聚合的初始值      val createCombiner = aggregator.get.createCombiner      var kv: Product2[K, V] = null      val update = (hadValue: Boolean, oldValue: C) => {      //创建update函数,如果有值进行mergeValue,如果没有则createCombiner        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)      }      while (records.hasNext) {        addElementsRead()        kv = records.next()        ////通过key计算partition ID,通过partition ID对数据进行排序        //这里的partitionID其实就是Reduce个数        // 对key计算分区,然后开始进行merge        map.changeValue((getPartition(kv._1), kv._1), update)         // 如果需要溢写内存数据到磁盘        maybeSpillCollection(usingMap = true)      }    } else {      // Stick values into our buffer      while (records.hasNext) {        addElementsRead()        val kv = records.next()        //通过key计算partition ID,通过partition ID对数据进行排序        //这里的partitionID其实就是Reduce个数        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])        // 当buffer达到内存限制时(buffer默认大小32k,由spark.shuffle.file.buffer参数决定),会将buffer中的数据spill到文件中        maybeSpillCollection(usingMap = false)      }    }  }


3.下面继续跟踪maybeSpillCollection()函数,如何对内存数据溢写的,代码如下:

private def maybeSpillCollection(usingMap: Boolean): Unit = {    var estimatedSize = 0L   // 如果是map ,也就是Map端需要聚合的情况    if (usingMap) {      //这里预估一个值,根据预估值判断是否需要溢写,      // 如果需要,溢写完成后重新初始化一个map      estimatedSize = map.estimateSize()      if (maybeSpill(map, estimatedSize)) {        map = new PartitionedAppendOnlyMap[K, C]      }     // 这里执行的map不需要聚合的情况    } else {      //这里预估一个值,根据预估值判断是否需要溢写,      // 如果需要,溢写完成后重新初始化一个buffer       estimatedSize = buffer.estimateSize()      if (maybeSpill(buffer, estimatedSize)) {        buffer = new PartitionedPairBuffer[K, C]      }    }    if (estimatedSize > _peakMemoryUsedBytes) {      _peakMemoryUsedBytes = estimatedSize    }  }

 

4.上面涉及到溢写判断函数maybeSpill,我们看下他是如何进行判断的,代码如下:

// maybeSpill函数判断大体分了三步// 1.为当前线程尝试获取amountToRequest大小的内存(amountToRequest = 2 * currentMemory - myMemoryThreshold)。// 2.如果获得的内存依然不足(myMemoryThreshold <= currentMemory),则调用spill,执行溢出操作。内存不足可能是申请到的内存为0或者已经申请得到的内存大小超过了myMemoryThreshold。// 3.溢出后续处理,如elementsRead归零,已溢出内存字节数(memoryBytesSpilled)增加线程当前内存大小(currentMemory),释放当前线程占用的内存。 protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {    var shouldSpill = false    //其中内存阈值myMemoryThreshold  由参数spark.shuffle.spill.initialMemoryThreshold控制,默认是5M    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {      // Claim up to double our current memory from the shuffle memory pool      val amountToRequest = 2 * currentMemory - myMemoryThreshold      //底层调用TaskMemoryManager的acquireExecutionMemory方法分配内存      val granted = acquireMemory(amountToRequest)      // 更新现在内存阀值      myMemoryThreshold += granted     //再次判断当前内存是否大于阀值,如果还是大于阀值则spill      shouldSpill = currentMemory >= myMemoryThreshold    }    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold    // Actually spill    if (shouldSpill) {      _spillCount += 1      logSpillage(currentMemory)      //进行spill,这了溢写肯定先写到缓冲区,后写到磁盘,      //有个比较重要的参数spark.shuffle.file.buffer  默认32k, 优化时常进行调整      spill(collection)      _elementsRead = 0      _memoryBytesSpilled += currentMemory      releaseMemory()    }    shouldSpill  }


    里面还有更深层次的代码,这里就不再跟踪了,只要是了解了整个大体思路即可,有兴趣的自己去跟踪看下即可。


   为方便大家理解,下面给大家画了下SorteShuffleWriter执行的流程图,BypassMergeSortShuffleWriter和UnsafeShuffleWriter的处理流程与这个流程基本一致,只是具体的实现稍有差异,水平有限,仅供参考:



图片


标签:map,val,buffer,Spark2,dep,源码,kv,aggregator,SortShuffleWriter
来源: https://blog.51cto.com/15080019/2653908