Spark_常用算子
作者:互联网
Spark_常用算子
sortBy-sortBy: 指定一个字段进行排序,默认是升序, ascending = false: 降序
package com.core.day2
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo13Sort {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("Demo13Sort")
conf.setMaster("local")
val sc = new SparkContext(conf)
val kvRDD: RDD[(String, Int)] = sc.textFile("data/score.txt")
.map(_.split(","))
.filter(_.length == 3)
.map{
case Array(sid:String,_,sco:String) =>
(sid,sco.toInt)
}
val sum_scoreRDD: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _)
/**
* sortBy: 指定一个字段进行排序,默认是升序
* ascending = false: 降序
*
*/
val sortRDD: RDD[(String, Int)] = sum_scoreRDD.sortBy(kv => -kv._2)
sortRDD.foreach(println)
}
}
mapValues:: 对value作处理,key可以不变
package com.core.day2
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo14MapValues {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("Demo14MapValues")
conf.setMaster("local")
val sc = new SparkContext(conf)
// 读取-分割-清洗-取出数据
val idAndScoADD: RDD[(String, Int)] = sc.textFile("data/score.txt")
.map(_.split(","))
.filter(_.length == 3)
.map{
case Array(sid:String,cid:String,sco:String) =>
(sid,sco.toInt)
}
//统计总分
val kvRDD: RDD[(String, Int)] = idAndScoADD.reduceByKey(_ + _)
/**
* mapValues: 对value作处理,key可以不变
*
*/
// 对上述所有的数据 乘以100
val sco_100: RDD[(String, Int)] = kvRDD.mapValues(sco => sco * 100)
sco_100.foreach(println)
}
}
mapPartitions
mapPartitions:一次处理一个分区的数据,一个一个传递给后面的函数
迭代器中是一个分区的数据
函数的返回值也是一个迭代器
mapPartitionsWithIndex:对一个分区进行编号
package com.core.day2
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo15MapPartition {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("Demo15MapPartition")
conf.setMaster("local")
val sc = new SparkContext(conf)
val lineRDD: RDD[String] = sc.textFile("data/words")
//println(lineRDD.getNumPartitions)
/**
* mapPartitions:一次处理一个分区的数据,一个一个传递给后面的函数
* 迭代器中是一个分区的数据
* 函数的返回值也是一个迭代器
*
*/
val wordsRDD: RDD[String] = lineRDD.mapPartitions((iter:Iterator[String]) => {
//在函数类对一个分区的数据进行处理
val words: Iterator[String] = iter.flatMap(_.split(","))
words
})
wordsRDD.foreach(println)
/**
* mapPartitionsWithIndex:对一个分区进行编号
*
*/
wordsRDD.mapPartitionsWithIndex{
case (index:Int,iter:Iterator[String]) =>
println(s"mapPartitionsWithIndex:$index")
iter
}
.foreach(println)
}
}
标签:常用,String,val,Int,sco,RDD,conf,算子,Spark 来源: https://www.cnblogs.com/atao-BigData/p/16472048.html