其他分享
首页 > 其他分享> > 状态流累加

状态流累加

作者:互联网

object Socket_Streaming_State extends App {
val sc: SparkContext = SparkContext.getOrCreate(new SparkConf().setMaster(“local[*]”).setAppName(“hdfs”).set(“spark.executor memory”,“4g”))
val ssc = new StreamingContext(sc,Seconds(2))
//必须要设置检查点
ssc.checkpoint(“file:///D:/Practise/chekpoint”)
val inputDstream= ssc.socketTextStream(“192.168.56.100”,9999)
val wordDstream: DStream[String] = inputDstream.flatMap(.split(" "))
val wordAndOneDstream: DStream[(String, Int)] = wordDstream.map((
,1))
//updateSateByKey是以DStream中的数据进行按key做reduce操作
// 参数是当前value(每个key新增的值的集合 如:hello [1,1,2])和上次value 如hello:5
//返回some
def updateFunc(currentValue:Seq[Int],preValue:Option[Int])={
val currsum= currentValue.sum
val pre = preValue.getOrElse(0)
Some(currsum+pre)
}

val value: DStream[(String, Int)] = wordAndOneDstream.updateStateByKey(updateFunc)
value.saveAsTextFiles("","")

ssc.start()
ssc.awaitTermination()

}

标签:状态,String,val,Int,value,累加,DStream,ssc
来源: https://blog.csdn.net/aosimth/article/details/112299247