[Spark Streaming Kafka] Sample code for consuming Kafka data with Spark Streaming
Code
package test

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}
import java.util.Date

/**
 * @Author yu
 * @Date 2022/6/8 22:45
 * @Description
 * @Ref
 */
object spark_kafka {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN)

    // Initialize the StreamingContext with a 10-second batch interval
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka stream") //.setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Kafka consumer configuration
    var kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.GROUP_ID_CONFIG -> "console-group1",
      // This must be written as a boxed java.lang.Boolean, not a bare Scala `true`/`false`
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
      //ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG -> 60000
      //ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> 60000
    )

    // If the Spark Streaming batch interval exceeds Kafka's heartbeat/session window (30s by default, per the original note),
    // increase the KafkaConsumer settings below to avoid time-outs; the ConsumerConfig.XXX constants work just as well here.
    kafkaParams = kafkaParams.+("heartbeat.interval.ms" -> "30000")
    kafkaParams = kafkaParams.+("session.timeout.ms" -> "60000")
    // TODO For batch intervals around 5 minutes, group.max.session.timeout.ms must also be raised on the broker

    // Create the Kafka input stream; InputDStream[] extends DStream[]
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,                                   // the current streaming context
      LocationStrategies.PreferConsistent,   // the right location strategy in most cases
      ConsumerStrategies.Subscribe[String, String](List("test1"), kafkaParams) // the consumer/subscription strategy
    )

    // Apply operators on top of the source stream
    val source: DStream[String] = kafkaStream.map(record => record.value())
    source.cache()
    source.print(100)

    source.foreachRDD(rdd => {
      println("rdd " + rdd.id + " ------ " + new Date().getTime())
      // val sdf = new java.text.SimpleDateFormat("yyyyMMdd")
      // val date = sdf.format(new Date())
      // rdd.coalesce(1).saveAsTextFile("/tmp/kafka/test1/" + date + "/" + java.util.UUID.randomUUID())
    })

    // count() returns a DStream[Long], so print it directly instead of wrapping it in println
    source.count().print()

    val aggrStream: DStream[(String, Int)] = source.flatMap(_.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)
    aggrStream.print()

    kafkaStream.foreachRDD(rdd => {
      if (rdd.count() > 0) {
        // Offset ranges for this batch
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Print the offsets per partition
        rdd.foreachPartition { iter =>
          val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
          println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        }
        println("=============================")
        // Commit the offsets only after the output operations have completed
        kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
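To see any output locally, something has to be writing to the test1 topic. Below is a minimal producer sketch (an assumption added for illustration, not part of the original post) that pushes a few whitespace-separated lines to the same broker, node01:9092, used by the streaming job above. The kafka-clients producer API it uses is pulled in transitively by the spark-streaming-kafka-0-10 dependency in the pom below.

package test

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper: feeds the "test1" topic so the word-count stream has input during local testing.
object kafka_test_producer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same broker address as the streaming job; adjust to your environment.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    try {
      // Send a handful of whitespace-separated lines for the flatMap/reduceByKey stage to count.
      (1 to 10).foreach { i =>
        producer.send(new ProducerRecord[String, String]("test1", s"hello spark kafka $i"))
      }
      producer.flush()
    } finally {
      producer.close()
    }
  }
}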
POM.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>spark-streaming-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spark.version>2.4.5</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <!-- For local runs, keep this scope commented out -->
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <!-- For local runs, keep this scope commented out -->
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Bundle all dependencies into the jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
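Usage note (the exact commands are an assumption based on this pom, not part of the original post): mvn clean package produces target/spark-streaming-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar via the assembly plugin, which can then be run with, for example, spark-submit --class test.spark_kafka --master local[*] target/spark-streaming-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar. When submitting to a real cluster, re-enable the provided scope on the spark-core and spark-streaming dependencies and change --master accordingly.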
Result screenshot: (image not included)
Source: https://www.cnblogs.com/144823836yj/p/16400015.html