SparkCore-常用转换算子总结
作者:互联网
主要是分为三个类型:Value 类型、双 Value 类型和 Key-Value 类型。
1.Value类型
1.1map
传递一个对象,返回一个对象
源码中给的解释机翻如下:
通过对这个RDD的所有元素应用一个函数,返回一个新的RDD。
说人话就是:
将处理的数据逐条进行映射转换,可以是类型的转换,也可以是值的转换。
值的转换,即里面每个数据*2
val mapRDD: RDD[Int] = rdd.map(
_ * 2
)
类型转换,转为k-v结构
val wordToOne: RDD[(String, Int)] = words.map(
word => (word, 1)
)
1.1.1mapPartitions
传递一个迭代器,返回一个迭代器
源码中给的解释机翻如下:
通过对这个RDD的每个分区应用一个函数,返回一个新的RDD。
说人话就是:
将待处理的数据以分区为单位发送到计算节点进行处理,可以是值的转换,可以是类型的转换,也可以过滤数据。
一次是处理一个分区的数据,所以说我们发现下面的代码中println只执行两次。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD: RDD[Int] = rdd.mapPartitions(
iter => {
//一次把一个分区的数据拿过来,所以只执行两次
println(">>>>>>>>>>>>>>")
iter.map(_ * 2)
}
)
1.1.2map与mapPartitions的区别
数据处理上:map是分区内数据逐个执行,类似于串行操作;而mapPartitions是以分区为单位进行批处理操作。
功能上:map对于源数据进行值改变或者类型转换,但不能减少或者增加数据;mapPartitions传递一个迭代器返回一个迭代器,可以对里面的元素进行过滤,即减少和增加数据。
性能上:map类似于串行操作,所以性能较低;mapPartitions算子类似于批处理,性能较高,但是mapPartitions会长时间占用内存。
1.1.3mapPartitionsWithIndex
源码中给的解释机翻如下:
通过对这个RDD的每个分区应用一个函数,返回一个新的RDD,同时跟踪原始分区的索引。
说人话就是:
其实就在mapPartitions的基础上多了一个可以返回分区的索引的功能,不多赘述。
1.2flatMap
传递一个对象,返回一个“序列”
源码中给的解释机翻如下:
返回一个新的RDD,首先对这个RDD的所有元素应用一个函数,然后将结果扁平化。
说人话就是:
在map的基础上做了扁平化处理。
下面这张网上找的图可以很形象的说明flatMap:
就是先map再flatten。
1.3glom
源码中的解释机翻如下:
返回一个RDD,将每个分区中的所有元素合并到一个数组中。
说人话就是:
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不会改变。
val rdd: RDD[Any] = sc.makeRDD(List(1, 2, 3, 4), 2)
val glomRDD: RDD[Array[Any]] = rdd.glom()
glomRDD.collect().foreach(data => {
println(data.mkString(","))
})
有两个分区,那么就被转换为数组1,2和数组3,4。
1.4groupBy
源码中的解释机翻如下:
返回一个分组项目的RDD。每个组由一个键和一个映射到该键的元素序列组成。每个组中元素的顺序无法保证,甚至可能在每次评估结果RDD时都有所不同。
说人话就是:
将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重组(shuffle)。
按奇偶分区
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
结果:
很明显这里是传入列表中的元素,返回由运算结果为key,映射到该key的元素组成的序列为value的结果。
1.5filter
传递的是一个对象,返回的是一个布尔类型。
源码中的解释机翻如下:
返回一个新的RDD,其中只包含满足谓词的元素。
说人话就是:
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合的丢弃。
进行筛选后,分区不变,但各个分区的数据可能不均衡,就会出现数据倾斜。
筛选掉所有偶数,保留奇数。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val filterRDD: RDD[Int] = rdd.filter(_ % 2 != 0)
1.6sample
源码中的解释机翻如下:
返回这个RDD的抽样子集。
说人话就是:
根据指定的规则从数据集中抽取数据。
我个人觉得这个了解即可,没必要特意去记,这里就举例分析这个转换算子了。
1.7distinct
源码中的解释机翻如下:
返回一个新的RDD,其中包含这个RDD中不同的元素
说人话就是:
去重,比如对下面的列表进行去重
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
val rdd1: RDD[Int] = rdd.distinct()
1.8coalesce
源码中的解释机翻如下:
返回一个新的RDD,它被简化为' numPartitions '分区。
说人话就是:
根据数据量缩小分区。比如当spark程序存在过多的小任务时,可以通过coalesce方法收缩合并分区,减少分区的个数,减少任务调度成本。
有可能缩小分区的时候导致数据分配不均匀,可以通过启用shuffle使数据均匀。
1.9repartition
源码中的解释机翻如下:
返回一个具有numPartitions分区的新RDD。
说人话就是:
可以扩大也可以缩小分区,但是其实其底层就是:
也就是说上面提到的 coalesce方法如果启用shuffle是可以扩大分区的。
1.10sortBy
源码中的解释机翻如下:
返回按给定键函数排序的RDD。
说人话就是:
用于排序数据,使用比较简单。其中ascending默认为true表示升序排列,指定为false可以改为降序。
2.双value类型
2.1intersection
求两个集合的交集
2.2union
求两个集合的并集
2.3subtract
求差集,也就是说以一个RDD的元素为主,去除两个RDD中重复的元素,其他保留下来。
比如(1,2,3,4)和(3,4,5,6),最后只保留1和2。
2.4zip
俗称拉链,将两个RDD中的元素,以键值对的形式进行合并。其中key和value分别为第一个、第二个RDD中的相同位置的元素。
3.key-value类型
3.1partitionBy
源码中的解释机翻如下:
返回使用指定分区程序分区的RDD的副本
说人话:
将数据按照指定的分区器Partitioner重新进行分区。默认的分区器是HashPartitioner。
3.2reduceByKey
源码中的解释机翻如下:
使用关联和交换reduce函数合并每个键的值。这也会在将结果发送给reducer之前,在每个mapper上执行本地合并,类似于MapReduce中的“combiner”。输出将使用现有分区并行级别进行散列分区。
说人话:
将数据按照相同的k对v进行聚合,至于翻译后半段在groupByKey后面再解释。典型应用就是wordCount。
3.3groupByKey
源码中的解释机翻如下:
将RDD中每个键的值分组到单个序列中。使用现有的分区并行级别对产生的RDD进行散列分区。每个组中元素的顺序无法保证,甚至可能在每次评估结果RDD时都有所不同。
说人话:
将数据根据k对v进行分组。我们发现和reduceByKey差不多,不过groupByKey只包含分组的功能,而reduceByKey包含分组和聚合的功能。
这里提到reduceByKey说到的机翻:reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合处理,即combine,这样可以减少落盘的数据量,所以性能较高,而groupByKey只是进行分组,不存在数据量减少的问题。
3.4aggregateByKey
源码中的解释机翻如下:
使用给定的组合函数和中性的“零值”聚合每个键的值。这个函数可以返回一个不同的结果类型U,而不是这个RDD中值的类型V。因此,我们需要一个操作将V合并为U,一个操作将两个U合并,如scala.TraversableOnce。前一个操作用于合并分区内的值,后一个操作用于合并分区之间的值。为了避免内存分配,这两个函数都允许修改并返回它们的第一个参数,而不是创建一个新的U。
说人话:
将数据根据不同的规则进行分区内计算和分区间计算
比如下面的先是分区内求最大,然后分区间求和。
//第一个参数需要传递一个参数表示初始值,主要用于当碰见第一个key的时候和value进行分区计算
//第二个参数中的第一个参数是分区内计算规则(选最大),第二个参数是分区间计算规则(相加聚合)
val abkRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)((x, y) => math.max(x, y), (x, y) => x + y)
3.5foldByKey
源码中的解释机翻如下:
使用一个关联函数和一个中立的“零值”来合并每个键的值,它可以被添加到结果中任意次数,并且不能改变结果(例如,Nil用于列表拼接,0用于加法,或1用于乘法)。
说人话:
相当于分区内分区间计算规则相同版的aggregateByKey。
3.6combineByKey
源码中的解释机翻如下:
简化版的combineByKeyWithClassTag,哈希分区的结果RDD使用现有的分区并行级别。这里的方法是为了向后兼容。它不向洗牌提供组合器类信息。
说人话:
是对k-v类型的rdd进行聚合操作的聚集函数,其实和aggregateByKey挺像,但是它允许用户返回值的类型与输入的不同。
如下是求相同key的v的总和与数量,当然得到这个总和与数量,后面就可以求某个key对应值的平均值了。
val abkRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
v => (v, 1),
//t和默认值类型一样,v是指k相同的待聚合的value
(t: (Int, Int), v) => {
(t._1 + v, t._2 + 1)
},
//t1和t2都和默认值类型一样
(t1: (Int, Int), t2: (Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
3.6.1reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别
第一个数据 | 分区内与分区间计算规则 | ||
reduceByKey | 不进行任何计算 | 相同 | |
foldByKey | 与初始值进行分区内计算 | 相同 | |
aggregateByKey | 与初始值进行分区内计算 | 不同 | |
combineByKey | 当数据结构不满足要求时,可以让第一个数据转换结构 | 不同 |
3.7sortByKey
源码中的解释机翻如下:
按键对RDD排序,这样每个分区都包含一个排序过的元素范围。在生成的RDD上调用' collect '或' save '将返回或输出一个有序的记录列表(在' save '的情况下,它们将按照键的顺序写入文件系统中的多个' part-X '文件)。
说人话:
根据key进行排序。这个函数比较简单
3.8join
源码中的解释机翻如下:
返回一个RDD,包含所有在' this '和' other '中具有匹配键的元素对。每对元素将以a (k, (v1, v2))元组的形式返回,其中(k, v1)在' this '中,(k, v2)在' other '中。执行跨集群的散列连接。
说人话:
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD。且key的类型必须相同
3.9leftOuterJoin
有左当然也有右,这个比较基础,和sql中的左外连接以及右外连接比较像。
3.10cogroup
源码中的解释机翻如下:
对于' this '或' other '中的每个键k,返回一个结果RDD,该RDD包含一个元组,其中包含' this '和' other '中该键的值列表。
说人话:
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD 。
如下对rdd1和rdd2使用
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(
("a", 1), ("b", 2) //, ("c", 3)
))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(
("a", 4), ("b", 5), ("c", 6), ("c", 7)
))
//cogroup : connect + group
val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
得到结果如下:
标签:总结,Int,分区,SparkCore,说人话,RDD,源码,算子,机翻 来源: https://blog.csdn.net/emttxdy/article/details/121512772