
RDD Operations

Once an RDD has been created, two kinds of operations are generally applied to it in later use: transformations and actions.

Transformations:

A transformation derives a new RDD from an existing one. Transformations are lazy: they only record how the new RDD is computed, and nothing actually runs until an action is invoked.

val rdd = sc.parallelize(List(1,2,3,4,5,6))
val filterRdd = rdd.filter(_ > 5)
filterRdd.collect()
// returns an Array of all elements greater than 5: Array(6)
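
The laziness mentioned above is easy to see in the shell. A minimal sketch, assuming the same spark-shell session with `sc` in scope: the println inside map runs only once an action forces the computation.

val traced = sc.parallelize(List(1, 2, 3)).map { x =>
  println("computing " + x)  // side effect to show when the work actually runs
  x * 2
}
// Nothing has been printed yet: map only records the lineage.
traced.collect()
// "computing 1" etc. are printed now (in local mode, to the driver console)
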
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.map(_ + 10)
resultRdd.collect()
// Array[Int] = Array(11, 12, 13, 14)
val a = Array(1,2,3,4,5)
val b = a.flatMap(1 to _)
// b: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

val b = a.flatMap(_ to 5)
// b: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5, 3, 4, 5, 4, 5, 5)
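
The two flatMap calls above run on a local Scala Array. The operator of the same name also exists on RDDs; a minimal sketch, assuming the same session:

val aRdd = sc.parallelize(Array(1, 2, 3, 4, 5))
aRdd.flatMap(1 to _).collect()
// expected: Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)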

Grouping. groupByKey collects all values sharing the same key in a (K, V) dataset into one iterable per key:

val a = sc.parallelize(Array("a" -> 1, "a" -> 2, "b" -> 3))
a.groupByKey.collect
//res3: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(3)))

reduceByKey groups by key and then aggregates, much like GROUP BY followed by an aggregate function in SQL. Called on a dataset of (K, V) pairs, it returns a dataset that is also of type (K, V).

scala> val a=sc.parallelize(Array("a"->1,"a"->2,"a"->3,"b"->4,"b"->5))
a: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> a.reduceByKey((x,y)=>x+y).collect
res4: Array[(String, Int)] = Array((a,6), (b,9))
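
For comparison, a sketch in the same session: summing after groupByKey yields the same result, but reduceByKey is usually preferred because it combines values inside each partition before the shuffle, so less data moves across the network.

// group first, then sum the values collected for each key
a.groupByKey.mapValues(_.sum).collect
// expected: Array((a,6), (b,9)) -- identical to the reduceByKey result above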

Actions:

An action triggers the actual computation and returns a result to the driver program.

scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> a.count()
res6: Long = 10

scala> a.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

collect brings the entire RDD back to the driver as an array, so it should only be used when the result is small enough to fit in driver memory.

scala> a.first
res8: Int = 1

scala> a.take(6)
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> a.take(11)
res11: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala>  a.take(100)
res12: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> a.reduce((x,y)=>x+y)
res13: Int = 55

scala> a.reduce((x,y)=>x*y)
res14: Int = 3628800

scala> a.reduce((x,y)=>x-y)
res15: Int = -53

reduce expects an associative and commutative function. Subtraction is neither, so this particular result depends on how the data is partitioned and is not deterministic.
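
A sketch of why, assuming the same session: reduce folds each partition locally and then folds the per-partition results, so with a non-associative function such as subtraction the answer changes with the partitioning.

val twoParts = sc.parallelize(1 to 10, 2)  // explicitly request two partitions
twoParts.reduce((x, y) => x - y)
// the value returned here depends on how the elements were split across partitions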

scala> a.foreach(println)
1
2
3
4
5
6
7
8
9
10

In local mode this output appears in the driver console; on a cluster, foreach runs on the executors, so the println output goes to the executor logs and the ordering of the lines is not guaranteed.

scala> val b=sc.parallelize(2 to 5)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> a.union(b).collect
res19: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5)

scala> a.intersection(b).collect
res22: Array[Int] = Array(4, 3, 5, 2)
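
Note that union keeps duplicates (2 through 5 appear twice above). A sketch in the same session, chaining distinct to deduplicate:

a.union(b).distinct.collect
// expected elements: 1 through 10, each exactly once; the order is not guaranteed
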
scala> val c=Array(1,1,2,3,4,5,6,5,3,3,1)
c: Array[Int] = Array(1, 1, 2, 3, 4, 5, 6, 5, 3, 3, 1)

scala> c.distinct
res24: Array[Int] = Array(1, 2, 3, 4, 5, 6)
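
The distinct call above runs on a local Scala Array. The RDD API provides a transformation of the same name; a minimal sketch, assuming the same session:

val cRdd = sc.parallelize(c)
cRdd.distinct.collect
// expected elements: 1, 2, 3, 4, 5, 6 (for an RDD the order is not guaranteed)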

sortByKey operates on a dataset of (K, V) pairs in which K implements the Ordered trait (that is, the keys can be sorted), and returns a dataset of the same (K, V) type sorted by key.

  • ascending: optional, whether to sort in ascending order
  • numTasks: optional, number of concurrent tasks
val p = sc.parallelize(Array("b" -> 1, "d" -> 2, "a" -> 3, "c" -> 4))
p.sortByKey().collect

res139: Array[(String, Int)] = Array((a,3), (b,1), (c,4), (d,2))
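
A sketch of the ascending parameter listed above, in the same session: passing false sorts the keys in descending order.

p.sortByKey(false).collect
// expected: Array((d,2), (c,4), (b,1), (a,3))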

// sorting a local array
scala> val c=Array(1,1,2,3,4,5,6,5,3,3,1)
c: Array[Int] = Array(1, 1, 2, 3, 4, 5, 6, 5, 3, 3, 1)

scala> c.sorted
res28: Array[Int] = Array(1, 1, 1, 2, 3, 3, 3, 4, 5, 5, 6)
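
sorted above works on a local array. To sort an RDD of plain values rather than key-value pairs, the RDD API provides sortBy; a minimal sketch, assuming the same session:

sc.parallelize(c).sortBy(x => x).collect
// expected: Array(1, 1, 1, 2, 3, 3, 3, 4, 5, 5, 6)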
