sparkRDD所有算子操作,建议全部手敲一遍
作者:互联网
说明:
1、以下方法全部来自这个RDD.scala,可以自己看源码
2、使用$SPARK_HOME/bin/spark-shell运行代码
3、注释部分是运行结果
//org.apache.spark.rdd
//RDD.scala
// Transformations (return a new RDD)
1.1 map
Return a new RDD by applying a function to all elements of this RDD.
def map[U: ClassTag](f: T => U): RDD[U]
val a = sc.parallelize(1 to 9, 2)
a.collect
//res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
val b = a.map(x => x*2)
b.collect
//res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
1.2 flatMap
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
val a = sc.parallelize(1 to 9, 2)
val d = a.flatMap(15 to _*2)
d.collect
//res2: Array[Int] = Array(15, 16, 15, 16, 17, 18)
1.3 filter
Return a new RDD containing only the elements that satisfy a predicate.
def filter(f: T => Boolean): RDD[T]
val a = sc.parallelize(1 to 9, 2)
a.filter(_ > 5).collect
//res4: Array[Int] = Array(6, 7, 8, 9)
1.4 distinct
Return a new RDD containing the distinct elements in this RDD.
def distinct(): RDD[T]
val f = sc.makeRDD(Array(1,2,3,1,2,3))
f.distinct.collect
//res9: Array[Int] = Array(2, 1, 3)
1.5 repartition
Return a new RDD that has exactly numPartitions partitions.
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
val a = sc.parallelize(1 to 9, 2)
a.glom.collect
//res10: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8, 9))
a.repartition(4)
val b = a.repartition(3)
b.glom.collect
//res19: Array[Array[Int]] = Array(Array(3, 6, 9), Array(1, 4, 7), Array(2, 5, 8))
1.6 coalesce
Return a new RDD that is reduced into numPartitions partitions.
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
val a = sc.parallelize(1 to 9, 2)
a.glom.collect
//res10: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8, 9))
val c = a.coalesce(3,true)
c.glom.collect
//res22: Array[Array[Int]] = Array(Array(3, 6, 9), Array(1, 4, 7), Array(2, 5, 8))
1.7 sample
Return a sampled subset of this RDD.
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
val a = sc.parallelize(0 to 9, 2)
val b = a.sample(true, 0.1)
b.collect
//res27: Array[Int] = Array(4)
1.8 randomSplit
Randomly splits this RDD with the provided weights.
def randomSplit(
weights: Array[Double],
seed: Long = Utils.random.nextLong): Array[RDD[T]]
val i = sc.makeRDD(0 to 9, 3).randomSplit(Array(0.3, 0.2, 0.5))
scala> i(0).collect
//res15: Array[Int] = Array(2, 8)
scala> i(1).collect
//res16: Array[Int] = Array(0, 5, 7, 9)
scala> i(2).collect
//res17: Array[Int] = Array(1, 3, 4, 6)
1.9 takeSample
Return a fixed-size sampled subset of this RDD in an array
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T]
//放回取数
sc.makeRDD(0 to 9, 3).takeSample(true,3)
//res20: Array[Int] = Array(7, 7, 6)
//不放回取数
sc.makeRDD(0 to 9, 3).takeSample(false,9)
//res23: Array[Int] = Array(6, 2, 1, 9, 3, 0, 8, 4, 5)
1.10 union
Return the union of this RDD and another one. Any identical elements will appear multiple times(use .distinct() to eliminate them).
def union(other: RDD[T]): RDD[T]
def ++(other: RDD[T]): RDD[T] = withScope {
this.union(other)
}
val r1 = sc.makeRDD(1 to 4)
val r2 = sc.makeRDD(3 to 6)
r1.union(r2).collect
//res24: Array[Int] = Array(1, 2, 3, 4, 3, 4, 5, 6)
1.11 sortBy
Return this RDD sorted by the given key function.
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
rdd1.sortBy(_._2, false).collect
//res26: Array[(String, Int)] = Array((jerry,3), (kitty,2), (tom,1))
rdd1.sortBy(x => x._2%2, false).collect
//res30: Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2))
1.12 intersection
Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.
Note:This method performs a shuffle internally.
def intersection(
other: RDD[T],
partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("shuke", 2), ("kitty", 2)))
rdd1.intersection(rdd2).collect
//res32: Array[(String, Int)] = Array((kitty,2))
1.13 glom
Return an RDD created by coalescing all elements within each partition into an array.
def glom(): RDD[Array[T]]
sc.makeRDD(0 to 9, 3).glom.collect
//res34: Array[Array[Int]] = Array(Array(0, 1, 2), Array(3, 4, 5), Array(6, 7, 8, 9))
1.14 cartesian
Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in this and b is in other.
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
val r1 = sc.makeRDD(1 to 3, 2)
val r2 = sc.makeRDD(4 to 6, 2)
r1.cartesian(r2).collect
//res1: Array[(Int, Int)] = Array((1,4), (1,5), (1,6), (2,4), (3,4), (2,5), (2,6), (3,5), (3,6))
1.15 groupBy
Return an RDD of grouped items.
Note: This operation may be very expensive.using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance.
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](
f: T => K,
numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
: RDD[(K, Iterable[T])]
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("tom", 2), ("kitty", 3)))
rdd1.groupBy(_._2).collect
//res5: Array[(Int, Iterable[(String, Int)])] = Array((2,CompactBuffer((kitty,2), (tom,2))), (1,CompactBuffer((tom,1))), (3,CompactBuffer((jerry,3), (kitty,3))))
rdd1.groupBy(_._1).collect
//res6: Array[(String, Iterable[(String, Int)])] = Array((tom,CompactBuffer((tom,1), (tom,2))), (jerry,CompactBuffer((jerry,3))), (kitty,CompactBuffer((kitty,2), (kitty,3))))
rdd1.groupByKey.collect
//res7: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 2)), (jerry,CompactBuffer(3)), (kitty,CompactBuffer(2, 3)))
1.16 pipe
//这个函数调用其他脚本,把rdd的每个元素当作标准输入传入,同时接收标准输出作为新rdd的元素
Return an RDD created by piping elements to a forked external process.
def pipe(command: String): RDD[String]
def pipe(command: String, env: Map[String, String]): RDD[String]
vi /cube/bin/concat.sh
#!/bin/bash
RESULT="";
while read LINE; do
RESULT=${RESULT}" "${LINE}
done
echo ${RESULT}
val rdd = sc.makeRDD( List("hi", "how", "are", "you", "fine", "thank", "you", "and", "you"), 2)
val pipeRDD = rdd.pipe("/cube/bin/concat.sh")
pipeRDD.collect
res22: Array[String] = Array(hi how are you, fine thank you and you)
1.17 mapPartitions
Return a new RDD by applying a function to each partition of this RDD.
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
val rdd1 = sc.makeRDD(0 to 9, 3)
rdd1.mapPartitions(_.toList.reverse.iterator).collect
//res0: Array[Int] = Array(4, 3, 2, 1, 0, 9, 8, 7, 6, 5)
rdd1.mapPartitions(_.toList.sortWith(_.compareTo(_) > 0).iterator).collect
//res4: Array[Int] = Array(2, 1, 0, 5, 4, 3, 9, 8, 7, 6)
1.18 mapPartitionsWithIndex
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
val rdd1 = sc.makeRDD(0 to 9, 3)
rdd1.mapPartitionsWithIndex((i,x) => x.map(_+i*1000).toList.reverse.iterator).collect
//res7: Array[Int] = Array(2, 1, 0, 1005, 1004, 1003, 2009, 2008, 2007, 2006)
1.19 zip
Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc.
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
//分区和每个分区的元素个数必须一致
val rdd = sc.makeRDD( List("hi", "how", "are", "you", "fine", "thank", "you", "and", "you"), 3)
val rdd1 = sc.makeRDD(1 to 9, 3)
rdd.zip(rdd1).collect
//res10: Array[(String, Int)] = Array((hi,1), (how,2), (are,3), (you,4), (fine,5), (thank,6), (you,7), (and,8), (you,9))
1.20 zipPartitions
Zip this RDD`s partitions with one (or more) RDD(s) and return a new RDD by applying a function to the zipped partitions.
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]
val rdd1 = sc.makeRDD(1 to 9, 3)
val rdd2 = sc.makeRDD( List("hi", "how", "are", "you", "fine", "thank", "you", "and", "you"), 3)
rdd1.zipPartitions(rdd2){
(rdd1Iter,rdd2Iter) => {
var result = List[String]()
while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
result::=(rdd1Iter.next() + "_" + rdd2Iter.next())
}
result.iterator
}
}.collect
//res22: Array[String] = Array(3_are, 2_how, 1_hi, 6_thank, 5_fine, 4_you, 9_you, 8_and, 7_you)
1.21 zipWithIndex
Zips this RDD with its element indices.
def zipWithIndex(): RDD[(T, Long)]
val rdd = sc.makeRDD( List("hi", "how", "are", "you", "fine", "thank", "you", "and", "you"), 3)
rdd.zipWithIndex.collect
//res15: Array[(String, Long)] = Array((hi,0), (how,1), (are,2), (you,3), (fine,4), (thank,5), (you,6), (and,7), (you,8))
1.22 zipWithUniqueId
Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k, 2*n+k, ...,
def zipWithUniqueId(): RDD[(T, Long)]
val rdd = sc.makeRDD( List("hi", "how", "are", "you", "fine", "thank", "you", "and", "you"), 3)
rdd.zipWithUniqueId.collect
//res16: Array[(String, Long)] = Array((hi,0), (how,3), (are,6), (you,1), (fine,4), (thank,7), (you,2), (and,5), (you,8))
标签:val,collect,Int,sparkRDD,一遍,RDD,算子,Array,def 来源: https://www.cnblogs.com/zhuisuidefeng/p/16354215.html