其他分享
首页 > 其他分享> > sparkStreaming

sparkStreaming

作者:互联网

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 创建一个本地模式的StreamingContext, 两个工作线程, 1s的批处理间隔
//Master要求2个核,以防出现饥饿情况
object Socket {
def main(args: Array[String]): Unit = {
// Spark配置项
val conf = new SparkConf().setAppName("Socket").setMaster("local[*]")

// 创建流式上下文,1s批处理间隔
val ssc = new StreamingContext(conf, Seconds(1))
// 创建一个DStream,链接指定的hostname: prot, 比如localhost: 9999
val lines = ssc.socketTextStream("localhost", 9999)
// 将收到的每条信息分割成词语
val words = lines.flatMap(_.split(" "))
// 统计每个batch的词频
val pairs = words.map(word => (word, 1))

// 词频汇总
val WordCounts = pairs.reduceByKey(_+_)

// 打印从Dstream中生成的RDD的前10个元素到控制台
WordCounts.print()
ssc.start() //开始计算
ssc.awaitTermination() //等待计算结束
}

}

标签:Socket,val,sparkStreaming,词频,words,StreamingContext,ssc
来源: https://www.cnblogs.com/tan2022/p/14962772.html