其他分享
首页 > 其他分享> > sparkstreaming转换算子--窗口函数

sparkstreaming转换算子--窗口函数

作者:互联网

window

package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

/**
 * 滑动窗口算子
 *    每隔一段时间,对原始DStream中多个批次数据整合  成为新的DStream中一个批次数据
 *
 * Spark Streaming中,一个批次执行一次,不会积攒当前批次的数据。滑动窗口算子可以实现将多个批次数据积攒下来,然后再去做统一的运算
 *   窗口算子最为基础核心的算子 window 会给我们返回一个新的DStream,但是这个DStream包含多个未被处理的批次数据
 *      窗口函数中需要传递核心参数
 *      windowDuration: Duration,  窗口时间长度--一般是batchSize(批次时间)的整数倍
 *      slideDuration: Duration:  滑动时间长度----一般是batchSize(批次时间)的整数倍
 */object ByWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("transform3")
    val ssc = new StreamingContext(conf, Milliseconds(10000))

    val ds: DStream[String] = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY)
    // 窗口长度 30s  滑动间隔 10s  每个10s时间将DStream中前30秒的数据 整合为一个批次数据处理
    val ds1 = ds.window(Seconds(10), Seconds(10))
    ds1.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}

/**
 * 使用window窗口函数算子 严格意义上只负责去对原始DStream进行窗口检测,形成窗口批次数据的DStream,如果我们要对窗口批次数据
 * 进行处理的话,还得需要对窗口批次数据的DStream使用转换算子和行动算子计算逻辑
 *
 * windows函数也有一些变种的窗口函数算子:既可以实现窗口批次数据的检测,也可以实现一些相关的计算功能
 *   countByWindow 对每个滑动窗口的数据执行count操作
 *   reduceByWindow 对每个滑动窗口的数据执行reduce操作
 *   reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作
 *   countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作
 */
object ByWindow2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("state02").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf,Milliseconds(10000))
    ssc.checkpoint("hdfs://node1:9000/sparkstreaming")

    val ds:DStream[String] = ssc.socketTextStream("node1", 44444)
    val ds1 = ds.map((_, 1))
    val ds2 = ds1.reduceByKeyAndWindow((a: Int, b: Int)=>(a+b), Seconds(10), Seconds(10))
    ds2.print()
    println(",,,,,,,,,,")
    val ds3: DStream[Long] = ds1.countByWindow(Seconds(10), Seconds(10))
    ds3.print()
    println(",,,,,,,,,,")
    val ds4 = ds1.reduceByWindow((a, b) => (a._1+b._1, 0), Seconds(10), Seconds(10))
    ds4.print()
    println(",,,,,,,,,,")
    // 需要设置检查点
    val ds5 = ds1.countByValueAndWindow(Seconds(10), Seconds(10))
    ds5.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

应用场景:黑名单

package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}

/**
 * 黑名单统计
 *   有市场就有竞争,有竞争就少不来邪门外道
 *   A厂家投放了广告,广告每点击一次都是有记录的,但是不排初竞争对手的恶意点击
 *
 *   实时统计黑名单用户
 *   网站每隔3秒记录一批次用户的点击行为,记录的时候,认定如果在1分钟之内 用户点击次数超过10次 认定这个用户是一个黑名单用户
 *   需要把用户IP封掉
 *
 *   Spark Streaming去连接端口数据源:
 *     端口模拟用户的点击行为  发送数字 数字就代表某一个用户id
 */
object BlackUser {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("state01").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf,Milliseconds(3000))
    val ds:DStream[String] = ssc.socketTextStream("node1", 44444)

    val ds1:DStream[String] = ds.window(Minutes(1),Minutes(1))
    val ds2:DStream[(String,Int)] = ds1.map((_, 1)).reduceByKey(_ + _)
    //保留黑名单用户
    val ds3:DStream[(String,Int)] = ds2.filter(tuple=>{
      if(tuple._2>=10){
        true
      }else{
        false
      }
    })
    ds3.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

标签:10,窗口,val,--,sparkstreaming,Seconds,算子,DStream,ssc
来源: https://www.cnblogs.com/jsqup/p/16649421.html