编程语言
首页 > 编程语言> > Spark快速上手(4)Spark核心编程-Spark分区器(Partitioner)@(RDD-K_V)

Spark快速上手(4)Spark核心编程-Spark分区器(Partitioner)@(RDD-K_V)

作者:互联网

@Spark分区器(Partitioner)

HashPartitioner(默认的分区器)

HashPartitioner分区原理是对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则余数+分区的个数,最后返回的值就是这个key所属的分区ID,当key为null值是返回0。
源码在org.apache.spark包下:
origin code:

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions
  // 根据键的值来判断在哪一个分区
  def getPartition(key: Any): Int = key match {
    case null => 0   // 键为null始终在0分区
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) // 键不为0,根据键的hashCode值和分区数进行计算
  }
  
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

…………
}
// 底层实质:取模运算
def nonNegativeMod(x: Int, mod: Int): Int = {
   val rawMod = x % mod
   rawMod + (if (rawMod < 0) mod else 0)
}

RangePartitioner

HashPartitioner分区的实现可能会导致数据倾斜,极端情况下会导致某些分区拥有RDD的所有数据。而RangePartitioner分区器则尽量保证各个分区数据均匀,而且分区和分区之间是有序的,也就是说令一个分区中的元素均比另一个分区中的元素小或者大;但是分区内的元素是不能保证顺序的。简单地说就是将一定范围内的数据映射到一个分区内。
  sortByKey底层使用的数据分区器就是RangePartitioner分区器,该分区器的实现方式主要通过两个步骤实现:
①先从整个RDD中抽取样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[key]类型的数组变量rangeBounds;
②判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标。该分区器要求RDD中的key类型必须是可排序的。
origin code:

class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20)
  extends Partitioner {

  // A constructor declared in order to maintain backward compatibility for Java, when we add the
  // 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
  // This is added to make sure from a bytecode point of view, there is still a 3-arg ctor.
  def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
    this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
  }

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
  require(samplePointsPerPartitionHint > 0,
    s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")

  // 获取RDD中key类型数据的排序器
  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      // 如果给定的分区数是一个的情况下,直接返回一个空的集合,表示数据不进行分区
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // Cast to double to avoid overflowing ints or longs
      // 给定总的数据抽样大小,最多1M的数据量(10^6),最少20倍的RDD分区数量,也就是每个RDD分区至少抽取20条数据
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      // 计算每个分区抽样的数据量大小,假设输入数据每个分区分布的比较均匀
      // 对于超大数据集(分区数量超过5万的)乘以3会让数据稍微增大一点,对于分区数低于5万的数据集,每个分区抽取数据量为60条也不算多
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      // 从RDD中抽取数据,返回值:(总RDD数据量,Array[分区id, 当前分区的数据量, 当前分区抽取的数据])
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        // 如果总的数据量为0(RDD为空),那么直接返回一个空的数组
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        // 计算总样本数量和总记录数的占比,占比最大为1.0
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        // 保存样本数据的集合buffer
        val candidates = ArrayBuffer.empty[(K, Float)]
        // 保存数据分布不均衡的分区id(数据量超过fraction比率的分区)
        val imbalancedPartitions = mutable.Set.empty[Int]
        // 计算抽取出来的样本数据
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            // 如果fraction乘以当前分区中的数据量大于之前计算的每个分区的抽样数据大小,那么表示当前分区抽取的数据太少了,该分区数据分布不均衡,需要重新抽取
            imbalancedPartitions += idx
          } else {
            // 当前分区不属于数据分布不均衡的分区,计算占比权重,并添加到candidates集合中
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        // 对数据分布不均衡的RDD分区,重新进行数据抽样
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          // 获取数据分布不均衡的RDD分区,并构成RDD
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          // 随机种子
          val seed = byteswap32(-rdd.id - 1)
          // 利用RDD的sample抽样函数API进行数据抽样
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        // 将最终的抽样数据计算出rangeBounds
        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
  }

  // 下一个RDD的分区数量是rangeBounds数组中元素数量+1个
  def numPartitions: Int = rangeBounds.length + 1

  // 二分查找器,内部使用Java中的Arrays提供的二分查找方法
  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  // 根据RDD的key值返回对应的分区id,从0开始
  def getPartition(key: Any): Int = {
    // 强制转换key类型为RDD中原本的数据类型
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      // 如果分区数据小于等于128个,那么直接本地循环寻找当前k所属的分区下标
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      // 如果分区数量大于128个,那么使用二分查找方法寻找对应k所属的下标
      // 但是如果k在rangeBounds中没有出现,实质上返回的是一个负数(范围)或者是一个超过rangeBounds大小的数(最后一个分区,比所有的数据都大)
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    // 根据数据排序是升序还是降序进行数据的排列,默认为升序
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

  override def equals(other: Any): Boolean = other match {
    case r: RangePartitioner[_, _] =>
      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
    case _ =>
      false
  }

  override def hashCode(): Int = {
    val prime = 31
    var result = 1
    var i = 0
    while (i < rangeBounds.length) {
      result = prime * result + rangeBounds(i).hashCode
      i += 1
    }
    result = prime * result + ascending.hashCode
    result
  }

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => out.defaultWriteObject()
      case _ =>
        out.writeBoolean(ascending)
        out.writeObject(ordering)
        out.writeObject(binarySearch)

        val ser = sfactory.newInstance()
        Utils.serializeViaNestedStream(out, ser) { stream =>
          stream.writeObject(scala.reflect.classTag[Array[K]])
          stream.writeObject(rangeBounds)
        }
    }
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => in.defaultReadObject()
      case _ =>
        ascending = in.readBoolean()
        ordering = in.readObject().asInstanceOf[Ordering[K]]
        binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]

        val ser = sfactory.newInstance()
        Utils.deserializeViaNestedStream(in, ser) { ds =>
          implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
          rangeBounds = ds.readObject[Array[K]]()
        }
    }
  }
}

将一定范围内的数映射到某一个分区内,在实现中,分界(rangeBounds)算法用到了水塘抽样算法。RangePartitioner的重点在于构建rangeBounds数组对象,主要步骤是:

  1. 如果分区数量小于2或者RDD中不存在数据的情况下,直接返回一个空的数组,不需要计算range的边界;如果分区数量大于1的情况下,而且RDD中有数据的情况下,才需要计算数组对象
  2. 计算总体的数据抽样大小sampleSize,计算规则是:至少每个分区抽取20个数据或者最多1M的数据量
  3. 根据sampleSize和分区数量计算每个分区的数据抽样样本数量sampleSizePartition
  4. 调用RangePartitioner的sketch函数进行数据抽样,计算出每个分区的样本
  5. 计算样本的整体占比以及数据量过多的数据分区,防止数据倾斜
  6. 对于数据量比较多的RDD分区调用RDD的sample函数API重新进行数据获取
  7. 将最终的样本数据通过RangePartitioner的determineBounds函数进行数据排序分配,计算出rangeBounds

RangePartitioner的sketch函数的作用是对RDD中的数据按照需要的样本数据量进行数据抽取,主要调用SamplingUtils类的reservoirSampleAndCount方法对每个分区进行数据抽取,抽取后计算出整体所有分区的数据量大小;reserviorSampleAndCount方法的抽取方式是先从迭代器中获取样本数量个数据(顺序获取),然后对剩余的数据进行判断,替换之前的样本数据,最终达到数据抽样的效果。RangePartitioner的determineBounds函数的作用是根据样本数据记忆权重大小确定数据边界。

RangePartitioner的determineBounds函数的作用是根据样本数据记忆权重大小确定数据边界,源代码如下:
origin code:

/**
   * Determines the bounds for range partitioning from candidates with weights indicating how many
   * items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights
   * @param partitions number of partitions
   * @return selected bounds
   */
  def determineBounds[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    // 按照数据进行排序,默认升序排序
    val ordered = candidates.sortBy(_._1)
    // 获取总的样本数据大小
    val numCandidates = ordered.size
    // 计算总的权重大小
    val sumWeights = ordered.map(_._2.toDouble).sum
    // 计算步长
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      // 获取排序后的第i个数据及权重
      val (key, weight) = ordered(i)
      // 累计权重
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        // 权重已经达到一个步长的范围,计算出一个分区id的值
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {// 上一个边界值为空,或者当前边界值key数据大于上一个边界的值,那么当前key有效,进行计算
          // 添加当前key到边界集合中
          bounds += key
          // 累计target步长界限
          target += step
          // 分区数量加1
          j += 1
          // 上一个边界的值重置为当前边界的值
          previousBound = Some(key)
        }
      }
      i += 1
    }
    // 返回结果
    bounds.toArray
  }

自定义分区器

自定义分区器是需要继承org.apache.spark.Partitioner类并实现以下三个方法:

  1. numPartitioner: Int:返回创建出来的分区数
  2. getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions - 1)
  3. equals():Java判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同

e.g.1

// CustomPartitioner
import org.apache.spark.Partitioner

/**
 * @param numPartition 分区数量
 */
class CustomPartitioner(numPartition: Int) extends Partitioner{
    // 返回分区的总数
    override def numPartitions: Int = numPartition

    // 根据传入的 key 返回分区的索引
    override def getPartition(key: Any): Int = {
        key.toString.toInt % numPartition
    }
}

// CustomPartitionerDemo
import com.work.util.SparkUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD


object CustomPartitionerDemo {
    def main(args: Array[String]): Unit = {
        val sc: SparkContext = SparkUtil.getSparkContext()
        println("=================== 原始数据 =====================")
        // zipWithIndex 该函数将 RDD 中的元素和这个元素在 RDD 中的 ID(索引号)组合成键值对
        val data: RDD[(Int, Long)] = sc.parallelize(0 to 10, 1).zipWithIndex()
        println(data.collect().toBuffer)

        println("=================== 分区和数据组合成 Map =====================")
        val func: (Int, Iterator[(Int, Long)]) => Iterator[String] = (index: Int, iter: Iterator[(Int, Long)]) => {
            iter.map(x => "[partID:" + index + ", value:" + x + "]")
        }
        val array: Array[String] = data.mapPartitionsWithIndex(func).collect()
        for (i <- array) {
            println(i)
        }

        println("=================== 自定义5个分区和数据组合成 Map =====================")
        val rdd1: RDD[(Int, Long)] = data.partitionBy(new CustomPartitioner(5))
        val array1: Array[String] = rdd1.mapPartitionsWithIndex(func).collect()
        for (i <- array1) {
            println(i)
        }
    }
}

e.g.2

// SubjectPartitioner
import org.apache.spark.Partitioner
import scala.collection.mutable

/**
 *
 * @param subjects 学科数组
 */
class SubjectPartitioner(subjects: Array[String]) extends Partitioner {
    // 创建一个 map 集合用来存储到分区号和学科
    val subject: mutable.HashMap[String, Int] = new mutable.HashMap[String, Int]()
    // 定义一个计数器,用来生成自定义分区号
    var i = 0
    for (s <- subjects) {
        // 存储学科和分区
        subject += (s -> i)
        // 分区自增
        i += 1
    }

    // 获取分区数
    override def numPartitions: Int = subjects.size

    // 获取分区号(如果传入 key 不存在,默认将数据存储到 0 分区)
    override def getPartition(key: Any): Int = subject.getOrElse(key.toString, 0)
}

// SubjectPartitionerDemo
import java.net.URL

import com.work.util.SparkUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SubjectPartitionerDemo {
    def main(args: Array[String]): Unit = {
        // 获取上下文对象
        val sc: SparkContext = SparkUtil.getSparkContext()
        val tuples: RDD[(String, Int)] = sc.textFile("src/main/data/project.txt").map(line => {
            val fields: Array[String] = line.split("\t")
            for (i <- fields) {
                println(i)
            }
            // 取出 url
            val url: String = fields(1)
            (url, 1)
        })
        // 将相同的 url 进行聚合,得到了各个学科的访问量
        val sumed: RDD[(String, Int)] = tuples.reduceByKey(_ + _).cache()
        // 从 url 中取出学科的字段,数据组成:学科,url,统计数量
        val subjectAndUC: RDD[(String, (String, Int))] = sumed.map(tup => {
            // 用户 url
            val url: String = tup._1
            // 统计的访问量
            val count: Int = tup._2
            // 学科
            val subject: String = new URL(url).getHost
            (subject, (url, count))
        })
        // 将所有学科取出来
        val subjects: Array[String] = subjectAndUC.keys.distinct.collect
        // 创建自定义分区器对象
        val partitioner: SubjectPartitioner = new SubjectPartitioner(subjects)
        // 分区
        val partitioned: RDD[(String, (String, Int))] = subjectAndUC.partitionBy(partitioner)
        // 取 top3
        val result: RDD[(String, (String, Int))] = partitioned.mapPartitions(it => {
            val list: List[(String, (String, Int))] = it.toList
            val sorted: List[(String, (String, Int))] = list.sortBy(_._2._2).reverse
            val top3: List[(String, (String, Int))] = sorted.take(3)
            // 因为方法的返回值需要一个 iterator
            top3.iterator
        })
        // 存储数据
        result.saveAsTextFile("src/main/data/out/")
        // 释放资源
        sc.stop()
    }
}

标签:String,val,Int,分区,Partitioner,RDD,key,Spark
来源: https://www.cnblogs.com/unknownshangke/p/16443710.html