其他分享
首页 > 其他分享> > trigger:使用structuredStreaming实时计算

trigger:使用structuredStreaming实时计算

作者:互联网

使用trigger

package com.qf.sparkstreaming.day04


import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger

/**
 * trigger函数:
 *   sparkStreaming是一个准实时的计算框架,微批处理
 *   structuredStreaming是一个实时的计算框架,但是底层使用的sparksql的api,
 *   并且是sparkStreaming的进化版,比微批处理更快,也有微小的时间段,最快可以达到 `100ms` 左右的端到端延迟。
 *   而使用trigger函数可以做到1ms的端到端延迟。
 */

object _09Trigger {
    def main(args: Array[String]): Unit = {
        val session: SparkSession = SparkSession.builder().appName("test1").master("local[*]").getOrCreate()
        session.sparkContext.setLogLevel("ERROR")

        //作为消费者,从kafka读取数据,获取到的数据有schema,
        // 分别是 key|value|topic|partition|offset|timestamp|timestampType|
        val frame: DataFrame = session.readStream.format("kafka")
          .option("kafka.bootstrap.servers","qianfeng01:9092,qianfeng02:9092,qianfeng03:9092")
          //          .option("startingOffsets","earliest")
          .option("subscribe","pet").load()

        //处理一下数据
        val frame1: DataFrame = frame.selectExpr("cast(value as String)")

        //保存到kafka中
        frame1.writeStream
          .format("console")
          .trigger(Trigger.ProcessingTime(0))
          .start()
          .awaitTermination()
    }
}

标签:option,val,trigger,structuredStreaming,9092,kafka,session,实时
来源: https://blog.csdn.net/qq_50166024/article/details/110942303