Spark(RDD)
作者:互联网
RDD
1.所谓的RDD,其实就是一个数据结构,类似于链表中的Node
2.RDD中有适合并行计算的分区操作
3.RDD中封装了最小的计算单元,目的是更适合重复使用
4.Spark的计算主要就是通过组合RDD的操作,完成业务需求
1.从集合(内存)中创建RDD
从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
conf.set("spark.default.parallelism", "4")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRdd(List(1,2,3,4),2)
val rdd: RDD[Int] = sc.textFile("input",2)
sc.stop()
2.RDD转换算子
RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型
value类型
1)map
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(
num => {
num * 2
}
)
val dataRDD2: RDD[String] = dataRDD1.map(
num => {
"" + num
}
)
2)mapPartitions
将待处理的数据以分区为单位发送到计算节点进行处理,map 是一个数据一个数据的处理,mapPartitions是一个分区为处理单位
// TODO 算子 - 转换 -
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 获取每个数据分区的最大值
// 【1,2】【3,4】
val rdd1 = rdd.mapPartitions(
list => {
val max = list.max
List(max).iterator
}
)
rdd1.collect.foreach(println)
3)mapPartitionsWithIndex
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)
val rdd2: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex {
case (index, datas) => {
datas.map((_,index))
}
}
4)flatMap
将处理的数据进行扁平化后再进行映射处理,
val rdd = sc.makeRDD(
List(
"Hello Scala", "Hello Spark"
)
)
val rdd1 = sc.makeRDD(
List(
List(1,2), List(3,4)
)
)
// 整体 => 个体
//val rdd2 = rdd.flatMap(_.split(" "))
val rdd2 = rdd.flatMap(
str => { // 整体(1)
// 容器(个体(N))
str.split(" ")
}
)
val rdd3 = rdd1.flatMap(
list => {
list
}
)
val rdd = sc.makeRDD(
List(List(1,2),3,List(4,5))
)
val rdd1 = rdd.flatMap {
case list : List[_] => list
case other => List(other)
}
rdd1.collect.foreach(println)
5)glom
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
相当于聚合,将相同分区的数据放到一个数组中
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6), 2)
val rdd1: RDD[Array[Int]] = rdd.glom()
rdd1.collect().foreach(a => println(a.mkString(",")))
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)
val rdd2: RDD[Array[Int]] = rdd.glom()
val rdd3: RDD[Int] = rdd2.map(_.max)
println(rdd3.collect().sum)
6)groupBy 可以实现wordcount
默认情况下,数据处理后,所在分区不会发生改变
Spark要求,一个组的数据必须在一个分区中
一个分区的数据被打乱重新和其他分区的数据组合在一起,这个操作称之为shuffle
shuffle操作不允许在内存中等待,必须落盘
从服务器日志数据apache.log中获取每个时间段访问量
val rdd: RDD[String] = sc.textFile("data/apache.log")
//(10,1),(11,1),(10,1)
val rdd2: RDD[(String, Int)] = rdd.map(line => {
val strings: Array[String] = line.split(" ")
val str: String = strings(3)
val strings1: Array[String] = str.split(":")
(strings1(1), 1)
})
val rdd3: RDD[(String, Iterable[(String, Int)])] = rdd2.groupBy(_._1)
//(10,list((10,1),(10,1)))
val rdd4: RDD[(String, Int)] = rdd3.mapValues(_.size)
rdd4.collect().foreach(println)
7)filter
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))
// filter算子可以按照指定的规则对每一条数据进行筛选过滤
// 数据处理结果为true,表示数据保留,如果为false,数据就丢弃
val rdd1 = rdd.filter(
num => num % 2 == 1 //等于1的留下 其他的舍弃
)
rdd1.collect.foreach(println)
8)sample
// 抽取数据,采样数据
// 第一个参数表示抽取数据的方式:true. 抽取放回,false. 抽取不放回
// 第二个参数和第一个参数有关系
// 如果抽取不放回的场合:参数表示每条数据被抽取的几率
// 如果抽取放回的场合:参数表示每条数据希望被重复抽取的次数
// 第三个参数是【随机数】种子
// 随机数不随机,所谓的随机数,其实是通过随机算法获取的一个数
// 3 = xxxxx(10)
// 7 = xxxxx(3)
//val rdd1: RDD[Int] = rdd.sample(false, 0.5)
//val rdd1: RDD[Int] = rdd.sample(true, 2)
val rdd1: RDD[Int] = rdd.sample(false, 0.5, 2)
rdd1.collect.foreach(println)
9) coalesce
// TODO 算子 - 转换 - 缩减分区
val rdd : RDD[Int] = sc.makeRDD(
List(1,2,3,4,5,6), 3
)
// 缩减 (合并), 默认情况下,缩减分区不会shuffle
//val rdd1: RDD[Int] = rdd.coalesce(2)
// 这种方式在某些情况下,无法解决数据倾斜问题,所以还可以在缩减分区的同时,进行数据的shuffle操作
val rdd2: RDD[Int] = rdd.coalesce(2, true) //true代表数据的shuffle操作 将数据打乱和其他分区的数据组合在一起
rdd.saveAsTextFile("output")
rdd2.saveAsTextFile("output1")
10) distinct
底层有shuffle
// TODO 算子 - 转换
val rdd : RDD[Int] = sc.makeRDD(
List(1,1,1)
)
// map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
// 【1,1,1】
// 【(1, null),(1, null),(1, null)】
// 【null, null, null】
// 【null, null】
// 【(1, null)】
// 【1】
val rdd1: RDD[Int] = rdd.distinct()
rdd1.collect.foreach(println)
//List(1,1,1,1,1).distinct
11) repartition
标签:rdd1,val,rdd,Int,List,RDD,Spark 来源: https://www.cnblogs.com/xiao-bu/p/14843007.html