Spark Streaming: converting between DStream (DS), DataFrame (DF), and RDD; submitting to a cluster; writing data to disk
Spark Streaming
Code for converting a DStream (DS) into a DataFrame (DF)
package com.shujia.spark.streaming

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}

object Demo44DStreamTORDDAndDF {
  def main(args: Array[String]): Unit = {

    /**
      * Create the SparkSession
      */
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ds")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._

    val sc: SparkContext = spark.sparkContext
    val ssc = new StreamingContext(sc, Durations.seconds(5))

    /**
      * Read from a socket to get a DStream
      */
    val lineDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    /**
      * A DStream is backed by RDDs: at every batch interval the received data is wrapped into a new RDD.
      * Here a new RDD arrives every 5 seconds, and each one holds different data.
      *
      * Once you drop down to the RDD level you can no longer use stateful operators.
      */
    lineDS.foreachRDD((rdd: RDD[String]) => {
      println("processing batch")

      // Plain RDD code can be written here.
      // Note: without an action (e.g. the commented-out foreach) this chain is never executed.
      rdd
        .flatMap(_.split(","))
        .map((_, 1))
        .reduceByKey(_ + _)
      //.foreach(println)

      /**
        * The RDD can be converted to a DataFrame, after which SQL can be used.
        */
      val linesDF: DataFrame = rdd.toDF("line")
      linesDF.createOrReplaceTempView("lines")

      val countDF: DataFrame = spark.sql(
        """
          |select word, count(1) as c from (
          |  select explode(split(line, ',')) as word
          |  from lines
          |) as a
          |group by word
          |""".stripMargin)

      countDF.show()
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
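The comment above notes that stateful operators are no longer available once you work at the RDD level inside foreachRDD. For contrast, here is a minimal sketch (not from the original post) of the DStream-level alternative: updateStateByKey keeps a running word count across batches. It assumes a hypothetical local checkpoint directory (data/checkpoint) and would sit inside the same main method before ssc.start().

    // Sketch only: stateful word count at the DStream level.
    // Requires: import org.apache.spark.streaming.dstream.DStream
    // updateStateByKey needs checkpointing; the path below is a hypothetical local directory.
    ssc.checkpoint("data/checkpoint")

    val totalDS: DStream[(String, Int)] = lineDS
      .flatMap(_.split(","))
      .map((_, 1))
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0)))

    totalDS.print()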
Code for converting an RDD into a DStream
package com.shujia.spark.streaming

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo55RDDToDStream {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ds")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._

    val sc: SparkContext = spark.sparkContext
    val ssc = new StreamingContext(sc, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    /**
      * transform: every 5 seconds the batch RDD is passed in, the data is processed with
      * the RDD API, and a new RDD is returned.
      */
    val resultDS: DStream[(String, Int)] = linesDS.transform((rdd: RDD[String]) => {
      // The RDD computation is a per-batch count, not a global aggregation.
      val countRDD: RDD[(String, Int)] = rdd
        .flatMap(_.split(','))
        .map((_, 1))
        .reduceByKey(_ + _)

      // The returned RDD is wrapped into a new DStream.
      countRDD
    })

    resultDS.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
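Because transform returns a new DStream, its result can still be fed into DStream-level operators. A minimal sketch (an assumption, not part of the original post) that layers a sliding window on top of resultDS, counting words over the last 15 seconds, recomputed every 5 seconds; the window and slide durations must be multiples of the 5-second batch interval.

    // Sketch only: window the per-batch counts produced by transform above.
    val windowedDS: DStream[(String, Int)] = resultDS
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Durations.seconds(15), Durations.seconds(5))

    windowedDS.print()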
Packaging the local code and running it on the cluster
package com.shujia.spark.streaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo66Submit {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("ds")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext
    val ssc = new StreamingContext(sc, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

    /**
      * spark-submit --master yarn-client --class com.shujia.spark.streaming.Demo66Submit --num-executors 2 spark-1.0.jar
      */
  }
}
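Two notes on the submit command above (these are additions, so adapt them to your cluster): on Spark 2.x and later the yarn-client master URL is deprecated in favour of --master yarn --deploy-mode client, and the socket source needs something listening on master:8888 before the job starts, for example nc -lk 8888 run on the master node if netcat is available.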
Writing the input data to disk
package com.shujia.spark.streaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object Demo77SaveFIle {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("ds")
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext
    val ssc = new StreamingContext(sc, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    val countDS: DStream[(String, Int)] = linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)

    // Save the results to disk; a new output directory is rolled out for every batch.
    countDS.saveAsTextFiles("data/stream", "txt")

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
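saveAsTextFiles writes one output directory per batch, named "prefix-TIME_IN_MS.suffix", so small files accumulate quickly. A minimal sketch (an assumption, not part of the original post) of an alternative using foreachRDD that skips empty batches and coalesces each batch to a single file; it needs the extra imports noted in the comment and uses a hypothetical output path.

    // Sketch only. Requires:
    //   import org.apache.spark.rdd.RDD
    //   import org.apache.spark.streaming.Time
    countDS.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
      if (!rdd.isEmpty()) {
        // one directory per non-empty batch, e.g. data/stream-1658000000000
        rdd.coalesce(1).saveAsTextFile(s"data/stream-${time.milliseconds}")
      }
    })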