大数据技术之SparkCore
作者:互联网
第1章RDD概述
1.1RDD引入之IO流
1.2什么是RDD
1.3RDD特性
A list of partitions
多个分区,分区可以看成是数据集的基本组成单位
对于 RDD 来说, 每个分区都会被一个计算任务处理, 并决定了并行计算的粒度。
用户可以在创建 RDD 时指定 RDD 的分区数, 如果没有指定, 那么就会采用默认值。 默认值就是程序所分配到的 CPU Core 的数目.
每个分配的存储是由BlockManager 实现的, 每个分区都会被逻辑映射成 BlockManager 的一个 Block,,而这个 Block 会被一个 Task 负责计算。
A function for computing each split
计算每个切片(分区)的函数.
Spark 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现compute函数以达到这个目的
A list of dependencies on other RDDs
与其他 RDD 之间的依赖关系
RDD 的每次转换都会生成一个新的 RDD, 所以 RDD 之间会形成类似于流水线一样的前后依赖关系。 在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
对存储键值对的 RDD,还有一个可选的分区器
只有对于 key-value的 RDD,才会有 Partitioner, 非key-value的 RDD 的 Partitioner 的值是 None;Partitiner 不但决定了 RDD 的本区数量, 也决定了 parent RDD Shuffle 输出时的分区数量
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
存储每个切片优先(preferred location)位置的列表
比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置. 按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置.
第2章RDD编程
2.1编程模型
算子:从认知心理学角度来讲,解决问题其实是将问题的初始状态,通过一系列的转换操作(operator),变成解决状态。
2.2RDD的创建
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
2.2.1IDEA环境准备
1)创建一个maven工程,工程名称叫SparkCoreTest
2)在pom文件中添加
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
<build>
<finalName>SparkCoreTest</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3)添加scala 框架支持
3)创建一个scala文件夹,并把它修改为Source Root
4)创建包名:com.atguigu.createrdd
2.2.2从集合中创建
1)从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD
package com.atguigu.createrdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object createrdd01_array {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.使用parallelize()创建rdd
val rdd: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8))
rdd.foreach(println)
println("分区数:" + rdd.partitions.size)
//4.使用makeRDD()创建rdd
val rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8))
rdd1.foreach(println)
println("分区数:" + rdd1.partitions.size)
sc.stop()
}
}
2.2.3从外部存储系统的数据集创建
由外部存储系统的数据集创建RDD包括:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等
1)数据准备
在新建的SparkCoreTest1项目名称上右键=》新建input文件夹=》在input文件夹上右键=》分别新建1.txt和2.txt。每个文件里面准备一些word单词。
2)创建RDD
package com.atguigu.createrdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object createrdd03_file {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.读取文件。如果是集群路径:hdfs://hadoop102:9000/input
val lineWordRdd: RDD[String] = sc.textFile("input")
//4.打印
lineWordRdd.foreach(println)
//5.关闭
sc.stop()
}
}
2.2.4从其他RDD创建
主要是通过一个RDD运算完后,再产生新的RDD。
详见2.4节
2.2.5创建IDEA快捷键
1)点击File->Settings…->Editor->Live Templates->output->Live Template
2)点击左下角的Define->选择Scala
3)在Abbreviation中输入快捷键名称scc,在Template text中填写,输入快捷键后生成的内容。
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//4.关闭连接
sc.stop()
2.3分区规则
2.3.1默认分区源码(RDD数据从集合中创建)
1)默认分区数源码解读
2)代码验证
object partition01_default {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest1")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4))
rdd.saveAsTextFile("output")
}
}
产生了8个分区
3)思考:数据就4个,分区却产生了8个,严重浪费资源,怎么办?
2.3.2分区源码(RDD数据从集合中创建)
1)分区测试(RDD数据从集合中创建)
object partition02_Array {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest1")
val sc: SparkContext = new SparkContext(conf)
//1)4个数据,设置4个分区,输出:0分区->1,1分区->2,2分区->3,3分区->4
//val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)
//2)4个数据,设置3个分区,输出:0分区->1,1分区->2,2分区->3,4
//val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 3)
//3)5个数据,设置3个分区,输出:0分区->1,1分区->2、3,2分区->4、5
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5), 3)
rdd.saveAsTextFile("output")
sc.stop()
}
}
2)分区源码
2.3.3分区源码(RDD数据从文件中读取后创建)
1)分区测试
object partition03_file {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest1")
val sc: SparkContext = new SparkContext(conf)
//1)默认分区的数量:默认取值为当前核数和2的最小值
//val rdd: RDD[String] = sc.textFile("input")
//2)输入数据1-4,每行一个数字;输出:0=>{1、2} 1=>{3} 2=>{4} 3=>{空}
//val rdd: RDD[String] = sc.textFile("input/3.txt",3)
//3)输入数据1-4,一共一行;输出:0=>{1234} 1=>{空} 2=>{空} 3=>{空}
val rdd: RDD[String] = sc.textFile("input/4.txt",3)
rdd.saveAsTextFile("output")
sc.stop()
}
}
2)源码解析
注意:getSplits文件返回的是切片规划,真正读取是在compute方法中创建LineRecordReader读取的,有两个关键变量
start=split.getStart() end = start + split.getLength
2.4Transformation转换算子(面试开发重点)
RDD整体上分为Value类型、双Value类型和Key-Value类型
2.4.1Value类型
2.4.1.1map()映射
4)具体实现
object value01_map {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)
// 3.2 调用map方法,每个元素乘以2
val mapRdd: RDD[Int] = rdd.map(_ * 2)
// 3.3 打印修改后的RDD中数据
mapRdd.collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.1.2mapPartitions()以分区为单位执行Map
4)具体实现
object value02_mapPartitions {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)
// 3.2 调用mapPartitions方法,每个元素乘以2
val rdd1 = rdd.mapPartitions(x=>x.map(_*2))
// 3.3 打印修改后的RDD中数据
rdd1.collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.1.3map()和mapPartitions()区别
2.4.1.4mapPartitionsWithIndex()带分区号
4)具体实现
object value03_mapPartitionsWithIndex {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)
// 3.2 创建一个RDD,使每个元素跟所在分区号形成一个元组,组成一个新的RDD
val indexRdd = rdd.mapPartitionsWithIndex( (index,items)=>{items.map( (index,_) )} )
//扩展功能:第二个分区元素*2,其余分区不变
// 3.3 打印修改后的RDD中数据
indexRdd.collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.1.5flatMap()压平
4)具体实现:
object value04_flatMap {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val listRDD=sc.makeRDD(List(List(1,2),List(3,4),List(5,6),List(7)), 2)
// 3.2 把所有子集合中数据取出放入到一个大的集合中
listRDD.flatMap(list=>list).collect.foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.1.6glom()分区转换数组
4)具体实现
object value05_glom {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val rdd = sc.makeRDD(1 to 4, 2)
// 3.2 求出每个分区的最大值 0->1,2 1->3,4
val maxRdd: RDD[Int] = rdd.glom().map(_.max)
// 3.3 求出所有分区的最大值的和 2 + 4
println(maxRdd.collect().sum)
//4.关闭连接
sc.stop()
}
}
2.4.1.7groupBy()分组
4)具体实现
object value06_groupby {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val rdd = sc.makeRDD(1 to 4, 2)
// 3.2 将每个分区的数据放到一个数组并收集到Driver端打印
rdd.groupBy(_ % 2).collect().foreach(println)
// 3.3 创建一个RDD
val rdd1: RDD[String] = sc.makeRDD(List("hello","hive","hadoop","spark","scala"))
// 3.4 按照首字母第一个单词相同分组
rdd1.groupBy(str=>str.substring(0,1)).collect().foreach(println)
sc.stop()
}
}
groupBy会存在shuffle过程
shuffle:将不同的分区数据进行打乱重组的过程
shuffle一定会落盘。可以在local模式下执行程序,通过4040看效果。
2.4.1.8GroupBy之WordCount
object value06_groupby {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val strList: List[String] = List("Hello Scala", "Hello Spark", "Hello World")
val rdd = sc.makeRDD(strList)
// 3.2 将字符串拆分成一个一个的单词
val wordRdd: RDD[String] = rdd.flatMap(str=>str.split(" "))
// 3.3 将单词结果进行转换:word=>(word,1)
val wordToOneRdd: RDD[(String, Int)] = wordRdd.map(word=>(word, 1))
// 3.4 将转换结构后的数据分组
val groupRdd: RDD[(String, Iterable[(String, Int)])] = wordToOneRdd.groupBy(t=>t._1)
// 3.5 将分组后的数据进行结构的转换
val wordToSum: RDD[(String, Int)] = groupRdd.map {
case (word, list) => {
(word, list.size)
}
}
wordToSum.collect().foreach(println)
sc.stop()
}
}
扩展复杂版WordCount
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2)))
2.4.1.9filter()过滤
4)代码实现:
object value07_filter {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4),2)
//3.1 过滤出符合条件的数据
val filterRdd: RDD[Int] = rdd.filter(_ % 2 == 0)
//3.2 收集并打印数据
filterRdd.collect().foreach(println)
//4 关闭连接
sc.stop()
}
}
2.4.1.10sample()采样
4)代码实现:
object value08_sample {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 10)
// 3.2 打印放回抽样结果
rdd.sample(true, 0.4, 2).collect().foreach(println)
// 3.3 打印不放回抽样结果
rdd.sample(false, 0.2, 3).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.1.11distinct()去重
4)代码实现:
object value09_distinct {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val distinctRdd: RDD[Int] = sc.makeRDD(List(1,2,1,5,2,9,6,1))
// 3.2 打印去重后生成的新RDD
distinctRdd.distinct().collect().foreach(println)
// 3.3 对RDD采用多个Task去重,提高并发度
distinctRdd.distinct(2).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.1.12coalesce()重新分区
Coalesce算子包括:配置执行Shuffle和配置不执行Shuffle两种方式。
1、不执行Shuffle方式
5)代码实现
object value10_coalesce {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建一个RDD
//val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)
//3.1 缩减分区
//val coalesceRdd: RDD[Int] = rdd.coalesce(2)
//4. 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
//4.1 缩减分区
val coalesceRdd: RDD[Int] = rdd.coalesce(2)
//5 打印查看对应分区数据
val indexRdd: RDD[Int] = coalesceRdd.mapPartitionsWithIndex(
(index, datas) => {
// 打印每个分区数据,并带分区号
datas.foreach(data => {
println(index + "=>" + data)
})
// 返回分区的数据
datas
}
)
indexRdd.collect()
//6. 关闭连接
sc.stop()
}
}
2、执行Shuffle方式
//3. 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
//3.1 执行shuffle
val coalesceRdd: RDD[Int] = rdd.coalesce(2, true)
输出结果:
0=>1
1=>2
0=>3
1=>4
0=>5
1=>6
3、Shuffle原理
2.4.1.13repartition()重新分区(执行Shuffle)
4)代码实现:
package com.atguigu.createrdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object value11_repartition {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3. 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
//3.1 缩减分区
//val coalesceRdd: RDD[Int] = rdd.coalesce(2,true)
//3.2 重新分区
val repartitionRdd: RDD[Int] = rdd.repartition(2)
//4 打印查看对应分区数据
val indexRdd: RDD[Int] = repartitionRdd.mapPartitionsWithIndex(
(index, datas) => {
// 打印每个分区数据,并带分区号
datas.foreach(data => {
println(index + "=>" + data)
})
// 返回分区的数据
datas
}
)
indexRdd.collect()
//6. 关闭连接
sc.stop()
}
}
2.4.1.14coalesce和repartition区别
1)coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
2)repartition实际上是调用的coalesce,进行shuffle。源码如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
3)coalesce一般为缩减分区,如果扩大分区,不使用shuffle是没有意义的,repartition扩大分区执行shuffle。
package com.atguigu.createrdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object value11_repartition {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3. 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 3)
//3.1 合并分区(没有shuffle)
// coalesce一般为缩减分区,如果扩大分区,不使用shuffle是没有意义的
//val pRdd: RDD[Int] = rdd.coalesce(4)
//3.2 重新分区(有shuffle)
val pRdd: RDD[Int] = rdd.repartition(4)
//4 打印查看对应分区数据
val indexRdd: RDD[Int] = pRdd.mapPartitionsWithIndex(
(index, datas) => {
// 打印每个分区数据,并带分区号
datas.foreach(data => {
println(index + "=>" + data)
})
// 返回分区的数据
datas
}
)
indexRdd.collect()
//6. 关闭连接
sc.stop()
}
}
2.4.1.15sortBy()排序
4)代码实现:
object value12_sortBy {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(List(2, 1, 3, 4, 6, 5))
// 3.2 默认是升序排
val sortRdd: RDD[Int] = rdd.sortBy(num => num)
sortRdd.collect().foreach(println)
// 3.3 配置为倒序排
val sortRdd2: RDD[Int] = rdd.sortBy(num => num, false)
sortRdd2.collect().foreach(println)
// 3.4 创建一个RDD
val strRdd: RDD[String] = sc.makeRDD(List("1", "22", "12", "2", "3"))
// 3.5 按照字符的int值排序
strRdd.sortBy(num => num.toInt).collect().foreach(println)
// 3.5 创建一个RDD
val rdd3: RDD[(Int, Int)] = sc.makeRDD(List((2, 1), (1, 2), (1, 1), (2, 2)))
// 3.6 先按照tuple的第一个值排序,相等再按照第2个值排
rdd3.sortBy(t=>t).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.1.16pipe()调用脚本
3)需求说明:编写一个脚本,使用管道将脚本作用于RDD上。
(1)编写一个脚本,并增加执行权限
[atguigu@hadoop102 spark]$ vim pipe.sh
#!/bin/sh
echo "Start"
while read LINE; do
echo ">>>"${LINE}
done
[atguigu@hadoop102 spark]$ chmod 777 pipe.sh
(2)创建一个只有一个分区的RDD
scala> val rdd = sc.makeRDD (List("hi","Hello","how","are","you"),1)
(3)将脚本作用该RDD并打印
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res18: Array[String] = Array(Start, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
(4)创建一个有两个分区的RDD
scala> val rdd = sc.makeRDD(List("hi","Hello","how","are","you"),2)
(5)将脚本作用该RDD并打印
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res19: Array[String] = Array(Start, >>>hi, >>>Hello, Start, >>>how, >>>are, >>>you)
2.4.2双Value类型交互
2.4.2.1 union()并集
4)代码实现:
object DoubleValue01_union {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd1: RDD[Int] = sc.makeRDD(1 to 4)
//3.2 创建第二个RDD
val rdd2: RDD[Int] = sc.makeRDD(4 to 8)
//3.3 计算两个RDD的并集
rdd1.union(rdd2).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.2.2 subtract ()差集
4)代码实现:
object DoubleValue02_subtract {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 4)
//3.2 创建第二个RDD
val rdd1: RDD[Int] = sc.makeRDD(4 to 8)
//3.3 计算第一个RDD与第二个RDD的差集并打印
rdd.subtract(rdd1).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.2.3 intersection()交集
4)代码实现:
object DoubleValue03_intersection {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd1: RDD[Int] = sc.makeRDD(1 to 4)
//3.2 创建第二个RDD
val rdd2: RDD[Int] = sc.makeRDD(4 to 8)
//3.3 计算第一个RDD与第二个RDD的差集并打印
rdd1.intersection(rdd2).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.2.4 zip()拉链
3)需求说明:创建两个RDD,并将两个RDD组合到一起形成一个(k,v)RDD
4)代码实现:
object DoubleValue04_zip {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd1: RDD[Int] = sc.makeRDD(Array(1,2,3),3)
//3.2 创建第二个RDD
val rdd2: RDD[String] = sc.makeRDD(Array("a","b","c"),3)
//3.3 第一个RDD组合第二个RDD并打印
rdd1.zip(rdd2).collect().foreach(println)
//3.4 第二个RDD组合第一个RDD并打印
rdd2.zip(rdd1).collect().foreach(println)
//3.5 创建第三个RDD(与1,2分区数不同)
val rdd3: RDD[String] = sc.makeRDD(Array("a","b"),3)
//3.6 元素个数不同,不能拉链
// Can only zip RDDs with same number of elements in each partition
rdd1.zip(rdd3).collect().foreach(println)
//3.7 创建第四个RDD(与1,2分区数不同)
val rdd4: RDD[String] = sc.makeRDD(Array("a","b","c"),2)
//3.8 分区数不同,不能拉链
// Can't zip RDDs with unequal numbers of partitions: List(3, 2)
rdd1.zip(rdd4).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.3Key-Value类型
2.4.3.1 partitionBy()按照K重新分区
4)代码实现:
object KeyValue01_partitionBy {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
//3.2 对RDD重新分区
val rdd2: RDD[(Int, String)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
//3.3 查看新RDD的分区数
println(rdd2.partitions.size)
//4.关闭连接
sc.stop()
}
}
5)HashPartitioner源码解读
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
6)自定义分区器
object KeyValue01_partitionBy {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
//3.2 自定义分区
val rdd3: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))
//4 打印查看对应分区数据
val indexRdd: RDD[(Int, String)] = rdd3.mapPartitionsWithIndex(
(index, datas) => {
// 打印每个分区数据,并带分区号
datas.foreach(data => {
println(index + "=>" + data)
})
// 返回分区的数据
datas
}
)
indexRdd.collect()
//5.关闭连接
sc.stop()
}
}
// 自定义分区
class MyPartitioner(num: Int) extends Partitioner {
// 设置的分区数
override def numPartitions: Int = num
// 具体分区逻辑
override def getPartition(key: Any): Int = {
if (key.isInstanceOf[Int]) {
val keyInt: Int = key.asInstanceOf[Int]
if (keyInt % 2 == 0)
0
else
1
}else{
0
}
}
}
2.4.3.2 reduceByKey()按照K聚合V
4)代码实现:
object KeyValue02_reduceByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))
//3.2 计算相同key对应值的相加结果
val reduce: RDD[(String, Int)] = rdd.reduceByKey((x,y) => x+y)
//3.3 打印结果
reduce.collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.3.3 groupByKey()按照K重新分组
3)需求说明:创建一个pairRDD,将相同key对应值聚合到一个seq中,并计算相同key对应值的相加结果。
4)代码实现:
object KeyValue03_groupByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))
//3.2 将相同key对应值聚合到一个Seq中
val group: RDD[(String, Iterable[Int])] = rdd.groupByKey()
//3.3 打印结果
group.collect().foreach(println)
//3.4 计算相同key对应值的相加结果
group.map(t=>(t._1,t._2.sum)).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.3.4 reduceByKey和groupByKey区别
1)reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。
2)groupByKey:按照key进行分组,直接进行shuffle。
3)开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。
2.4.3.5 aggregateByKey()按照K处理分区内和分区间逻辑
4)需求分析
5)代码实现:
object KeyValue04_aggregateByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
//3.2 取出每个分区相同key对应值的最大值,然后相加
rdd.aggregateByKey(0)(math.max(_, _), _ + _).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.3.6 foldByKey()分区内和分区间相同的aggregateByKey()
4)代码实现:
object KeyValue05_foldByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val list: List[(String, Int)] = List(("a",1),("a",1),("a",1),("b",1),("b",1),("b",1),("b",1),("a",1))
val rdd = sc.makeRDD(list,2)
//3.2 求wordcount
//rdd.aggregateByKey(0)(_+_,_+_).collect().foreach(println)
rdd.foldByKey(0)(_+_).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.3.7 combineByKey()转换结构后分区内和分区间操作
3)需求说明:创建一个pairRDD,根据key计算每种key的均值。(先计算每个key对应值的总和以及key出现的次数,再相除得到结果)
4)需求分析:
5)代码实现
object KeyValue06_combineByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 创建第一个RDD
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
//3.2 将相同key对应的值相加,同时记录该key出现的次数,放入一个二元组
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
(_, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
//3.3 打印合并后的结果
combineRdd.collect().foreach(println)
//3.4 计算平均值
combineRdd.map {
case (key, value) => {
(key, value._1 / value._2.toDouble)
}
}.collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.3.8 reduceByKey、aggregateByKey、foldByKey、combineByKey
2.4.3.9 sortByKey()按照K进行排序
3)需求说明:创建一个pairRDD,按照key的正序和倒序进行排序
4)代码实现:
object KeyValue07_sortByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
//3.2 按照key的正序(默认顺序)
rdd.sortByKey(true).collect().foreach(println)
//3.3 按照key的倒序
rdd.sortByKey(false).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.3.9 mapValues()只对V进行操作
4)代码实现:
object KeyValue08_mapValues {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (1, "d"), (2, "b"), (3, "c")))
//3.2 对value添加字符串"|||"
rdd.mapValues(_ + "|||").collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.3.10 join()连接 将相同key对应的多个value关联在一起
3)需求说明:创建两个pairRDD,并将key相同的数据聚合到一个元组。
4)代码实现:
object KeyValue09_join {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
//3.2 创建第二个pairRDD
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))
//3.3 join操作并打印结果
rdd.join(rdd1).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.3.11 cogroup() 类似全连接,但是在同一个RDD中对key聚合
操作两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。
4)代码实现:
object KeyValue10_cogroup {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"a"),(2,"b"),(3,"c")))
//3.2 创建第二个RDD
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1,4),(2,5),(3,6)))
//3.3 cogroup两个RDD并打印结果
rdd.cogroup(rdd1).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.4.4案例实操(广告点击Top3)
0)数据准备:时间戳,省份,城市,用户,广告,中间字段使用空格分割。
3)实现过程
object Demo_top3 {
def main(args: Array[String]): Unit = {
//1. 初始化Spark配置信息并建立与Spark的连接
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Test")
val sc = new SparkContext(sparkConf)
//2. 读取日志文件,获取原始数据
val dataRDD: RDD[String] = sc.textFile("input/agent.log")
//3. 将原始数据进行结构转换string =>(prv-adv,1)
val prvAndAdvToOneRDD: RDD[(String, Int)] = dataRDD.map {
line => {
val datas: Array[String] = line.split(" ")
(datas(1) + "-" + datas(4), 1)
}
}
//4. 将转换结构后的数据进行聚合统计(prv-adv,1)=>(prv-adv,sum)
val prvAndAdvToSumRDD: RDD[(String, Int)] = prvAndAdvToOneRDD.reduceByKey(_ + _)
//5. 将统计的结果进行结构的转换(prv-adv,sum)=>(prv,(adv,sum))
val prvToAdvAndSumRDD: RDD[(String, (String, Int))] = prvAndAdvToSumRDD.map {
case (prvAndAdv, sum) => {
val ks: Array[String] = prvAndAdv.split("-")
(ks(0), (ks(1), sum))
}
}
//6. 根据省份对数据进行分组:(prv,(adv,sum)) => (prv, Iterator[(adv,sum)])
val groupRDD: RDD[(String, Iterable[(String, Int)])] = prvToAdvAndSumRDD.groupByKey()
//7. 对相同省份中的广告进行排序(降序),取前三名
val mapValuesRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues {
datas => {
datas.toList.sortWith(
(left, right) => {
left._2 > right._2
}
).take(3)
}
}
//8. 将结果打印
mapValuesRDD.collect().foreach(println)
//9.关闭与spark的连接
sc.stop()
}
}
2.5Action行动算子
行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。
2.5.1reduce()聚合
1)函数签名:def reduce(f: (T, T) => T): T
2)功能说明:f函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
3)需求说明:创建一个RDD,将所有元素聚合得到结果
object action01_reduce {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
//3.2 聚合数据
val reduceResult: Int = rdd.reduce(_+_)
println(reduceResult)
//4.关闭连接
sc.stop()
}
}
2.5.2collect()以数组的形式返回数据集
1)函数签名:def collect(): Array[T]
2)功能说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。
注意:所有的数据都会被拉取到Driver端,慎用
3)需求说明:创建一个RDD,并将RDD内容收集到Driver端打印
object action02_collect {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
//3.2 收集数据到Driver
rdd.collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.5.3count()返回RDD中元素个数
1)函数签名:def count(): Long
2)功能说明:返回RDD中元素的个数
3)需求说明:创建一个RDD,统计该RDD的条数
object action03_count {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
//3.2 返回RDD中元素的个数
val countResult: Long = rdd.count()
println(countResult)
//4.关闭连接
sc.stop()
}
}
2.5.4first()返回RDD中的第一个元素
1)函数签名: def first(): T
2)功能说明:返回RDD中的第一个元素
3)需求说明:创建一个RDD,返回该RDD中的第一个元素
object action04_first {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
//3.2 返回RDD中元素的个数
val firstResult: Int = rdd.first()
println(firstResult)
//4.关闭连接
sc.stop()
}
}
2.5.5take()返回由RDD前n个元素组成的数组
1)函数签名: def take(num: Int): Array[T]
2)功能说明:返回一个由RDD的前n个元素组成的数组
3)需求说明:创建一个RDD,统计该RDD的条数
object action05_take {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
//3.2 返回RDD中元素的个数
val takeResult: Array[Int] = rdd.take(2)
println(takeResult)
//4.关闭连接
sc.stop()
}
}
2.5.6takeOrdered()返回该RDD排序后前n个元素组成的数组
1)函数签名: def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
2)功能说明:返回该RDD排序后的前n个元素组成的数组
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
......
if (mapRDDs.partitions.length == 0) {
Array.empty
} else {
mapRDDs.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
}
3)需求说明:创建一个RDD,获取该RDD排序后的前2个元素
object action06_takeOrdered{
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
//3.2 返回RDD中元素的个数
val result: Array[Int] = rdd.takeOrdered(2)
println(result)
//4.关闭连接
sc.stop()
}
}
2.5.7aggregate()案例
3)需求说明:创建一个RDD,将所有元素相加得到结果
object action07_aggregate {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),8)
//3.2 将该RDD所有元素相加得到结果
//val result: Int = rdd.aggregate(0)(_ + _, _ + _)
val result: Int = rdd.aggregate(10)(_ + _, _ + _)
println(result)
//4.关闭连接
sc.stop()
}
}
2.5.8fold()案例
3)需求说明:创建一个RDD,将所有元素相加得到结果
object action08_fold {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
//3.2 将该RDD所有元素相加得到结果
val foldResult: Int = rdd.fold(0)(_+_)
println(foldResult)
//4.关闭连接
sc.stop()
}
}
2.5.9countByKey()统计每种key的个数
1)函数签名:def countByKey(): Map[K, Long]
2)功能说明:统计每种key的个数
3)需求说明:创建一个PairRDD,统计每种key的个数
object action09_countByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
//3.2 统计每种key的个数
val result: collection.Map[Int, Long] = rdd.countByKey()
println(result)
//4.关闭连接
sc.stop()
}
}
2.5.10save相关算子
1)saveAsTextFile(path)保存成Text文件
(1)函数签名
(2)功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
2)saveAsSequenceFile(path) 保存成Sequencefile文件
(1)函数签名
(2)功能说明:将数据集中的元素以Hadoop Sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
注意:只有kv类型RDD有该操作,单值的没有
3)saveAsObjectFile(path) 序列化成对象保存到文件
(1)函数签名
(2)功能说明:用于将RDD中的元素序列化成对象,存储到文件中。
4)代码实现
object action10_save {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
//3.2 保存成Text文件
rdd.saveAsTextFile("output")
//3.3 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
//3.4 保存成Sequencefile文件
rdd.map((_,1)).saveAsSequenceFile("output2")
//4.关闭连接
sc.stop()
}
}
2.5.11foreach(f)遍历RDD中每一个元素
3)需求说明:创建一个RDD,对每个元素进行打印
object action11_foreach {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
// val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
//3.2 收集后打印
rdd.map(num=>num).collect().foreach(println)
println("****************")
//3.3 分布式打印
rdd.foreach(println)
//4.关闭连接
sc.stop()
}
}
2.6RDD序列化
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。下面我们看几个例子:
2.6.1闭包检查
1)闭包引入
object serializable01_object {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建两个对象
val user1 = new User()
user1.name = "zhangsan"
val user2 = new User()
user2.name = "lisi"
val userRDD1: RDD[User] = sc.makeRDD(List(user1, user2))
//3.1 打印,ERROR报java.io.NotSerializableException
//userRDD1.foreach(user => println(user.name))
//3.2 打印,RIGHT
val userRDD2: RDD[User] = sc.makeRDD(List())
//userRDD2.foreach(user => println(user.name))
//3.3 打印,ERROR Task not serializable 注意:没执行就报错了
userRDD2.foreach(user => println(user1.name))
//4.关闭连接
sc.stop()
}
}
//class User {
// var name: String = _
//}
class User extends Serializable {
var name: String = _
}
2)闭包检查
2.6.2序列化方法和属性
1)说明
Driver:算子以外的代码都是在Driver端执行
Executor:算子里面的代码都是在Executor端执行
2)代码实现
object serializable02_function {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建一个RDD
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
//3.1创建一个Search对象
val search = new Search("hello")
// Driver:算子以外的代码都是在Driver端执行
// Executor:算子里面的代码都是在Executor端执行
//3.2 函数传递,打印:ERROR Task not serializable
search.getMatch1(rdd).collect().foreach(println)
//3.3 属性传递,打印:ERROR Task not serializable
search.getMatche2(rdd).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
class Search(query:String) extends Serializable {
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 函数序列化案例
def getMatch1 (rdd: RDD[String]): RDD[String] = {
//rdd.filter(this.isMatch)
rdd.filter(isMatch)
}
// 属性序列化案例
def getMatche2(rdd: RDD[String]): RDD[String] = {
//rdd.filter(x => x.contains(this.query))
rdd.filter(x => x.contains(query))
//val q = query
//rdd.filter(x => x.contains(q))
}
}
3)问题一说明
//过滤出包含字符串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
(1)在这个方法中所调用的方法isMatch()是定义在Search这个类中的,实际上调用的是this. isMatch(),this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。
(2)解决方案
类继承scala.Serializable即可。
class Search() extends Serializable{...}
4)问题二说明
//过滤出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
(1)在这个方法中所调用的方法query是定义在Search这个类中的字段,实际上调用的是this. query,this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。
(2)解决方案一
(a)类继承scala.Serializable即可。
class Search() extends Serializable{...}
(b)将类变量query赋值给局部变量
修改getMatche2为
//过滤出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
val q = this.query//将类变量赋值给局部变量
rdd.filter(x => x.contains(q))
}
(3)解决方案二
把Search类变成样例类,样例类默认是序列化的。
case class Search(query:String) extends Serializable {...}
2.6.3Kryo序列化框架
参考地址: https://github.com/EsotericSoftware/kryo
Java的序列化能够序列化任何的类。但是比较重,序列化后对象的提交也比较大.
Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用kryo来序列化。
注意:即使使用kryo序列化,也要继承Serializable接口。
object serializable03_Kryo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Searcher]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
case class Searcher(val query: String) {
def isMatch(s: String) = {
s.contains(query)
}
def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch)
}
def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}
}
2.7RDD依赖关系
2.7.1查看血缘关系
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
1)代码实现
object Lineage01 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
val fileRDD: RDD[String] = sc.textFile("input/1.txt")
println(fileRDD.toDebugString)
println("----------------------")
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
println(wordRDD.toDebugString)
println("----------------------")
val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
println(mapRDD.toDebugString)
println("----------------------")
val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resultRDD.toDebugString)
resultRDD.collect()
//4.关闭连接
sc.stop()
}
}
2)打印结果
(2) input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
----------------------
(2) MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []
| input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
----------------------
(2) MapPartitionsRDD[3] at map at Lineage01.scala:23 []
| MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []
| input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
----------------------
(2) ShuffledRDD[4] at reduceByKey at Lineage01.scala:27 []
+-(2) MapPartitionsRDD[3] at map at Lineage01.scala:23 []
| MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []
| input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
注意:圆括号中的数字表示RDD的并行度,也就是有几个分区
2.7.2查看依赖关系
1)代码实现
object Lineage01 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
val fileRDD: RDD[String] = sc.textFile("input/1.txt")
println(fileRDD.dependencies)
println("----------------------")
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
println(wordRDD.dependencies)
println("----------------------")
val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
println(mapRDD.dependencies)
println("----------------------")
val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resultRDD.dependencies)
resultRDD.collect()
//4.关闭连接
sc.stop()
}
}
2)打印结果
List(org.apache.spark.OneToOneDependency@f2ce6b)
----------------------
List(org.apache.spark.OneToOneDependency@692fd26)
----------------------
List(org.apache.spark.OneToOneDependency@627d8516)
----------------------
List(org.apache.spark.ShuffleDependency@a518813)
3)全局搜索org.apache.spark.OneToOneDependency
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
注意:要想理解RDDS是如何工作的,最重要的就是理解Transformations。
RDD 之间的关系可以从两个维度来理解: 一个是 RDD 是从哪些 RDD 转换而来, 也就是 RDD 的 parent RDD(s)是什么; 另一个就是 RDD 依赖于 parent RDD(s)的哪些 Partition(s). 这种关系就是 RDD 之间的依赖.
RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
2.7.3窄依赖
窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。
2.7.4宽依赖
宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖,会引起Shuffle,总结:宽依赖我们形象的比喻为超生。
具有宽依赖的 transformations 包括: sort, reduceByKey, groupByKey, join, 和调用rePartition函数的任何操作.
宽依赖对 Spark 去评估一个 transformations 有更加重要的影响, 比如对性能的影响.
2.7.5Spark中的Job调度
一个Spark应用包含一个驱动进程(driver process,在这个进程中写Spark的逻辑代码)和多个执行器进程(executor process,跨越集群中的多个节点)。Spark 程序自己是运行在驱动节点, 然后发送指令到执行器节点。
一个Spark集群可以同时运行多个Spark应用, 这些应用是由集群管理器(cluster manager)来调度。
Spark应用可以并发的运行多个job, job对应着给定的应用内的在RDD上的每个 action操作。
Spark应用
一个Spark应用可以包含多个Spark job, Spark job是在驱动程序中由SparkContext 来定义的。
当启动一个 SparkContext 的时候, 就开启了一个 Spark 应用。 一个驱动程序被启动了, 多个执行器在集群中的多个工作节点(worker nodes)也被启动了。 一个执行器就是一个 JVM, 一个执行器不能跨越多个节点, 但是一个节点可以包括多个执行器。
一个 RDD 会跨多个执行器被并行计算. 每个执行器可以有这个 RDD 的多个分区, 但是一个分区不能跨越多个执行器.
Spark Job 的划分
由于Spark的懒执行, 在驱动程序调用一个action之前, Spark 应用不会做任何事情,
针对每个action,Spark 调度器就创建一个执行图(execution graph)和启动一个 Spark job。
每个 job 由多个stages 组成, 这些 stages 就是实现最终的 RDD 所需的数据转换的步骤。一个宽依赖划分一个stage。每个 stage 由多个 tasks 来组成, 这些 tasks 就表示每个并行计算, 并且会在多个执行器上执行。
2.7.6Stage任务划分(面试重点)
1)DAG有向无环图
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。原始的RDD通过一系列的转换就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。例如,DAG记录了RDD的转换过程和任务的阶段。
2)RDD任务切分中间分为:Application、Job、Stage和Task
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系。
3)代码实现
object Stage01 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2. Application:初始化一个SparkContext即生成一个Application;
val sc: SparkContext = new SparkContext(conf)
//3. 创建RDD
val dataRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,1,2),2)
//3.1 聚合
val resultRDD: RDD[(Int, Int)] = dataRDD.map((_,1)).reduceByKey(_+_)
// Job:一个Action算子就会生成一个Job;
//3.2 job1打印到控制台
resultRDD.collect().foreach(println)
//3.3 job2输出到磁盘
resultRDD.saveAsTextFile("output")
Thread.sleep(1000000)
//4.关闭连接
sc.stop()
}
}
4)查看Job个数
查看http://localhost:4040/jobs/,发现Job有两个。
5)查看Stage个数
查看Job0的Stage。由于只有1个Shuffle阶段,所以Stage个数为2。
查看Job1的Stage。由于只有1个Shuffle阶段,所以Stage个数为2。
6)Task个数
查看Job0的Stage0的Task个数
查看Job0的Stage1的Task个数
查看Job1的Stage2的Task个数
查看Job1的Stage3的Task个数
注意:如果存在shuffle过程,系统会自动进行缓存,UI界面显示skipped的部分
2.7.7Stage任务划分源码分析
2.8RDD持久化
2.8.1RDD Cache缓存
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
1)代码实现
object cache01 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3. 创建一个RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input1")
//3.1.业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Int)] = wordRdd.map {
word => {
println("************")
(word, 1)
}
}
//3.5 cache操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)
//3.4 数据缓存。
wordToOneRdd.cache()
//3.6 可以更改存储级别
// wordToOneRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
//3.2 触发执行逻辑
wordToOneRdd.collect()
println("-----------------")
println(wordToOneRdd.toDebugString)
//3.3 再次触发执行逻辑
wordToOneRdd.collect()
//4.关闭连接
sc.stop()
}
}
2)源码解析
mapRdd.cache()
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
3)自带缓存算子
Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。
object cache02 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3. 创建一个RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input1")
//3.1.业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Int)] = wordRdd.map {
word => {
println("************")
(word, 1)
}
}
// 采用reduceByKey,自带缓存
val wordByKeyRDD: RDD[(String, Int)] = wordToOneRdd.reduceByKey(_+_)
//3.5 cache操作会增加血缘关系,不改变原有的血缘关系
println(wordByKeyRDD.toDebugString)
//3.4 数据缓存。
//wordByKeyRDD.cache()
//3.2 触发执行逻辑
wordByKeyRDD.collect()
println("-----------------")
println(wordByKeyRDD.toDebugString)
//3.3 再次触发执行逻辑
wordByKeyRDD.collect()
//4.关闭连接
sc.stop()
}
}
访问http://localhost:4040/jobs/页面,查看第一个和第二个job的DAG图。说明:增加缓存后血缘依赖关系仍然有,但是,第二个job取的数据是从缓存中取的。
2.8.2RDD CheckPoint检查点
1)检查点:是通过将RDD中间结果写入磁盘。
2)为什么要做检查点?
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统阿善看到
4)检查点数据存储格式为:二进制的文件
5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。
6)检查点触发时间:对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍
7)设置检查点步骤
(1)设置检查点数据存储路径:sc.setCheckpointDir("./checkpoint1")
(2)调用检查点方法:wordToOneRdd.checkpoint()
8)代码实现
object checkpoint01 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
// 需要设置路径,否则抛异常:Checkpoint directory has not been set in the SparkContext
sc.setCheckpointDir("./checkpoint1")
//3. 创建一个RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input1")
//3.1.业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
word => {
(word, System.currentTimeMillis())
}
}
//3.5 增加缓存,避免再重新跑一个job做checkpoint
// wordToOneRdd.cache()
//3.4 数据检查点:针对wordToOneRdd做检查点计算
wordToOneRdd.checkpoint()
//3.2 触发执行逻辑
wordToOneRdd.collect().foreach(println)
// 会立即启动一个新的job来专门的做checkpoint运算
//3.3 再次触发执行逻辑
wordToOneRdd.collect().foreach(println)
wordToOneRdd.collect().foreach(println)
Thread.sleep(10000000)
//4.关闭连接
sc.stop()
}
}
9)执行结果
访问http://localhost:4040/jobs/页面,查看4个job的DAG图。其中第2个图是checkpoint的job运行DAG图。第3、4张图说明,检查点切断了血缘依赖关系。
(1)只增加checkpoint,没有增加Cache缓存打印
第1个job执行完,触发了checkpoint,第2个job运行checkpoint,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。
(hadoop,1577960215526)
。。。。。。
(hello,1577960215526)
(hadoop,1577960215609)
。。。。。。
(hello,1577960215609)
(hadoop,1577960215609)
。。。。。。
(hello,1577960215609)
(2)增加checkpoint,也增加Cache缓存打印
第1个job执行完,数据就保存到Cache里面了,第2个job运行checkpoint,直接读取Cache里面的数据,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。
(hadoop,1577960642223)
。。。。。。
(hello,1577960642225)
(hadoop,1577960642223)
。。。。。。
(hello,1577960642225)
(hadoop,1577960642223)
。。。。。。
(hello,1577960642225)
2.8.3缓存和检查点区别
1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
4)如果使用完了缓存,可以通过unpersist()方法释放缓存
2.8.4检查点存储到HDFS集群
如果检查点数据存储到HDFS集群,要注意配置访问集群的用户名。否则会报访问权限异常。
object checkpoint02 {
def main(args: Array[String]): Unit = {
// 设置访问HDFS集群的用户名
System.setProperty("HADOOP_USER_NAME","atguigu")
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
// 需要设置路径.需要提前在HDFS集群上创建/checkpoint路径
sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")
//3. 创建一个RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input1")
//3.1.业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
word => {
(word, System.currentTimeMillis())
}
}
//3.4 增加缓存,避免再重新跑一个job做checkpoint
wordToOneRdd.cache()
//3.3 数据检查点:针对wordToOneRdd做检查点计算
wordToOneRdd.checkpoint()
//3.2 触发执行逻辑
wordToOneRdd.collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.9键值对RDD数据分区
Spark目前支持Hash分区和Range分区,和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。
1)注意:
(1)只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
2)获取RDD分区
object partitioner01_get {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3 创建RDD
val pairRDD: RDD[(Int, Int)] = sc.makeRDD(List((1,1),(2,2),(3,3)))
//3.1 打印分区器
println(pairRDD.partitioner)
//3.2 使用HashPartitioner对RDD进行重新分区
val partitionRDD: RDD[(Int, Int)] = pairRDD.partitionBy(new HashPartitioner(2))
//3.3 打印分区器
println(partitionRDD.partitioner)
//4.关闭连接
sc.stop()
}
}
2.9.1Hash分区
2.9.2Ranger分区
2.9.3自定义分区
详见2.4.3.2。
第3章数据读取与保存
Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;
文件系统分为:本地文件系统、HDFS以及数据库。
3.1文件类数据读取与保存
3.1.1Text文件
1)数据读取:textFile(String)
2)数据保存:saveAsTextFile(String)
3)代码实现
object Operate_Text {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
//3.2 保存数据
inputRDD.saveAsTextFile("output")
//4.关闭连接
sc.stop()
}
}
4)注意:如果是集群路径:hdfs://hadoop102:9000/input/1.txt
3.1.2Json文件
如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。
1)数据准备
在input目录下创建1.txt文件,里面存储如下内容
{"username": "zhangsan","age": 20}
{"username": "lisi","age": 18}
{"username": "wangwu","age": 16}
2)代码实现
object Operate_Json {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 读取Json输入文件
val jsonRDD: RDD[String] = sc.textFile("input/user.json")
//3.2 导入解析Json所需的包并解析Json
import scala.util.parsing.json.JSON
val resultRDD: RDD[Option[Any]] = jsonRDD.map(JSON.parseFull)
//3.3 打印结果
resultRDD.collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
3)修改输入文件格式
[{"username": "zhangsan","age": 20},
{"username": "lisi","age": 18},
{"username": "wangwu","age": 16}
]
再次执行程序,发现解析失败。原因是一行一行的读取文件。
注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。
3.1.3Sequence文件
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。
1)代码实现
object Operate_Sequence {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 创建rdd
val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2),(3,4),(5,6)))
//3.2 保存数据为SequenceFile
dataRDD.saveAsSequenceFile("output")
//3.3 读取SequenceFile文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2)注意:SequenceFile文件只针对PairRDD
3.1.4Object对象文件
对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。
1)代码实现
object Operate_Object {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 创建RDD
val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4))
//3.2 保存数据
dataRDD.saveAsObjectFile("output")
//3.3 读取数据
sc.objectFile[(Int)]("output").collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
3.2文件系统类数据读取与保存
3.2.1HDFS
Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口。对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口
3.2.2MySQL
支持通过Java JDBC访问关系型数据库。需要通过JdbcRDD进行,示例如下:
(1)添加依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
(2)从Mysql读取数据
package com.atguigu
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
object MysqlRDD {
def main(args: Array[String]): Unit = {
//1.创建spark配置信息
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//2.创建SparkContext
val sc = new SparkContext(sparkConf)
//3.定义连接mysql的参数
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop102:3306/rdd"
val userName = "root"
val passWd = "000000"
//创建JdbcRDD
val rdd = new JdbcRDD(sc, () => {
Class.forName(driver)
DriverManager.getConnection(url, userName, passWd)
},
"select * from `rddtable` where `id`>=? and `id`<=?;",
1,
10,
1,
r => (r.getInt(1), r.getString(2))
)
//打印最后结果
println(rdd.count())
rdd.foreach(println)
sc.stop()
}
}
(3)往Mysql写入数据
def main(args: Array[String]) {
package com.atguigu.spark.day06
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author: Felix
* Date: 2020/1/8
* Desc: Spark操作MySQL数据库
*/
object Spark02_MySQL {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//数据库连接4要素
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop202:3306/test"
val userName = "root"
val passWd = "123456"
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("qiaofeng",18),("duanyu",20),("xuzhu",21)))
/*
//在循环中创建对象,效率低
rdd.foreach{
case (name,age)=>{
//注册驱动
Class.forName(driver)
//获取连接
val conn: Connection = DriverManager.getConnection(url,userName,passWd)
//执行的sql
var sql:String = "insert into user(name,age) values(?,?)"
//获取数据库操作对象
val ps: PreparedStatement = conn.prepareStatement(sql)
//给参数赋值
ps.setString(1,name)
ps.setInt(2,age)
//执行sql语句
ps.executeUpdate()
//释放资源
ps.close()
conn.close()
}
}*/
/*
//注册驱动
Class.forName(driver)
//获取连接
val conn: Connection = DriverManager.getConnection(url,userName,passWd)
//执行的sql
var sql:String = "insert into user(name,age) values(?,?)"
//获取数据库操作对象
val ps: PreparedStatement = conn.prepareStatement(sql)
rdd.foreach{
case (name,age)=>{
//给参数赋值
ps.setString(1,name)
ps.setInt(2,age)
//执行sql语句
ps.executeUpdate()
}
}
//释放资源
ps.close()
conn.close()*/
rdd.foreachPartition{
datas=>{
//注册驱动
Class.forName(driver)
//获取连接
val conn: Connection = DriverManager.getConnection(url,userName,passWd)
//执行的sql
var sql:String = "insert into user(name,age) values(?,?)"
//获取数据库操作对象
val ps: PreparedStatement = conn.prepareStatement(sql)
datas.foreach{
case (name,age)=>{
//给参数赋值
ps.setString(1,name)
ps.setInt(2,age)
//执行sql语句
ps.executeUpdate()
}
}
//释放资源
ps.close()
conn.close()
}
}
// 关闭连接
sc.stop()
}
}
第4章累加器
累加器:分布式共享只写变量。(Task和Task之间不能读数据)
累加器用来对信息进行聚合,通常在向Spark传递函数时,比如使用map()函数或者用 filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。
4.1系统累加器
1)代码实现
object accumulator01 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建RDD
val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
//3.1 打印单词出现的次数(a,10) 代码执行了shuffle
dataRDD.reduceByKey(_ + _).collect().foreach(println)
//3.2 如果不用shuffle,怎么处理呢?
var sum = 0
// 打印是在Executor端
dataRDD.foreach {
case (a, count) => {
sum = sum + count
println("sum=" + sum)
}
}
// 打印是在Driver端
println(("a", sum))
//3.3 使用累加器实现数据的聚合功能
// Spark自带常用的累加器
//3.3.1 声明累加器
val sum1: LongAccumulator = sc.longAccumulator("sum1")
dataRDD.foreach{
case (a, count)=>{
//3.3.2 使用累加器
sum1.add(count)
}
}
//3.3.3 获取累加器
println(sum1.value)
//4.关闭连接
sc.stop()
}
}
通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。Spark闭包里的执行器代码可以使用累加器的+=方法(在Java中是 add)增加累加器的值。驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue())来访问累加器的值。
注意:
(1)工作节点上的任务不能相互访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。
(2)对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动操作中。转化操作中累加器可能会发生不止一次更新。
4.2自定义累加器
自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。
1)自定义累加器步骤
(1)继承AccumulatorV2,设定输入、输出泛型
(2)重写方法
2)需求:自定义累加器,统计集合中首字母为“H”单词出现的次数。
List("Hello", "Hello", "Hello", "Hello", "Hello", "Spark", "Spark")
3)代码实现
object accumulator_define {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3. 创建RDD
val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "Hello", "Hello", "Hello", "Spark", "Spark"))
//3.1 创建累加器
val accumulator1: MyAccumulator = new MyAccumulator()
//3.2 注册累加器
sc.register(accumulator1,"wordcount")
//3.3 使用累加器
rdd.foreach(
word =>{
accumulator1.add(word)
}
)
//3.4 获取累加器的累加结果
println(accumulator1.value)
//4.关闭连接
sc.stop()
}
}
// 声明累加器
// 1.继承AccumulatorV2,设定输入、输出泛型
// 2.重新方法
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
// 定义输出数据集合
var map = mutable.Map[String, Long]()
// 是否为初始化状态,如果集合数据为空,即为初始化状态
override def isZero: Boolean = map.isEmpty
// 复制累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new MyAccumulator()
}
// 重置累加器
override def reset(): Unit = map.clear()
// 增加数据
override def add(v: String): Unit = {
// 业务逻辑
if (v.startsWith("H")) {
map(v) = map.getOrElse(v, 0L) + 1L
}
}
// 合并累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
var map1 = map
var map2 = other.value
map = map1.foldLeft(map2)(
(map,kv)=>{
map(kv._1) = map.getOrElse(kv._1, 0L) + kv._2
map
}
)
}
// 累加器的值,其实就是累加器的返回结果
override def value: mutable.Map[String, Long] = map
}
第5章广播变量
广播变量:分布式共享只读变量。
在多个并行操作中(Executor)使用同一个变量,Spark默认会为每个任务(Task)分别发送,这样如果共享比较大的对象,会占用很大工作节点的内存。
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。
1)使用广播变量步骤:
(1)通过对一个类型T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象,任何可序列化的类型都可以这么实现。
(2)通过value属性访问该对象的值(在Java中为value()方法)。
(3)变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。
2)原理说明
3)代码实现
object broadcast {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建RDD
//val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
//val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))
//3.1 采用RDD的方式实现 rdd1 join rdd2,用到Shuffle,性能比较低
//rdd1.join(rdd2).collect().foreach(println)
//3.2 采用集合的方式,实现rdd1和list的join
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val list: List[(String, Int)] = List(("a", 4), ("b", 5), ("c", 6))
// 声明广播变量
val broadcastList: Broadcast[List[(String, Int)]] = sc.broadcast(list)
val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
case (k1, v1) => {
var v2: Int = 0
// 使用广播变量
//for ((k3, v3) <- list.value) {
for ((k3, v3) <- broadcastList.value) {
if (k1 == k3) {
v2 = v3
}
}
(k1, (v1, v2))
}
}
resultRDD.foreach(println)
//4.关闭连接
sc.stop()
}
}
第6章SparkCore项目实战
6.1数据准备
本项目的数据是采集电商网站的用户行为数据,主要包含用户的4种行为:搜索、点击、下单和支付。
1)数据格式
(1)数据采用_分割字段
(2)每一行表示用户的一个行为,所以每一行只能是四种行为中的一种。
(3)如果搜索关键字是null,表示这次不是搜索
(4)如果点击的品类id和产品id是-1表示这次不是点击
(5)下单行为来说一次可以下单多个产品,所以品类id和产品id都是多个,id之间使用逗号,分割。如果本次不是下单行为,则他们相关数据用null来表示
(6)支付行为和下单行为类似
2)数据详细字段说明
编号 字段名称 字段类型 字段含义
1 date String 用户点击行为的日期
2 user_id Long 用户的ID
3 session_id String Session的ID
4 page_id Long 某个页面的ID
5 action_time String 动作的时间点
6 search_keyword String 用户搜索的关键词
7 click_category_id Long 某一个商品品类的ID
8 click_product_id Long 某一个商品的ID
9 order_category_ids String 一次订单中所有品类的ID集合
10 order_product_ids String 一次订单中所有商品的ID集合
11 pay_category_ids String 一次支付中所有品类的ID集合
12 pay_product_ids String 一次支付中所有商品的ID集合
13 city_id Long 城市 id
6.2需求1:Top10热门品类
需求说明:品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。
鞋 点击数 下单数 支付数
衣服 点击数 下单数 支付数
生活用品 点击数 下单数 支付数
例如,综合排名=点击数*20%+下单数*30%+支付数*50%
本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
6.2.1需求分析一
思路1
分别统计每个品类点击的次数,下单的次数和支付的次数。
6.2.2代码实现一
1)用来封装用户行为的样例类
//用户访问动作表
case class UserVisitAction(date: String,//用户点击行为的日期
user_id: Long,//用户的ID
session_id: String,//Session的ID
page_id: Long,//某个页面的ID
action_time: String,//动作的时间点
search_keyword: String,//用户搜索的关键词
click_category_id: Long,//某一个商品品类的ID
click_product_id: Long,//某一个商品的ID
order_category_ids: String,//一次订单中所有品类的ID集合
order_product_ids: String,//一次订单中所有商品的ID集合
pay_category_ids: String,//一次支付中所有品类的ID集合
pay_product_ids: String,//一次支付中所有商品的ID集合
city_id: Long)//城市 id
// 输出结果表
case class CategoryCountInfo(categoryId: String,//品类id
clickCount: Long,//点击次数
orderCount: Long,//订单次数
payCount: Long)//支付次数
2)核心业务代码实现
object Spark01_TopN_Req1_1 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
// 读取数据
val dataRDD: RDD[String] = sc.textFile("E:\\Felix课程\\大数据\\大数据_190826\\Felix_02_尚硅谷大数据技术之Spark\\2.资料\\spark-core数据\\")
//将原始数据进行转换(分解)
val actionRDD: RDD[UserVisitAction] = dataRDD.map {
line => {
val fields: Array[String] = line.split("_")
UserVisitAction(
fields(0),
fields(1).toLong,
fields(2),
fields(3).toLong,
fields(4),
fields(5),
fields(6).toLong,
fields(7).toLong,
fields(8),
fields(9),
fields(10),
fields(11),
fields(12).toLong
)
}
}
//CategoryCountInfo(鞋,1,0,0)
//CategoryCountInfo(鞋,0,1,0)
//CategoryCountInfo(鞋,0,0,1)
//=>最终希望变成:CategoryCountInfo(鞋,1,1,1)
//再次转换分解的数据,封装为类别点击封装 CategoryCountInfo
//这里下单和支付操作可能会有多个品类,所以我们这里使用扁平化进行分解
//点击、下单、支付的动作获取品类的方式是不一样的,所以需要判断不同的行为
val infoRDD: RDD[CategoryCountInfo] = actionRDD.flatMap {
userAction => {
if (userAction.click_category_id != -1) {
//点击
List(CategoryCountInfo(userAction.click_category_id + "", 1, 0, 0))
} else if (userAction.order_category_ids != "null") {
//下单
val ids: Array[String] = userAction.order_category_ids.split(",")
val list: ListBuffer[CategoryCountInfo] = new ListBuffer[CategoryCountInfo]
for (id <- ids) {
list.append(CategoryCountInfo(id, 0, 1, 0))
}
list
} else if (userAction.pay_category_ids != "null") {
//搜索
val ids: Array[String] = userAction.pay_category_ids.split(",")
val list: ListBuffer[CategoryCountInfo] = new ListBuffer[CategoryCountInfo]
for (id <- ids) {
list.append(CategoryCountInfo(id, 0, 0, 1))
}
list
} else {
Nil
}
}
}
//将相同品类的分成一组
val groupRDD: RDD[(String, Iterable[CategoryCountInfo])] = infoRDD.groupBy(info=>info.categoryId)
//将分组后的数据进行聚合处理: 返回一个元组(品类id, 聚合后的CategoryCountInfo)
val reduceRDD: RDD[(String, CategoryCountInfo)] = groupRDD.mapValues {
datas =>
datas.reduce {
(info1, info2) => {
info1.clickCount = info1.clickCount + info2.clickCount
info1.orderCount = info1.orderCount + info2.orderCount
info1.payCount = info1.payCount + info2.payCount
info1
}
}
}
//转换结构为 RDD(聚合后的CategoryCountInfo)
val mapRDD: RDD[CategoryCountInfo] = reduceRDD.map(_._2)
//排序取前10 元组会按照顺序排序
val resRDD: Array[CategoryCountInfo] = mapRDD
.sortBy(info=>(info.clickCount,info.orderCount,info.payCount),false)
.take(10)
resRDD.foreach(println)
// 关闭连接
sc.stop()
}
}
3)使用reduceByKey的预聚合优化
object Spark02_TopN_Req1_2 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
// 读取数据
val dataRDD: RDD[String] = sc.textFile("E:\\Felix课程\\大数据\\大数据_190826\\Felix_02_尚硅谷大数据技术之Spark\\2.资料\\spark-core数据\\")
//将原始数据进行转换(分解)
val actionRDD: RDD[UserVisitAction] = dataRDD.map {
line => {
val fields: Array[String] = line.split("_")
UserVisitAction(
fields(0),
fields(1).toLong,
fields(2),
fields(3).toLong,
fields(4),
fields(5),
fields(6).toLong,
fields(7).toLong,
fields(8),
fields(9),
fields(10),
fields(11),
fields(12).toLong
)
}
}
//CategoryCountInfo(鞋,1,0,0)
//CategoryCountInfo(鞋,0,1,0)
//CategoryCountInfo(鞋,0,0,1)
//=>最终希望变成:CategoryCountInfo(鞋,1,1,1)
//再次转换分解的数据,封装为类别点击封装 CategoryCountInfo
//这里下单和支付操作可能会有多个品类,所以我们这里使用扁平化进行分解
//点击、下单、支付的动作获取品类的方式是不一样的,所以需要判断不同的行为
val infoRDD: RDD[(String,CategoryCountInfo)] = actionRDD.flatMap {
userAction => {
if (userAction.click_category_id != -1) {
//点击
List((userAction.click_category_id+"",CategoryCountInfo(userAction.click_category_id + "", 1, 0, 0)))
} else if (userAction.order_category_ids != "null") {
//下单
val ids: Array[String] = userAction.order_category_ids.split(",")
val list: ListBuffer[(String,CategoryCountInfo)] = new ListBuffer[(String,CategoryCountInfo)]
for (id <- ids) {
list.append((id,CategoryCountInfo(id, 0, 1, 0)))
}
list
} else if (userAction.pay_category_ids != "null") {
//搜索
val ids: Array[String] = userAction.pay_category_ids.split(",")
val list: ListBuffer[(String,CategoryCountInfo)] = new ListBuffer[(String,CategoryCountInfo)]
for (id <- ids) {
list.append((id,CategoryCountInfo(id, 0, 0, 1)))
}
list
} else {
Nil
}
}
}
//将reduceByKey进行预聚合处理
val reduceRDD: RDD[(String, CategoryCountInfo)] = infoRDD.reduceByKey {
(info1, info2) => {
info1.clickCount = info1.clickCount + info2.clickCount
info1.orderCount = info1.orderCount + info2.orderCount
info1.payCount = info1.payCount + info2.payCount
info1
}
}
//转换结构为 RDD(聚合后的CategoryCountInfo)
val mapRDD: RDD[CategoryCountInfo] = reduceRDD.map(_._2)
//排序取前10 元组会按照顺序排序
val resRDD: Array[CategoryCountInfo] = mapRDD
.sortBy(info=>(info.clickCount,info.orderCount,info.payCount),false)
.take(10)
resRDD.foreach(println)
// 关闭连接
sc.stop()
}
}
6.2.3需求分析二
采用累加器,避免shuffle过程。
思路2
最好的办法应该是遍历一次能够计算出来上述的3个指标。使用累加器可以达成我们的需求。
(1)遍历全部日志数据,根据品类id和操作类型分别累加,需要用到累加器
定义累加器,当碰到订单和支付业务的时候注意拆分字段才能得到品类 id
(2)遍历完成之后就得到每个品类 id 和操作类型的数量.
(3)按照点击下单支付的顺序来排序
(4)取出 Top10
6.2.4代码实现二
1)定义品类行为统计累加器
//品类行为统计累加器:
// ((鞋,click),1)
// ((鞋,order),1)
// ((鞋,pay),1)
// ((衣服,pay),1)
// 1)继承AccumulatorV2,声明泛型
// 2)重写方法
class CategoryCountAccumulator extends AccumulatorV2[UserVisitAction,mutable.Map[(String,String),Long]]{
var map = mutable.Map[(String,String),Long]()
override def isZero: Boolean = map.isEmpty
override def copy(): AccumulatorV2[UserVisitAction, mutable.Map[(String, String), Long]] = {
var newMap = new CategoryCountAccumulator
newMap.map = this.map
newMap
}
override def reset(): Unit = {
map.clear()
}
//((鞋,click),1)
override def add(action: UserVisitAction): Unit = {
if(action.click_category_id != -1){
//点击
var key = (action.click_category_id.toString,"click")
map(key) = map.getOrElse(key,0L) + 1L
}else if(action.order_category_ids != "null"){
//下单
val ids: Array[String] = action.order_category_ids.split(",")
for (id <- ids) {
var key = (id,"order")
map(key) = map.getOrElse(key,0L) + 1L
}
}else if(action.pay_category_ids != "null"){
//支付
val ids: Array[String] = action.pay_category_ids.split(",")
for (id <- ids) {
var key = (id,"pay")
map(key) = map.getOrElse(key,0L) + 1L
}
}
}
override def merge(other: AccumulatorV2[UserVisitAction, mutable.Map[(String, String), Long]]): Unit = {
var map1 = map
var map2 = other.value
map = map1.foldLeft(map2)(
(mmpp,kv)=>{
mmpp(kv._1) = mmpp.getOrElse(kv._1,0L) + kv._2
mmpp
}
)
}
override def value: mutable.Map[(String, String), Long] = map
}
2)代码实现
object Spark02_TopN_Req1_3 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
// 读取数据
val dataRDD: RDD[String] = sc.textFile("E:\\Felix课程\\大数据\\大数据_190826\\Felix_02_尚硅谷大数据技术之Spark\\2.资料\\spark-core数据\\")
//将原始数据进行转换(分解)
val actionRDD: RDD[UserVisitAction] = dataRDD.map {
line => {
val fields: Array[String] = line.split("_")
UserVisitAction(
fields(0),
fields(1).toLong,
fields(2),
fields(3).toLong,
fields(4),
fields(5),
fields(6).toLong,
fields(7).toLong,
fields(8),
fields(9),
fields(10),
fields(11),
fields(12).toLong
)
}
}
//3.3 创建累加器
val acc: CategoryCountAccumulator = new CategoryCountAccumulator()
//3.4 注册累加器
sc.register(acc, "CategoryCountAccumulator")
actionRDD.foreach(
action => {
acc.add(action)
}
)
//3.5 获取累加器的值
//((鞋,click),10)
//((鞋,order),20)
//((鞋,pay),30)
val accMap: mutable.Map[(String, String), Long] = acc.value
//对累加出来的数据按照类别进行分组,注意:这个时候每个类别之后有三条记录,数据量不会很大
val groupMap: Map[String, mutable.Map[(String, String), Long]] = accMap.groupBy(_._1._1)
//对分组后的数据进行结构的转换:CategoryCountInfo
val infoes: immutable.Iterable[CategoryCountInfo] = groupMap.map {
case (id, map) => {
CategoryCountInfo(
id,
map.getOrElse((id, "click"), 0L),
map.getOrElse((id, "order"), 0L),
map.getOrElse((id, "pay"), 0L)
)
}
}
//将转换后的数据进行排序(降序)取前10名
infoes.toList.sortWith(
(left, right) => {
if (left.clickCount > right.clickCount) {
true
} else if (left.clickCount == right.clickCount) {
if (left.orderCount > right.orderCount) {
true
} else if (left.orderCount == right.orderCount) {
left.payCount > right.payCount
} else {
false
}
} else {
false
}
}
).take(10).foreach(println)
// 关闭连接
sc.stop()
}
}
6.3需求2:Top10热门品类中每个品类的Top10活跃Session统计
6.3.1需求分析
1)需求描述
对于排名前10的品类,分别获取每个品类点击次数排名前10的sessionId。(注意: 这里我们只关注点击次数,不关心下单和支付次数)
这个就是说,对于top10的品类,每一个都要获取对它点击次数排名前10的sessionId。这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的session的行为。
2)分析思路
通过需求1,获取TopN热门品类的id
将原始数据进行过滤(1.保留热门品类 2.只保留点击操作)
对session的点击数进行转换 (category-session,1)
对session的点击数进行统计 (category-session,sum)
将统计聚合的结果进行转换 (category,(session,sum))
将转换后的结构按照品类进行分组 (category,Iterator[(session,sum)])
对分组后的数据降序 取前10
6.3.2代码实现
//*******************需求二实现***********************
//1.获取TopN热门品类的id topNList来源需求一
val ids: List[String] = topNList.map(_.categoryId)
//因为这个ids要分给每个任务,所以可以使用广播变量
val broadcastIds: Broadcast[List[String]] = sc.broadcast(ids)
//2.将原始数据进行过滤(1.保留热门品类 2.只保留点击操作)
val filterRDD: RDD[UserVisitAction] = actionRDD.filter(action => {
if (action.click_category_id != -1) {
broadcastIds.value.contains(action.click_category_id.toString)
} else {
false
}
})
//3.对session的点击数进行转换 (category-session,1)
val mapRDD1: RDD[(String, Int)] = filterRDD.
map(action=>(action.click_category_id+"_"+action.session_id,1))
//4.对session的点击数进行统计 (category-session,sum)
val reduceRDD1: RDD[(String, Int)] = mapRDD1.reduceByKey(_+_)
//5.将统计聚合的结果进行转换 (category,(session,sum))
val mapRDD2: RDD[(String, (String, Int))] = reduceRDD1.map {
case (k, sum) => {
val categoryAndSession: Array[String] = k.split("_")
(categoryAndSession(0), (categoryAndSession(1), sum))
}
}
//6.将转换后的结构按照品类进行分组 (category,Iterator[(session,sum)])
val groupRDD2: RDD[(String, Iterable[(String, Int)])] = mapRDD2.groupByKey()
//7.对分组后的数据降序 取前10
val resRDD2: RDD[(String, List[(String, Int)])] = groupRDD2.mapValues {
datas => {
datas.toList.sortWith {
case (left, right) => {
left._2 > right._2
}
}.take(10)
}
}
resRDD2.foreach(println)
扩展:定义CategorySession 类,封装最终写入到数据库的数据
case class CategorySession(categoryId: String,
sessionId: String,
clickCount: Long)
6.4需求3:页面单跳转化率统计
6.4.1需求分析
1)需求描述
计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率
比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率.
产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。
数据分析师,可以此数据做更深一步的计算和分析。
企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当调整公司的经营战略或策略。
在该模块中,需要根据查询对象中设置的 Session 过滤条件,先将对应得 Session 过滤出来,然后根据查询对象中设置的页面路径,计算页面单跳转化率,比如查询的页面路径为:3、5、7、8,那么就要计算 3-5、5-7、7-8 的页面单跳转化率。
需要注意的一点是,页面的访问时有先后的,要做好排序。
2)思路分析
读取原始数据
将原始数据映射为样例类
将原始数据根据session进行分组
将分组后的数据根据时间进行排序(升序)
将排序后的数据进行结构的转换(pageId,1)
计算分母-将相同的页面id进行聚合统计(pageId,sum)
计算分子-将页面id进行拉链,形成连续的拉链效果,转换结构(pageId-pageId2,1)
将转换结构后的数据进行聚合统计(pageId-pageId2,sum)
计算页面单跳转换率
6.4.2代码实现
object Spark05_TopN_Req3_1{
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
// 读取数据
val dataRDD: RDD[String] = sc.textFile("E:\\Felix课程\\大数据\\大数据_190826\\Felix_02_尚硅谷大数据技术之Spark\\2.资料\\spark-core数据\\")
//将原始数据进行转换(分解)
val actionRDD: RDD[UserVisitAction] = dataRDD.map {
line => {
val fields: Array[String] = line.split("_")
UserVisitAction(
fields(0),
fields(1).toLong,
fields(2),
fields(3).toLong,
fields(4),
fields(5),
fields(6).toLong,
fields(7).toLong,
fields(8),
fields(9),
fields(10),
fields(11),
fields(12).toLong
)
}
}
//*************需求三*****************
//计算分母-将相同的页面id进行聚合统计(pageId,sum)
val pageIdRDD: RDD[(Long, Long)] = actionRDD.map(action => {
(action.page_id, 1L)
})
val fmIdsMap: Map[Long, Long] = pageIdRDD.reduceByKey(_+_).collect().toMap
//计算分子-将页面id进行拉链,形成连续的拉链效果,转换结构(pageId-pageId2,1)
//将原始数据根据session进行分组
val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionRDD.groupBy(_.session_id)
//将分组后的数据根据时间进行排序(升序)
var pageFlowRDD: RDD[(String, List[(String, Int)])] = sessionRDD.mapValues(datas => {
val actions: List[UserVisitAction] = datas.toList.sortWith(
(left, right) => {
left.action_time < right.action_time
}
)
//将排序后的数据进行结构的转换(pageId,1)
val pageIdToOneList: List[(Long, Int)] = actions.map(action => (action.page_id, 1))
val pageFlows: List[((Long, Int), (Long, Int))] = pageIdToOneList.zip(pageIdToOneList.tail)
pageFlows.map {
case ((pageId1, _), (pageId2, _)) => {
(pageId1 + "-" + pageId2, 1)
}
}
}
)
//将转换结构后的数据进行聚合统计(pageId-pageId2,sum)
val pageFlowMapRDD: RDD[(String, Int)] = pageFlowRDD.map(_._2).flatMap(list=>list)
val page1AndPage2ToSumRDD: RDD[(String, Int)] = pageFlowMapRDD.reduceByKey(_+_)
//计算页面单跳转换率
page1AndPage2ToSumRDD.foreach{
case (pageFlow,fz)=>{
val pageIds: Array[String] = pageFlow.split("-")
//为了避免分母不存在,这里默认值给1
val fmSum: Long = fmIdsMap.getOrElse(pageIds(0).toLong,1L)
println(pageFlow + "=" + fz.toDouble/fmSum)
}
}
// 关闭连接
sc.stop()
}
}
第7章企业面试题
7.1编写WordCount(读取一个本地文件),并打包到集群运行,说明需要添加的主要参数
sc.textFile("hdfs://linux1:9000/test.log").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
7.2RDD的五个主要特性
分区器、首选位置、计算方法、依赖关系、分区
7.3如何创建一个RDD,有几种方式,举例说明
7.4创建一个RDD,使其一个分区的数据转变为一个String。例如(Array("a","b","c","d"),2)=>("ab","cd")
7.5map与mapPartitions的区别
7.6coalesce与repartition两个算子的作用以及区别与联系
7.7使用zip算子时需要注意的是什么(即哪些情况不能使用)
7.8reduceByKey跟groupByKey之间的区别。
7.9reduceByKey跟aggregateByKey之间的区别与联系。
7.10combineByKey的参数作用,说明其参数调用时机。
7.11使用RDD实现Join的多种方式。
7.12aggregateByKey与aggregate之间的区别与联系。
7.13创建一个RDD,自定义一种分区规则并实现?spark中是否可以按照Value分区。
7.14读取文件,实现WordCount功能。(使用不同的算子实现,至少3种方式)
7.15说说你对RDD血缘关系的理解。
7.16Spark是如何进行任务切分的,请说明其中涉及到的相关概念。
7.17RDD的cache和checkPoint的区别和联系。
7.18创建一个RDD,自定义一种分区规则并实现。
7.19Spark读取HDFS文件默认的切片机制。
7.20说说你对广播变量的理解。
7.21自定义一个累加器,实现计数功能。
第8章附录
8.1向HBase读写数据
由于org.apache.hadoop.hbase.mapreduce.TableInputFormat类的实现,Spark可以通过Hadoop输入格式访问HBase。这个输入格式会返回键值对数据,其中键的类型为org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为org.apache.hadoop.hbase.client.
Result。
(1)添加依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
(2)从HBase读取数据
package com.atguigu
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.util.Bytes
object HBaseSpark {
def main(args: Array[String]): Unit = {
//创建spark配置信息
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//创建SparkContext
val sc = new SparkContext(sparkConf)
//构建HBase配置信息
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
//从HBase读取数据形成RDD
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
val count: Long = hbaseRDD.count()
println(count)
//对hbaseRDD进行处理
hbaseRDD.foreach {
case (_, result) =>
val key: String = Bytes.toString(result.getRow)
val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
val color: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("color")))
println("RowKey:" + key + ",Name:" + name + ",Color:" + color)
}
//关闭连接
sc.stop()
}
}
3)往HBase写入
def main(args: Array[String]) {
//获取Spark配置信息并创建与spark的连接
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
//创建HbaseConf
val conf = HBaseConfiguration.create()
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")
//定义往Hbase插入数据的方法
def convert(triple: (Int, String, Int)) = {
val put = new Put(Bytes.toBytes(triple._1))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
(new ImmutableBytesWritable, put)
}
//创建一个RDD
val initialRDD = sc.makeRDD(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
//将RDD内容写到Hbase
val localData = initialRDD.map(convert)
localData.saveAsHadoopDataset(jobConf)
}
标签:SparkContext,String,val,SparkConf,SparkCore,技术,RDD,sc,数据 来源: https://www.cnblogs.com/shan13936/p/13947836.html