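10.5 Running Spark Structured Streaming in cluster mode
Author: Internet
Version: Spark 2.4.0-cdh6.1.1
This continues from 10.2, which ran wordcount with Spark Structured Streaming.
After packaging, the jar is placed on the cluster and submitted to YARN to run.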
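As a rough sketch of what gets packaged, the wordcount application might look like the following. The object name StructuredWordCount, the jar name in the comment, the splitWords helper, and the socket host and port are assumptions for illustration; the actual code from section 10.2 is not reproduced here.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name; the real class from section 10.2 may differ.
object StructuredWordCount {
  // Hypothetical helper: split one input line into words on single spaces.
  def splitWords(line: String): Seq[String] = line.split(" ").toSeq

  def main(args: Array[String]): Unit = {
    // Socket host and port are assumed values, overridable on the command line.
    val host = if (args.length > 0) args(0) else "localhost"
    val port = if (args.length > 1) args(1) else "9999"

    // No .master(...) call here: spark-submit supplies it, for example
    //   spark-submit --master yarn --deploy-mode cluster \
    //     --class StructuredWordCount wordcount.jar <host> <port>
    // (wordcount.jar is a placeholder jar name).
    val spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()
    import spark.implicits._

    // Read lines of text from a socket as an unbounded streaming DataFrame.
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()

    // Split each line into words and keep a running count per word.
    val wordCounts = lines.as[String]
      .flatMap(line => splitWords(line))
      .groupBy("value")
      .count()

    // Print the full updated counts after every trigger.
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Note that with --deploy-mode cluster the driver runs inside a YARN container, so the console sink prints to that container's stdout log rather than to the terminal that ran spark-submit. With --deploy-mode client the output appears in the submitting terminal.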
Demo
Input side
Output side, on a Linux node
The micro-batch interval can be set through the query's trigger.
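A minimal sketch of setting the batch interval follows. It assumes the built-in rate test source in place of the real input, and the batchTrigger helper and the 10-second value are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object BatchIntervalExample {
  // Hypothetical helper: build a micro-batch trigger from an interval in seconds.
  def batchTrigger(seconds: Long): Trigger =
    Trigger.ProcessingTime(s"$seconds seconds")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("BatchIntervalExample").getOrCreate()

    // The "rate" source generates (timestamp, value) rows; it stands in for real input.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    val query = df.writeStream
      .format("console")
      .trigger(batchTrigger(10)) // start a new micro-batch every 10 seconds
      .start()

    query.awaitTermination()
  }
}
```

If no trigger is specified, Spark starts each micro-batch as soon as the previous one finishes.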
Continuous processing is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees. Compare this with the default micro-batch processing engine, which can achieve exactly-once guarantees but achieves latencies of ~100 ms at best. For some types of queries, you can choose which mode to execute them in without modifying the application logic (i.e. without changing the DataFrame/Dataset operations).
To run a supported query in continuous processing mode, all you need to do is specify a continuous trigger with the desired checkpoint interval as a parameter, for example .trigger(Trigger.Continuous("1 second")).
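In short: continuous processing is an experimental streaming execution mode, available since Spark 2.3, that achieves low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees.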
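The continuous trigger described above can be sketched as follows. This is an illustration under assumptions, not the original article's code: it uses the rate source and the console sink, which the Spark 2.4 documentation lists as supported in continuous mode, and the 1-second checkpoint interval is an arbitrary choice.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousExample {
  // Assumed checkpoint interval; tune it for the actual workload.
  val checkpointInterval = "1 second"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ContinuousExample").getOrCreate()

    // The "rate" test source emits (timestamp, value) rows at a fixed rate.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()

    val query = df
      // Only map-like operations (select, where, map, ...) work in continuous
      // mode in Spark 2.4; aggregations are not supported.
      .selectExpr("value", "timestamp")
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous(checkpointInterval)) // the only change versus micro-batch
      .start()

    query.awaitTermination()
  }
}
```

Replacing the Trigger.Continuous call with Trigger.ProcessingTime switches the same query back to micro-batch execution without touching the DataFrame operations.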
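Structured Streaming supports writing results to sinks such as the console, Parquet files, and an in-memory table: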
// noAggDF (no aggregation) and df are assumed to be streaming DataFrames defined earlier.

// Print new data to console
noAggDF
  .writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table
Source: https://blog.csdn.net/kk25114/article/details/98983798