
Spark Streaming: converting between DStream, DataFrame and RDD; spark-submit; writing data to disk


Spark Streaming

Code: converting a DStream to a DataFrame

package com.shujia.spark.streaming
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}

object Demo44DStreamTORDDAndDF {
  def main(args: Array[String]): Unit = {
    /**
     * Create the SparkSession
     */
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ds")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    /**
     * Read from a socket to get a DStream
     */
    val lineDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    /**
     * A DStream is backed by RDDs: every batch interval the received data is wrapped
     * into a new RDD, so every 5 seconds there is a new RDD holding that batch's data.
     *
     * Once the data is handled as an RDD, stateful operators can no longer be used.
     */
    lineDS.foreachRDD((rdd: RDD[String]) => {
      println("processing data")
      // RDD code can be written here
      // note: with the foreach below commented out there is no action,
      // so this word-count chain is never actually executed
      rdd
        .flatMap(_.split(","))
        .map((_, 1))
        .reduceByKey(_ + _)
        //.foreach(println)

      /**
       * The RDD can also be converted to a DataFrame, after which SQL can be used
       */
      val linesDF: DataFrame = rdd.toDF("line")

      linesDF.createOrReplaceTempView("lines")

      val countDF: DataFrame = spark.sql(
        """
          |select word,count(1) as c from (
          |select explode(split(line,',')) as word
          |from lines
          |) as a
          |group by word
          |
          |""".stripMargin)

      countDF.show()

    })
      
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
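
The comment in the code above points out that once the data is handled at the RDD level, stateful operators are no longer available. For contrast, here is a minimal sketch of a running word count across batches using updateStateByKey on the DStream itself; the checkpoint directory data/checkpoint is an assumption, not part of the original example.

// a running count across batches, kept on the DStream (sketch, not from the original post)
ssc.checkpoint("data/checkpoint") // stateful operators require a checkpoint dir; this path is an assumption

val stateDS = lineDS
  .flatMap(_.split(","))
  .map((_, 1))
  .updateStateByKey((batchValues: Seq[Int], state: Option[Int]) => {
    // add this batch's counts to the total Spark keeps for each word
    Some(batchValues.sum + state.getOrElse(0))
  })

stateDS.print()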

Code: converting an RDD back to a DStream

package com.shujia.spark.streaming
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo55RDDToDStream {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ds")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    import spark.implicits._

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    /**
     * transform: every 5 seconds the current batch is passed in as an RDD;
     * RDD APIs can be used inside to process the data,
     * and the function must return an RDD when done.
     */
    val resultDS: DStream[(String, Int)] = linesDS.transform((rdd: RDD[String]) => {
      // the RDD computation is a per-batch count, not a global (cross-batch) count
      val countRDD: RDD[(String, Int)] = rdd
        .flatMap(_.split(','))
        .map((_, 1))
        .reduceByKey(_ + _)

      // return an RDD; the returned RDD is wrapped into a new DStream
      countRDD
    })
    
    resultDS.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}
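
Because transform exposes the full RDD API for each batch, it can also combine the stream with a static RDD. The sketch below filters out words that appear in a hypothetical blacklist; blacklistRDD and the word "spam" are illustrative assumptions, not part of the original code.

// a static lookup RDD (hypothetical contents)
val blacklistRDD: RDD[(String, Boolean)] = sc.parallelize(Seq(("spam", true)))

val filteredDS: DStream[(String, Int)] = resultDS.transform((rdd: RDD[(String, Int)]) => {
  rdd
    .leftOuterJoin(blacklistRDD)                    // (word, (count, Option(flag)))
    .filter { case (_, (_, flag)) => flag.isEmpty } // keep only words not on the blacklist
    .map { case (word, (count, _)) => (word, count) }
})

filteredDS.print()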

Packaging the local code and running it on the cluster

package com.shujia.spark.streaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo66Submit {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder()
      .appName("ds")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()


    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

    /**
     * Submit to the cluster (the class name must match this object):
     * spark-submit --master yarn-client --class com.shujia.spark.streaming.Demo66Submit --num-executors 2 spark-1.0.jar
     * (on Spark 2.x and later, yarn-client is written as --master yarn --deploy-mode client)
     */
  }
}
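
The host and port are hard-coded above, which only works if the cluster can reach "master" on port 8888. A hypothetical variation is to read them from the program arguments at submit time; the args handling below is a sketch and not part of the original post.

// inside main(args: Array[String]), before building the DStream (sketch)
val host: String = if (args.length > 0) args(0) else "master"
val port: Int    = if (args.length > 1) args(1).toInt else 8888

val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream(host, port)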

Writing the input data to disk

package com.shujia.spark.streaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object Demo77SaveFIle {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder()
      .appName("ds")
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    val countDS: DStream[(String, Int)] = linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)

    // save the results to disk; a new output directory (prefix-timestamp.suffix)
    // is rolled out for every batch
    countDS.saveAsTextFiles("data/stream", "txt")

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}
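
saveAsTextFiles writes each batch into its own prefix-timestamp.txt directory. If a single output location is preferred, one alternative (a sketch; the path data/stream_csv is an assumption) is to go through the DataFrame writer in append mode inside foreachRDD:

// append every batch under one directory instead of one directory per batch (sketch)
countDS.foreachRDD(rdd => {
  import spark.implicits._ // needed for toDF on the RDD

  rdd.toDF("word", "count")
    .write
    .mode("append")
    .csv("data/stream_csv") // hypothetical output path
})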

Source: https://www.cnblogs.com/atao-BigData/p/16496768.html