Spark—算子—spark缓存策略
作者:互联网
Spark—算子—spark缓存策略
转换算子和操作算子
转换算子
转换算子:将一个RDD转换成另一个RDD,转换算子是懒执行,需要action算子来触发执行
操作算子
触发任务执行,一个action算子会触发一次任务执行,同时每一个action算子都会触发前面的代码执行
package com.core.day2
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo16Action {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Demo16Action")
val sc = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("data/students.txt")
/**
* 转换算子:将一个RDD转换成另一个RDD,转换算子是懒执行,需要action算子来触发执行
*
* 操作算子:触发任务执行,一个action算子会触发一次任务执行,同时每一个action算子都会
* 触发前面的代码执行
*
*
*/
val studentRDD: RDD[(String, String, Int, String, String)] = linesRDD
.map(_.split(","))
.map{
case Array(id:String,name:String,age:String,gender:String,clazz:String) =>
println("============================")
(id,name,age.toInt,gender,clazz)
}
studentRDD.foreach(println)
println("=================================")
studentRDD.foreach(println)
/**
* action算子:action算子的返回值不一定是rdd,每一个action算子都会触发一个job任务执行
* foreach:循环rdd
* saveAsTextFile:保存数据
* count:统计行数
* collect:将rdd转换成集合
* take:取top
* reduce:全局聚合
* sum:求和,rdd必须可以求和
*
*/
//保存数据
studentRDD.saveAsTextFile("data/temp")
/**
* 将rdd转换成数组
*
* 处理的数据量很大时,会导致内存益处
*
*/
val array: Array[(String, String, Int, String, String)] = studentRDD.collect()
//取出top
val top: Array[(String, String, Int, String, String)] = studentRDD.take(10)
val stuRDD: RDD[Int] = studentRDD.map(s => 1)
val reduce: Int = stuRDD.reduce((x, y) => x + y)
val sum: Double = stuRDD.sum()
println(reduce)
println(sum)
while(true){
}
}
}
Spark缓存策略
package com.core.day2
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object Demo17Cache {
def main(args: Array[String]): Unit = {
/**
* 缓存
*
*/
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Demo16Action")
val sc = new SparkContext(conf)
//设置checkpoint保存路径
//sc.setCheckpointDir("data/checkpoint")
//读取学生表数据
val linesRDD: RDD[String] = sc.textFile("data/students.txt")
//整理取出字段
val mapRDD: RDD[Array[String]] = linesRDD.map(_.split(","))
val studentRDD: RDD[(String, String, Int, String, String)] = mapRDD.map {
case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
println("=======map============")
(id, name, age.toInt, gender, clazz)
}
/**
* 对多次使用的RDD进行缓存
*/
//缓存在内存中
//studentRDD.cache()
//studentRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
//studentRDD.persist(StorageLevel.MEMORY_AND_DISK)
studentRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
/**
* checkpoint:将RDD的数据缓存到活hdfs中,任务失败了,数据也不会丢失
* checkpoint: 主要是再spark streaming中使用,用来保证任务的高可用
* cache:将数据缓存,在spark执行的服务器的内存或者磁盘上,如果任务失败,数据也就没来
*
*/
//studentRDD.persist()
// studentRDD.checkpoint()
//1、统计班级人数
studentRDD
.map {
case (_, _, _, _, clazz: String) =>
(clazz, 1)
}
.reduceByKey(_ + _)
.saveAsTextFile("data/clazz_num")
println("=" * 100)
//统计性别的人数
studentRDD
.map {
case (_, _, _, gender: String, _) =>
(gender, 1)
}
.reduceByKey(_ + _)
.saveAsTextFile("data/gender_num")
//统计年龄的人数
studentRDD
.map {
case (_, _, age: Int, _, _) =>
(age, 1)
}
.reduceByKey(_ + _)
.saveAsTextFile("data/age_num")
while (true) {
}
}
}
标签:缓存,String,val,RDD,算子,Spark,studentRDD,spark 来源: https://www.cnblogs.com/atao-BigData/p/16472132.html