
SparkRdd Word Count: Source Code Analysis



1 Hand-written word count

// Set the application name; "local" runs the job in local mode
val conf = new SparkConf().setAppName("WC").setMaster("local")
// The SparkContext is the entry point to the Spark cluster
val sc = new SparkContext(conf)
// Word count: split lines into words, count each word, sort by count descending, save the result
sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2, false).saveAsTextFile(args(1))
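
For context, here is a minimal sketch of what the complete application could look like. The object name SparkWordCount matches the file name seen in the debug output below, and args(0)/args(1) follow the chained line above; the rest of the layout is an assumption for illustration, not the article's exact code.

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    // Application name "WC"; "local" runs everything in a single local JVM
    val conf = new SparkConf().setAppName("WC").setMaster("local")
    // The SparkContext is the entry point to the Spark cluster
    val sc = new SparkContext(conf)

    sc.textFile(args(0))            // input path, e.g. file:///c:/tools/test/data/a.txt
      .flatMap(_.split(" "))        // split each line into words
      .map((_, 1))                  // pair each word with an initial count of 1
      .reduceByKey(_ + _)           // sum the counts per word
      .sortBy(_._2, false)          // sort by count, descending
      .saveAsTextFile(args(1))      // output directory

    sc.stop()
  }
}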

2 Local debug output

 

 

3 Local debug: RDD lineage

 

(1) MapPartitionsRDD[7] at sortBy at SparkWordCount.scala:21 []

 |  ShuffledRDD[6] at sortBy at SparkWordCount.scala:21 []

 +-(1) MapPartitionsRDD[5] at sortBy at SparkWordCount.scala:21 []

    |  ShuffledRDD[4] at reduceByKey at SparkWordCount.scala:21 []

    +-(1) MapPartitionsRDD[3] at map at SparkWordCount.scala:21 []

       |  MapPartitionsRDD[2] at flatMap at SparkWordCount.scala:21 []

       |  MapPartitionsRDD[1] at textFile at SparkWordCount.scala:20 []

       |  file:///c:/tools/test/data/a.txt HadoopRDD[0] at textFile at SparkWordCount.scala:20 []
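
The lineage above is the output of RDD.toDebugString. A minimal sketch of how to print it, reusing the sc from section 1 and the sample path shown in the output:

// Build the same chain of transformations, then print its lineage before any action runs.
val wordCounts = sc.textFile("file:///c:/tools/test/data/a.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, false)

// Prints the dependency chain: MapPartitionsRDD[7] <- ShuffledRDD[6] <- ... <- HadoopRDD[0]
println(wordCounts.toDebugString)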


Source code analysis

1 sc.textFile("") produces two RDDs (HadoopRDD[0], MapPartitionsRDD[1])

// textFile creates two RDDs

 

/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */

 

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}
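
As a rough sketch (the path is the sample file from the debug output; nothing here is quoted from the article), the textFile call therefore behaves like a hadoopFile call plus a map that keeps only the line text:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Roughly what sc.textFile(path) expands to: hadoopFile yields (byte offset, line) pairs,
// and the trailing map keeps only the line text.
val lines = sc.hadoopFile("file:///c:/tools/test/data/a.txt",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .map(pair => pair._2.toString)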

 

The first RDD, HadoopRDD[0], carries the input path, the broadcast Hadoop configuration, and the file input format class (InputFormat):

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
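
Both textFile and hadoopFile also accept a minPartitions hint (the path below is assumed, as above):

// Ask Hadoop's input format for at least 4 splits; the actual partition count may be higher.
val linesWith4Parts = sc.textFile("file:///c:/tools/test/data/a.txt", minPartitions = 4)
println(linesWith4Parts.partitions.length)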

The second RDD, MapPartitionsRDD[1], comes from the map call, which extracts the values (the lines of text) from the (key, value) pairs:

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

 

 

2 .flatMap(_.split(" ")) produces one RDD (MapPartitionsRDD[2])

/**
 *  Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */
sc.clean checks that the passed function is serializable (and cleans its closure); the Scala iterator's flatMap is then applied within each partition:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
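
A tiny local illustration of the same flatMap behaviour (the sample lines here are made up for illustration):

// Hypothetical in-memory sample, only to show what flatMap(_.split(" ")) does to lines of text.
val sampleLines = sc.parallelize(Seq("hello spark", "hello rdd"))
val sampleWords = sampleLines.flatMap(_.split(" "))
// sampleWords.collect() => Array(hello, spark, hello, rdd)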

 

3 .map((_,1)) produces one RDD (MapPartitionsRDD[3])

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
Shapes the data: each word becomes a (word, 1) pair, which is returned:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
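
Continuing the hypothetical sample from above:

// map((_, 1)) turns every word into a (word, 1) pair, ready for aggregation by key.
val samplePairs = sampleWords.map((_, 1))
// samplePairs.collect() => Array((hello,1), (spark,1), (hello,1), (rdd,1))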

 

4 .reduceByKey(_+_) produces one RDD (ShuffledRDD[4])

/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
 
A new ShuffledRDD performs the aggregation in two stages: (1) a local, map-side combine, then (2) the results are fetched from upstream and combined globally.
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}

 

/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce.
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

 

 

/**
 * :: Experimental ::
 * Generic function to combine the elements for each key using a custom set of aggregation
 * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
 * Note that V and C can be different -- for example, one might group an RDD of type
 * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
 *
 *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 *  - `mergeCombiners`, to combine two C's into a single one.
 *
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).
 */
@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
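
For illustration, reduceByKey(_ + _) on the (word, 1) pairs therefore amounts to the explicit combineByKeyWithClassTag call sketched below. The HashPartitioner stands in for the defaultPartitioner Spark would pick, and samplePairs is the hypothetical sample from section 3; this is a sketch, not the article's code.

import org.apache.spark.HashPartitioner

// Explicit form of reduceByKey(_ + _): createCombiner is the identity,
// and the same sum function handles both the map-side and the reduce-side merge.
val sampleCounts = samplePairs.combineByKeyWithClassTag[Int](
  (v: Int) => v,                  // createCombiner: first count seen for a key
  (c: Int, v: Int) => c + v,      // mergeValue: local (map-side) aggregation
  (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners: global aggregation after the shuffle
  new HashPartitioner(samplePairs.partitions.length)
)
// sampleCounts.collect() => e.g. Array((hello,2), (spark,1), (rdd,1)) (order may vary)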
 

5 .sortBy(_._2, false) produces three RDDs (MapPartitionsRDD[5], ShuffledRDD[6], MapPartitionsRDD[7])

/**
 * Return this RDD sorted by the given key function.
 */

def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}
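
So the three RDDs in the lineage come straight from this three-step keyBy / sortByKey / values pipeline. A hedged expansion of .sortBy(_._2, false), applied to the counts from the previous sketch:

// Step-by-step equivalent of .sortBy(_._2, false):
val sortedSample = sampleCounts
  .keyBy(_._2)                    // MapPartitionsRDD[5] in the lineage: key each (word, count) by its count
  .sortByKey(ascending = false)   // ShuffledRDD[6]: range-partition and sort by the count key
  .values                         // MapPartitionsRDD[7]: drop the key, back to (word, count)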

 

keyBy(f) maps each element x to (f(x), x), i.e. it is implemented on top of map, which is why the first of the three RDDs, MapPartitionsRDD[5], is a MapPartitionsRDD:

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

 

 

/**
 * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
 *
`collect` or `save` on the resulting RDD will return or output an ordered list of records
 * (in the
`save` case, they will be written to multiple `part-X` files in the filesystem, in
 * order of the keys).
 */
// TODO: this currently doesn't work on P other than Tuple2!
sortByKey builds ShuffledRDD[6], using a RangePartitioner to range-partition the data by key:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

 

Finally, .values is implemented as self.map(_._2), so it goes through map once more and produces MapPartitionsRDD[7]:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

 

Source: https://www.cnblogs.com/suzy0611/p/13930901.html