Spark ~ RDD总结
作者:互联网
Spark ~ RDD总结
- TRANSFORMATION 型 RDD
- ACTION 型 RDD
TRANSFORMATION 型 RDD
VALUE 类型–1
map(func)
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。
hadoop fs -cat /tmp/lxw1234/1.txt
hello world
hello spark
hello hive
//读取HDFS文件到RDD
scala> var data = sc.textFile("/tmp/lxw1234/1.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21
//使用map算子
scala> var mapresult = data.map(line => line.split("\\s+"))
mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23
//运算map算子结果
scala> mapresult.collect
res0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))
flatMap(func)
跟map(func)类似,但是每个输入项和成为0个或多个输出项(所以func函数应该返回的是一个序列化的数据而不是单个数据项)
在使用时map会将一个长度为N的RDD转换为另一个长度为N的RDD;而flatMap会将一个长度为N的RDD转换成一个N个元素的集合,然后再把这N个元素合成到一个单个RDD的结果集。
比如一个包含三行内容的数据文件“README.md”。
a b c
d
经过以下转换过程
val textFile = sc.textFile("README.md")
textFile.flatMap(_.split(" "))
其实就是经历了以下转换
["a b c", "", "d"] => [["a","b","c"],[],["d"]] => ["a","b","c","d"]
在这个示例中,flatMap就把包含多行数据的RDD,即[“a b c”, “”, “d”] ,转换为了一个包含多个单词的集合。实际上,flatMap相对于map多了的是[[“a”,”b”,”c”],[],[“d”]] => [“a”,”b”,”c”,”d”]这一步。
map(func) 与 flatMap(func) 区别
map(func)函数会对每一条输入进行指定的func操作,然后为每一条输入返回一个对象;而flatMap(func)也会对每一条输入进行执行的func操作,然后每一条输入返回一个相对,但是最后会将所有的对象再合成为一个对象;从返回的结果的数量上来讲,map返回的数据对象的个数和原来的输入数据是相同的,而flatMap返回的个数则是不同的。请参考下图进行理解:
通过上图可以看出,flatMap其实比map多的就是flatten操作。
示例验证
接下来,我们用一个例子来进行比较,首先在HDFS里写入了这样内容的一个文件:
C:\WINDOWS\system32>hadoop fs -cat hdfs://localhost:9000/user/input/wordcount.txt
word in text
hello spark
the third line
C:\WINDOWS\system32>
然后再spark里进行测试,如下
scala> var textFile =sc.textFile("hdfs://localhost:9000/user/input/wordcount.txt")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/input/wordcount.txt MapPartitionsRDD[1] at textFile at <console>:27
map的结果
scala> var mapResult = textFile.map(line => line.split("\\s+"))
mapResult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:29
scala> mapResult.collect
res0: Array[Array[String]] = Array(Array(word, in, text), Array(hello, spark), Array(the, third, line))
flatMap的结果
scala> var flatMapResult = textFile.flatMap(line => line.split("\\s+"))
flatMapResult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:29
scala> flatMapResult.collect
res1: Array[String] = Array(word, in, text, hello, spark, the, third, line)
总结:
-
Spark 中 map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象;
-
而flatMap函数则是两个操作的集合——正是“先映射后扁平化”:
操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个对象
操作2:最后将所有对象合并为一个对象
mapPartitionsWithIndex(func)
mapPartitionsWithIndex既可以拿到分区的迭代器,又可以拿到分区编号。
我们如果要使用这个方法的话,还需要传入一个函数,两个参数,参数为一个int值、一个迭代器。然后这个方法就会将分区编号传给int值,将这个分区编号中的值传入迭代器,我们对这些数据操作之后同样应该返回一个迭代器。
scala> var cai = sc.parallelize(1 to 9,3)
cai: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> def func1(index:Int,iter:Iterator[Int]) : Iterator[String] = {
| iter.toList.map(x => "[partID:"+index+",value:"+x+"]").iterator
| }
func1: (index: Int, iter: Iterator[Int])Iterator[String]
scala> cai.mapPartitionsWithIndex(func1).collect.foreach(println)
[partID:0,value:1]
[partID:0,value:2]
[partID:0,value:3]
[partID:1,value:4]
[partID:1,value:5]
[partID:1,value:6]
[partID:2,value:7]
[partID:2,value:8]
[partID:2,value:9]
mapPartitions()
var conf = new SparkConf().setAppName("cai").setMaster("local[*]")
var sc = new SparkContext(conf)
var rdd = sc.parallelize(1 to 9 , 3)
def func_mappartitionws(iterms:Iterator[Int]):Iterator[(Int,Int,Int)] = {
var re = List[(Int,Int,Int)]()
while(iterms.hasNext){
var res = iterms.next()
re .::= (res,res+2,res+3)
}
re.iterator
}
var luo = rdd.mapPartitions(func_mappartitionws)
luo.foreach(println)
map() 和 mapPartition() 的区别
-
map():每次处理一条数据。
-
mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM。
-
开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率。
glom()
- glom的作用是将同一个分区里的元素合并到一个array里
- glom属于Transformation算子:这种变换并不触发提交作业,完成作业中间过程处理。 Transformation
- 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
collect
glom.collect RDD中的元素已经变成了分片映射的列表
groupBy(func)
groupBy算子接收一个函数,这个函数返回的值作为key,然后通过这个key来对里面的元素进行分组。
val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
//返回的even或者odd字符串作为key来group RDD里面的值,
res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))
val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int =
{
a % 2
}
a.groupBy(myfunc).collect //同样的,返回的是0的时候,表示的是偶数值,返回的是1的时候表示的是奇数。
res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))
var conf = new SparkConf().setAppName("cai").setMaster("local[*]")
var sc = new SparkContext(conf)
var rdd = sc.parallelize(List("才支援","a;sldk","a;sldkj","罗荣银"))
var re = rdd.keyBy(_.length)
var luo = re.groupByKey()
luo.foreach(println)
filter(func)
此算子接受一个函数,通过函数筛选出需要的数据元素,返回true表示保留,返回false表示抛弃
scala> val a = sc.parallelize(1 to 10, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:21
scala> val b = a.filter(_ % 2 == 0)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at filter at <console>:23
scala> b.collect
res13: Array[Int] = Array(2, 4, 6, 8, 10)
sample(withReplacement, fraction, seed)
ample(withReplacement : scala.Boolean, fraction : scala.Double,seed scala.Long)
sample算子时用来抽样用的,其有3个参数
- withReplacement:是否放回抽样。true-有放回,false-无放回
- fraction:期望样本的大小作为RDD大小的一部分
- 当withReplacement=false时,选择每个元素的概率,分数一定是[0,1]
- 当withReplacement=true时,选择每个元素的期望次数,分数必须大于等于0 - seed:随机数生成器的种子。一般默认
无放回抽样,每个元素被抽到的概率为0.5:fraction=0.5
val rdd=sc.parallelize(List(2,3,7,4,8))
val sampleRdd=rdd.sample(false,0.5)
sampleRdd.foreach(println)
有放回抽样,每个元素被抽取到的期望次数是2:fraction=2
//简单1--(有/无放回抽样,抽样比例,随机数种子)
val rdd=sc.parallelize(List(2,3,7,4,8))
val sampleRdd=rdd.sample(true,2)
sampleRdd.foreach(println)
distinct([numTasks]))
VALUE 类型–2
coalasce 和 repartition 的区别
coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
该函数用于将RDD进行重分区,使用HashPartitioner。
第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;
以下面的例子来看:
scala> var data = sc.textFile("/tmp/lxw1234/1.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[53] at textFile at :21
scala> data.collect
res37: Array[String] = Array(hello world, hello spark, hello hive, hi spark)
scala> data.partitions.size
res38: Int = 2 //RDD data默认有两个分区
scala> var rdd1 = data.coalesce(1)
rdd1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[2] at coalesce at :23
scala> rdd1.partitions.size
res1: Int = 1 //rdd1的分区数为1
scala> var rdd1 = data.coalesce(4)
rdd1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[3] at coalesce at :23
scala> rdd1.partitions.size
res2: Int = 2 //如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,//否则,分区数不便
scala> var rdd1 = data.coalesce(4,true)
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at coalesce at :23
scala> rdd1.partitions.size
res3: Int = 4
repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
该函数其实就是coalesce函数第二个参数为true的实现
scala> var rdd2 = data.repartition(1)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at :23
scala> rdd2.partitions.size
res4: Int = 1
scala> var rdd2 = data.repartition(4)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at repartition at :23
scala> rdd2.partitions.size
res5: Int = 4
sortBy(func,[ascending], [numTasks])
该函数最多可以传三个参数:
- 第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
- 第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
- 第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。
从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入。而且sortBy函数函数的实现依赖于sortByKey函数,关于sortByKey函数后面会进行说明。keyBy函数也是RDD类中进行实现的,它的主要作用就是将将传进来的每个元素作用于f(x)中,并返回tuples类型的元素,也就变成了Key-Value类型的RDD了,它的实现如下:
scala> val data = List(3,1,90,3,5,12)
data: List[Int] = List(3, 1, 90, 3, 5, 12)
scala> val rdd = sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14
scala> rdd.collect
res0: Array[Int] = Array(3, 1, 90, 3, 5, 12)
scala> rdd.sortBy(x => x).collect
res1: Array[Int] = Array(1, 3, 3, 5, 12, 90)
scala> rdd.sortBy(x => x, false).collect
res3: Array[Int] = Array(90, 12, 5, 3, 3, 1)
scala> val result = rdd.sortBy(x => x, false)
result: org.apache.spark.rdd.RDD[Int] = MappedRDD[23] at sortBy at <console>:16
scala> result.partitions.size
res9: Int = 2
scala> val result = rdd.sortBy(x => x, false, 1)
result: org.apache.spark.rdd.RDD[Int] = MappedRDD[26] at sortBy at <console>:16
scala> result.partitions.size
res10: Int = 1
pipe(command, [envVars])
这个算子比较重要,单独写一篇文章来记录
双 Value 类型交互
union(otherDataset)
var rdd = sc.parallelize(List("才支援","a;sldk","a;sldkj","罗荣银"))
var rdd1 = sc.parallelize(List("我爱你中国么么哒","中国共产党万岁"))
var cai = rdd.union(rdd1)
cai.foreach(println)
subtract (otherDataset)
取出与 rdd 中相同数据以后的 rdd1 的数据,取差集
var rdd = sc.parallelize(List("才支援","a;sldk","a;sldkj","罗荣银"))
var rdd1 = sc.parallelize(List("我爱你中国么么哒","中国共产党万岁","罗荣银"))
var cai = rdd1.subtract(rdd)
cai.foreach(println)
intersection(otherDataset)
取交集(把相同的数据取出来)
var rdd = sc.parallelize(List("才支援","a;sldk","a;sldkj","罗荣银"))
var rdd1 = sc.parallelize(List("我爱你中国么么哒","中国共产党万岁","罗荣银"))
var cai = rdd.intersection(rdd1)
cai.foreach(println)
cartesian(otherDataset)
该函数是用来求笛卡尔积的
var rdd = sc.parallelize(List("才支援","a;sldk","a;sldkj","罗荣银"))
var rdd1 = sc.parallelize(List("我爱你中国么么哒","中国共产党万岁","罗荣银"))
var cai = rdd.cartesian(rdd1)
cai.foreach(println)
zip(otherDataset)
def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同:,否则会抛出异常。
scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21
scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21
scala> rdd1.zip(rdd2).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))
scala> rdd2.zip(rdd1).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))
scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21
scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
//如果两个RDD分区数不同,则抛出异常
zipPartitions
别人的表述:请点击 http://lxw1234.com/archives/2015/07/350.htm
Key-Value 类型–1
keyBy
为各个元素,按指定的函数生成key,形成key-value的RDD。
scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[123] at parallelize at <console>:21
scala> val b = a.keyBy(_.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[124] at keyBy at <console>:23
scala> b.collect
res80: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
partitionBy
repartition和partitionBy都是重新分区的算子,其中partitionBy只能作用于PairRDD. 但是,当作用于PairRDD时,repartition和partitionBy的行为是不同的。repartition是把数据随机打散均匀分布于各个Partition;而partitionBy则在参数中指定了Partitioner(默认HashPartitioner),将每个(K,V)对按照K根据Partitioner计算得到对应的Partition. 在合适的时候使用partitionBy可以减少shuffle次数,提高效率。
var cai = sc.parallelize(List("蔡志远","很爱很爱","罗荣银","LOVE","VERY","MUCH"),3)
var rdd = cai.map((_,1))
println(rdd.partitions.size)
var rdd1 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
println(rdd1.partitions.size)
reduceByKey(func, [numTasks])
该函数用于将RDD[K,V]中每个K对应的V值根据映射函数来运算。
有三种参数形式:
-
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
eg: rdd.reduceByKey((x,y)=>x+y) 或者 rdd.reduceByKey(+) -
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
参数numPartitions用于指定分区数; -
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
参数partitioner用于指定分区函数;
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[91] at makeRDD at :21
scala> rdd1.partitions.size
res82: Int = 15
scala> var rdd2 = rdd1.reduceByKey((x,y) => x + y)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[94] at reduceByKey at :23
scala> rdd2.collect
res85: Array[(String, Int)] = Array((A,2), (B,3), (C,1))
scala> rdd2.partitions.size
res86: Int = 15
scala> var rdd2 = rdd1.reduceByKey(new org.apache.spark.HashPartitioner(2),(x,y) => x + y)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[95] at reduceByKey at :23
scala> rdd2.collect
res87: Array[(String, Int)] = Array((B,3), (A,2), (C,1))
scala> rdd2.partitions.size
res88: Int = 2
groupByKey()
该函数用于将RDD[K,V]中每个K对应的V值,合并到一个集合Iterable[V]中
- def groupByKey(): RDD[(K, Iterable[V])]
- def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
参数numPartitions用于指定分区数; - def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
参数partitioner用于指定分区函数;
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[89] at makeRDD at :21
scala> rdd1.groupByKey().collect
res81: Array[(String, Iterable[Int])] = Array((A,CompactBuffer(0, 2)), (B,CompactBuffer(2, 1)), (C,CompactBuffer(1)))
reduceByKeyLocally
def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[91] at makeRDD at :21
scala> rdd1.reduceByKeyLocally((x,y) => x + y)
res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1)
reduceByKey 和 groupByKey 的区别
也就是说,reduceBykey在Mapper端对每个分区的key预先进行一次合并,类似于mapreduce当中的combiner归约,之后reducer端再把合并后的数据拉取过来,这样做的好处就是减少了mapper端到reducer端的数据量传输,提高了IO性能,也就提高了效率
(蔡志远,2)
(罗荣银,2)
(很爱很爱,1)
groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError
将 Value 值进行聚合形成一个序列(iterator)
(罗荣银,CompactBuffer(1, 1))
(蔡志远,CompactBuffer(1, 1))
(很爱很爱,CompactBuffer(1))
总结:
通过以上对比可以发现在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还是可以防止使用groupByKey造成的内存溢出问题。
groupBy 和 groupByKey 的区别
groupBy
将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key相同的元素分为一组。
val a = sc.parallelize(1 to 9)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res67: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))
a.groupBy(x => x % 3).collect
res68: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (2,CompactBuffer(2, 5, 8)),
(1,CompactBuffer(1, 4, 7)))
groupByKey
对Key-Value形式的RDD的操作。与groupBy类似。但是其分组所用的key不是由指定的函数生成的,而是采用元素本身中的key。
scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[93] at parallelize at <console>:21
scala> val b = a.keyBy(_.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[94] at keyBy at <console>:23
scala> b.collect
res69: Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (6,spider), (5,eagle))
scala> b.groupByKey.collect
res70: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)),
(3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
这个比较复杂,总结好了在写
foldByKey
fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数。
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.
直接看例子:
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
scala> rdd1.foldByKey(0)(_+_).collect
res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1))
//将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操
//作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即:
//("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)
再看:
scala> rdd1.foldByKey(2)(_+_).collect
res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
//先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函
//数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)
再看乘法操作:
scala> rdd1.foldByKey(0)(_*_).collect
res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
//先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",0*0), ("A",2*0),
//即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,0*0),即:(A,0)
//其他K也一样,最终都得到了V=0
scala> rdd1.foldByKey(1)(_*_).collect
res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
//映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。
在使用foldByKey算子时候,要特别注意映射函数及zeroValue的取值。
Key-Value 类型—2
combineByKey[C]
这个算子很复杂,会单独写一篇文章来记录
combineByKey用于将一个PairRDD按key进行聚合操作。
combineByKey的参数共有三个函数。
第一个函数:同一个Partition内第一次碰到一个key的处理函数。用于在遍历RDD的数据集合过程中,对于遍历到的(k,v)。如果combineByKey第一次遇到值为k的Key(类型K),那么将对这个(k,v)调用此函数。它的作用是将v转换为C(类型是C,聚合对象的类型,c作为局和对象的初始值)。
第二个函数:同一个Partition内不是第一次碰到一个key的处理函数。在遍历RDD的数据集合过程中,对于遍历到的(k,v),如果combineByKey不是第一次(或者第二次,第三次…)遇到值为k的Key(类型K),那么将对这个(k,v)调用此函数,它的作用是将v累加到聚合对象(类型C)中。此函数的类型是(C,V)=>C。
第三个函数:不同Partition相同key的聚合函数。因为combineByKey是在分布式环境下执行,RDD的每个分区单独进行combineByKey操作,最后需要对各个分区的结果进行最后的聚合,它的函数类型是(C,C)=>C,每个参数是分区聚合得到的聚合对象。
sortByKey
var conf = new SparkConf().setAppName("cai").setMaster("local[1]")
var sc = new SparkContext(conf)
val arr = List(("A",1),("B",2),("A",2),("B",3))
val rdd = sc.parallelize(arr)
val sortByKeyRDD = rdd.sortByKey()
sortByKeyRDD.foreach(println)
mapValues
保持key不变,对value操作
var conf = new SparkConf().setAppName("cai").setMaster("local[1]")
var sc = new SparkContext(conf)
var cai = sc.parallelize(List("dog","cat","monkey"))
def func(iters:Iterator[String]):Iterator[(String,Int)]={
var result = List[(String,Int)]()
while (iters.hasNext){
var cai = iters.next()
result .::= (cai,cai.length)
}
result.iterator
}
var luo = cai.mapPartitions(func)
var result = luo.mapValues(x=>x+1)
result.foreach(println)
(monkey,7)
(cat,4)
(dog,4)
flatMapValues
val list = List(("mobin",22),("kpop",20),("lufei",23))
val rdd = sc.parallelize(list)
val mapValuesRDD = rdd.flatMapValues(x => Seq(x,"male"))
mapValuesRDD.foreach(println)
输出:
(mobin,22)
(mobin,male)
(kpop,20)
(kpop,male)
(lufei,23)
(lufei,male)
如果是mapValues会输出:【对比区别】
(mobin,List(22, male))
(kpop,List(20, male))
(lufei,List(23, male))
与mapValues类似,但可以将一个value展开成多个value。
val a = sc.parallelize(List(("fruit", "apple,banana,pear"), ("animal", "pig,cat,dog,tiger")))
a.flatMapValues(_.split(",")).collect
res23: Array[(String, String)] = Array((fruit,apple), (fruit,banana), (fruit,pear),
(animal,pig), (animal,cat), (animal,dog), (animal,tiger))
join(otherDataset, [numTasks])
根据key相同对键值对类型的rdd的值做一个内连接。返回的值类型也是键值对类型的rdd。只不过一个key,对应于不同rdd的多个value值。
var conf = new SparkConf().setAppName("cai").setMaster("local[*]")
var sc = new SparkContext(conf)
var cai = sc.parallelize(Array(("cai",1),("luo",2),("zhi",3)))
var luo = sc.parallelize(Array(("cai",7),("luo",8),("rong",3)))
var result = cai.join(luo)
result.foreach(println)
println("--------------")
def func(iter:Iterator[(String,(Int,Int))]):Iterator[(String,Int)] = {
var re = List[(String,Int)]()
while(iter.hasNext){
var cai = iter.next()
re .::= (cai._1,cai._2._1)
}
re.iterator
}
def func1(iter:Iterator[(String,(Int,Int))]):Iterator[(String,Int)] = {
var re = List[(String,Int)]()
while(iter.hasNext){
var cai = iter.next()
re .::= (cai._1,cai._2._2)
}
re.iterator
}
var su = result.mapPartitions(func)
var cu = result.mapPartitions(func1)
var rr = su.union(cu)
rr.foreach(println)
结果如下:
(luo,(2,8))
(cai,(1,7))
--------------
(luo,2)
(cai,1)
(cai,7)
(luo,8)
leftOuterJoin
根据两个RDD来进行做外连接,右边没有的值会返回一个None。右边有值的话会返回一个Some
rightOuterJoin
对两个RDD来做一个右外链接。返回的Value类型为option类型。左边有值的话为Some,没有的话为None。
cogroup(otherDataset, [numTasks])
cogroup相当于SQL中的全外关联 full outer join ,返回左右RDD中的记录,关联不上的为空。
参数numPartitions用于指定结果的分区数。
参数partitioner用于指定分区函数。
val studentArr = Array((1,"tele"),(2,"yeye"),(3,"wyc"));
val scoreArr = Array((1,100),(1,200),(2,80),(2,300),(3,100));
val studentRDD = sc.parallelize(studentArr,1);
val scoreRDD = sc.parallelize(scoreArr,1);
val result = studentRDD.cogroup(scoreRDD);
result.foreach(println)
println("-----------------------------------------")
result.foreach(x=>{
println(x._1);
println(x._2._1.mkString(" "));
println(x._2._2.mkString(","))
})
结果
(1,(CompactBuffer(tele),CompactBuffer(100, 200)))
(3,(CompactBuffer(wyc),CompactBuffer(100)))
(2,(CompactBuffer(yeye),CompactBuffer(80, 300)))
-----------------------------------------
1
tele
100,200
3
wyc
100
2
yeye
80,300
ACTION 型 RDD
reduce(func)
根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。
val studentArr = Array((1,"tele"),(2,"yeye"),(3,"wyc"));
var rdd = sc.parallelize(studentArr)
var cai = rdd.reduce((x,y) => (x._1 + y._1,x._2 + y._2))
println(cai)
输出为:
(6,wycyeyetele)
collect
collect用于将一个RDD转换成数组。
collect返回一个数组,而 glom 有几个分区就返回几个数组
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
scala> rdd1.collect
res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
count()
def count(): Long
count返回RDD中的元素数量。
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21
scala> rdd1.count
res15: Long = 3
countByKey()
def countByKey(): Map[K, Long]
countByKey用于统计RDD[K,V]中每个K的数量。
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21
scala> rdd1.countByKey
res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)
foreach(func)
foreachPartition
take(n)
def take(num: Int): Array[T]
take用于获取RDD中从0到num-1下标的元素,不排序。
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
scala> rdd1.take(1)
res0: Array[Int] = Array(10)
scala> rdd1.take(2)
res1: Array[Int] = Array(10, 4)
first
def first(): T
first返回RDD中的第一个元素,不排序。
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
scala> rdd1.first
res14: (String, String) = (A,1)
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21
scala> rdd1.first
res8: Int = 10
top
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
scala> rdd1.top(1)
res2: Array[Int] = Array(12)
scala> rdd1.top(2)
res3: Array[Int] = Array(12, 10)
//指定排序规则
scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef
scala> rdd1.top(1)
res4: Array[Int] = Array(2)
scala> rdd1.top(2)
res5: Array[Int] = Array(2, 3)
takeOrdered(n, [ordering])
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
takeOrdered和top类似,只不过以和top相反的顺序返回元素。
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
scala> rdd1.top(1)
res4: Array[Int] = Array(2)
scala> rdd1.top(2)
res5: Array[Int] = Array(2, 3)
scala> rdd1.takeOrdered(1)
res6: Array[Int] = Array(12)
scala> rdd1.takeOrdered(2)
res7: Array[Int] = Array(12, 10)
aggregate
这个算子比较复杂,需要单独写一篇文章来记录
fold
def fold(zeroValue: T)(op: (T, T) ⇒ T): T fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。lookup
def lookup(key: K): Seq[V]
lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21
scala> rdd1.lookup("A")
res0: Seq[Int] = WrappedArray(0, 2)
scala> rdd1.lookup("B")
res1: Seq[Int] = WrappedArray(1, 2)
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
标签:总结,rdd1,scala,Int,RDD,var,Spark,Array 来源: https://blog.csdn.net/cai_and_luo/article/details/112979154