其他分享
首页 > 其他分享> > 10.5 spark structured streaming在集群模式下运行

10.5 spark structured streaming在集群模式下运行

作者:互联网

版本spark2.4.0-cdh6.1.1

继10.2 spark structured streaming执行wordcount

打包后放在集群交给yarn运行

展示

输入端

输出端linux节点上

batch的时间可设置

Continuous processing is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees. Compare this with the default micro-batch processing engine which can achieve exactly-once guarantees but achieve latencies of ~100ms at best. For some types of queries (discussed below), you can choose which mode to execute them in without modifying the application logic (i.e. without changing the DataFrame/Dataset operations).

To run a supported query in continuous processing mode, all you need to do is specify a continuous trigger with the desired checkpoint interval as a parameter. For example,

 

自Spark 2.3中引入的一种新的实验性流执行模式,可实现低(~1 ms)端到端延迟,并且至少具有一次容错保证

支持写入

 

// Print new data to console
noAggDF
  .writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table

 

标签:10.5,start,format,structured,streaming,mode,query,console,writeStream
来源: https://blog.csdn.net/kk25114/article/details/98983798