
[Spark Streaming Kafka] Example Code for Consuming Kafka Data with Spark Streaming


Code

package test

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}

import java.util.Date

/**
 * @Author yu
 * @Date 2022/6/8 22:45
 * @Description
 * @Ref
 */
object spark_kafka {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN)

    // Initialize the StreamingContext with a 10-second batch interval
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka stream")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Kafka consumer configuration
    var kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.GROUP_ID_CONFIG -> "console-group1",
      // Must be passed as a java.lang.Boolean, not a bare `true`/`false`
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
      //ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG-> 60000
      //ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> 60000
    )

    // If the Spark Streaming batch interval exceeds the Kafka consumer's session timeout,
    // raise the KafkaConsumer parameters below to avoid time-outs; the ConsumerConfig.XXX
    // constants work just as well as the raw key strings (see the commented example below).
    kafkaParams = kafkaParams.+("heartbeat.interval.ms" -> "30000")
    kafkaParams = kafkaParams.+("session.timeout.ms" -> "60000")
    // TODO: for 5-minute batches, the broker-side group.max.session.timeout.ms must also be raised
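    // The same two settings spelled with the ConsumerConfig constants (equivalent, type-safer keys):
    //   kafkaParams = kafkaParams.+(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG -> "30000")
    //   kafkaParams = kafkaParams.+(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> "60000")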


    // Create the Kafka input stream (InputDStream[] extends DStream[])
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc, // the streaming context
      LocationStrategies.PreferConsistent, // distributes partitions evenly across executors; the right choice in most cases
      ConsumerStrategies.Subscribe[String, String](List("test1"), kafkaParams) // subscribe to the topic(s) with the params above
    )

    // Apply operators on top of the source stream
    val source: DStream[String] = kafkaStream.map(record => record.value())

    // Cache, since several output operations below reuse this stream
    source.cache()

    source.print(100)



    source.foreachRDD(rdd => {
      println("rdd " + rdd.id + " ------ " + new Date().getTime())
      //      val sdf = new java.text.SimpleDateFormat("yyyyMMdd")
      //      val date = sdf.format(new Date())
      //      rdd.coalesce(1).saveAsTextFile("/tmp/kafka/test1/" + date + "/" + java.util.UUID.randomUUID())
    })

    println("source count: " + source.count().print())
    val aggrStream: DStream[(String, Int)] = source.flatMap(_.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)

    aggrStream.print()

    kafkaStream.foreachRDD(rdd => {
      if (rdd.count() > 0) {
        // Extract this batch's offset ranges
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // Print each partition's offset range (offsetRanges is indexed by partition id)
        rdd.foreachPartition { iter =>
          val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
          println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        }
        println("=============================")
        // Commit offsets back to Kafka only after this batch's output work is done
        kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    // Start the streaming computation and block until terminated
    ssc.start()
    ssc.awaitTermination()
  }
}
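The `commitAsync` call above stores the offsets back in Kafka itself. If you instead persist the `offsetRanges` to your own store (a database, ZooKeeper, etc.), you can resume from those exact positions after a restart by passing an offsets map to the consumer strategy. A minimal sketch, assuming a hypothetical `loadOffsets()` that reads whatever store you wrote to:

import org.apache.kafka.common.TopicPartition

// Hypothetical helper (not part of the code above): read previously saved offsets,
// e.g. Map(new TopicPartition("test1", 0) -> 42L)
def loadOffsets(): Map[TopicPartition, Long] = ???

val resumedStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  // The third argument seeds each partition at the saved offset; partitions
  // missing from the map fall back to auto.offset.reset
  ConsumerStrategies.Subscribe[String, String](List("test1"), kafkaParams, loadOffsets())
)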

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>spark-streaming-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spark.version>2.4.5</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <!-- For local runs, leave this scope commented out; enable it when deploying to a cluster -->
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <!-- For local runs, leave this scope commented out; enable it when deploying to a cluster -->
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Bundle all dependencies into a single fat jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
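Building with `mvn package` produces a `jar-with-dependencies` that can be handed to `spark-submit`. To give the job something to read from topic `test1`, any producer works; below is a small sketch using the Kafka Java client (already on the classpath via `spark-streaming-kafka-0-10`), assuming the same broker address `node01:9092` as in the consumer config:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    // Whitespace-separated words, so the word-count stage has something to aggregate
    (1 to 10).foreach { i =>
      producer.send(new ProducerRecord[String, String]("test1", s"hello spark kafka $i"))
    }
    producer.flush()
    producer.close()
  }
}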

 

Result screenshot: (image omitted)

 

Source: https://www.cnblogs.com/144823836yj/p/16400015.html