
Kafka + Spark Streaming + MySQL in Practice


1 Scenario

1.1 Requirement: the teaching platform needs to keep per-course view counts updated in real time.
1.2 Data flow: once tracking events land in Kafka, the streaming job updates the corresponding MySQL rows by primary key.
1.3 Project structure
(project structure diagram from the original post omitted)
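
The post never shows the DDL of the landing table, but the column names used in the job below (knowledge_id, resource_type, resource_num) suggest a schema roughly like the following sketch; treat it as an assumption, not the original definition.

import java.sql.DriverManager

object CreateStatisticsTable {
  def main(args: Array[String]): Unit = {
    // Placeholder connection details, mirroring the masked values later in the post.
    val url = "jdbc:mysql://****:3306/***?characterEncoding=utf8"
    val conn = DriverManager.getConnection(url, "****", "****")
    try {
      // A composite primary key makes the "update by primary key" in 1.2 well defined.
      conn.createStatement().execute(
        """create table if not exists ads_jxpt_knowledge_statistics (
          |  knowledge_id  varchar(64) not null,
          |  resource_type varchar(32) not null,
          |  resource_num  int         not null default 0,
          |  primary key (knowledge_id, resource_type)
          |)""".stripMargin)
    } finally {
      conn.close()
    }
  }
}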

2 Main code

package RealOnline

import java.sql.{DriverManager, ResultSet}

import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/*
Generate test data on Kafka: echo "{\"code\":\"COURSE_KNOWLEDGE_ADD_BATCH\",\"data\":[{\"courseId\":\"3454828321069758004\",\"id\":\"1\",\"parentId\":\"1\"}],\"time\":1609125827880,\"topic\":\"test\"}" | ./kafka-console-producer.sh  --broker-list kafka175.data:6667 --sync --topic edu-log-streaming
//          val row3="{\"code\":\"COURSE_KNOWLEDGE_ADD_BATCH\",\"data\":[{\"courseId\":\"3454828321069758003\",\"id\":\"1\",\"parentId\":\"1\"}],\"time\":1609125827880,\"topic\":\"test\"}"
//          val row2="[{\"code\":\"COURSE_KNOWLEDGE_ADD_BATCH\",\"data\":[{\"courseId\":\"3454828321069758003\",\"id\":\"1\",\"parentId\":\"1\"}],\"time\":1609125827880,\"topic\":\"test\"}{\"code\":\"COURSE_KNOWLEDGE_ADD_BATCH\",\"data\":[{\"courseId\":\"3454828321069758003\",\"id\":\"1\",\"parentId\":\"1\"}],\"time\":1609125827880,\"topic\":\"test\"}]"

 */
object jxpt_teach_activity_review_status {
  def main(args: Array[String]): Unit = {
    val logger = LoggerFactory.getILoggerFactory.getLogger(this.getClass.getSimpleName)
    logger.warn("WARN")
    val sparkConf: SparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.set("spark.streaming.kafka.maxRetries", "1000")
    // cap the maximum number of records consumed per Kafka partition per second
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "15000")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // number of streaming jobs that may run concurrently
    sparkConf.set("spark.streaming.concurrentJobs", "8")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.speculation.interval", "1000ms")
    sparkConf.set("spark.streaming.unpersist", "true")
    sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
    sparkConf.set("spark.executor.extraJavaOptions","-XX:+UseConcMarkSweepGC");
    sparkConf.set("hive.metastore.uris", "thrift://node116.data:9083")
    sparkConf.set("spark.sql.shuffle.partitions", "100")


    val ssc = new StreamingContext(sparkConf, Seconds(args(0).toInt))  // batch interval (seconds) passed in from the launch script
//    val ssc = new StreamingContext(sparkConf, Seconds(1))  // or hard-code the batch interval here instead
    val groupId = "jxpt_teach_activity_review_status_1"
    val topic = "edu-log-streaming"


    // read the last committed offsets from Redis
    val startupOffsets: Map[TopicPartition, Long] = OffsetManager.getOffset(groupId, topic)

    // build the input stream, resuming from the stored offsets when they exist
    val startupInputDstream: InputDStream[ConsumerRecord[String, String]] =
      if (startupOffsets != null && startupOffsets.nonEmpty)
        MyKafkaUtil.getKafkaStream(topic, ssc, startupOffsets, groupId)
      else
        MyKafkaUtil.getKafkaStream(topic, ssc, groupId)

    // capture this batch's offset ranges so they can be committed after processing
    var startupOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val startupInputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = startupInputDstream.transform { rdd =>
      startupOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }
    // MySQL connection settings
    val prop = new java.util.Properties
    prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    prop.setProperty("user", "****")
    prop.setProperty("password", "****")
    val url="jdbc:mysql://****:3306/***?characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true"

    // write each batch to MySQL
    // landing table: ads_jxpt_knowledge_statistics(knowledge_id, resource_type, resource_num)
    startupInputGetOffsetDstream.foreachRDD((rdd,batchTime)=>{

          if (!rdd.isEmpty()) {
            rdd.foreachPartition(it => {
              // one JDBC connection per partition (opening one per record would be far more expensive)
              val conn = DriverManager.getConnection(url, prop)
              it.foreach(row=>{
                try{
                val rowStr = row.value()
                val jsonObj = JSON.parseObject(rowStr)
                val message_type = jsonObj.getString("message_type")

                  if ("KNWLG_RSRC_STAT".equals(message_type)) {
                    val business_data_type = jsonObj.getString("business_data_type")
                    val business_operate = jsonObj.getString("business_operate")
                    val data = jsonObj.getJSONArray("data")
                    for (index <- 0 to data.size() - 1) {

                      var knowledge_id = data.getJSONObject(index).getString("knwlg_id")
                      var resource_type = data.getJSONObject(index).getOrDefault("resource_type", "").toString
                      var data_operate = data.getJSONObject(index).getOrDefault("data_operate", business_operate).toString

//                      var knwlg_id_resource_type = ""
//
//                      knwlg_id_resource_type = knwlg_id + "--" + resource_type

                      var resource_num = 0
                      if ("ADD".equals(data_operate)) {
                        resource_num = 1
                      } else if ("DEL".equals(data_operate)) {
                        resource_num = -1
                      }

                      val pstm1 = conn.prepareStatement("select resource_num from ads_jxpt_knowledge_statistics  where knowledge_id=? and resource_type=?")
                      pstm1.setString(1, knowledge_id)
                      pstm1.setString(2, resource_type)
                      val set: ResultSet = pstm1.executeQuery()
                      // the key already exists in the table
                      if (set.next()) {
                        val hisDat = set.getInt("resource_num")
                        // add the delta and update the row
                        val newDat: Int = hisDat + resource_num

                        val pstm2 = conn.prepareStatement("update  ads_jxpt_knowledge_statistics set resource_num =? where knowledge_id=? and resource_type =?")
                        pstm2.setInt(1, newDat)
                        pstm2.setString(2, knowledge_id)
                        pstm2.setString(3, resource_type)
                        pstm2.executeUpdate()

                      } else {
                        // no row for this key yet: insert one
                        val pstm3 = conn.prepareStatement("insert into ads_jxpt_knowledge_statistics(knowledge_id,resource_type,resource_num) values(?,?,?)")
                        pstm3.setString(1, knowledge_id)
                        pstm3.setString(2, resource_type)
                        pstm3.setInt(3, resource_num)
                        pstm3.executeUpdate()
                      }


                    }
                  }
                } catch {
                  // JSON.parseObject throws on malformed input and missing fields surface as NPEs,
                  // so catch both, log, and keep processing the rest of the partition
                  case ex: Exception => println("-- failed to parse record, skipped: " + ex.getMessage)
                }
              })
              // close the connection once the whole partition has been processed,
              // not after the first record as in the original version
              conn.close()
            })
          }


      OffsetManager.saveOffset(groupId ,topic, startupOffsetRanges)
    })


    ssc.start()
    ssc.awaitTermination()
//    ssc.stop()

    }
}
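
A caveat worth noting: the select-then-update/insert sequence above is not atomic, so two tasks handling the same (knowledge_id, resource_type) in the same batch can race and lose an increment. If the table carries a unique or primary key over those two columns (an assumption; the post never shows the DDL), the per-record logic can collapse into one statement. A minimal sketch of that variant:

  // Hypothetical drop-in replacement for the pstm1/pstm2/pstm3 block above; it relies on a
  // UNIQUE or PRIMARY key over (knowledge_id, resource_type), which the post does not show.
  def upsertResourceNum(conn: java.sql.Connection,
                        knowledgeId: String,
                        resourceType: String,
                        delta: Int): Unit = {
    val upsert = conn.prepareStatement(
      """insert into ads_jxpt_knowledge_statistics(knowledge_id, resource_type, resource_num)
        |values (?, ?, ?)
        |on duplicate key update resource_num = resource_num + values(resource_num)""".stripMargin)
    try {
      upsert.setString(1, knowledgeId)
      upsert.setString(2, resourceType)
      upsert.setInt(3, delta)
      upsert.executeUpdate()
    } finally {
      upsert.close()
    }
  }

Calling upsertResourceNum(conn, knowledge_id, resource_type, resource_num) in place of the three prepared statements also saves the round trip of the preliminary select.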

3 Common classes

3.1 Offset management

package RealOnline

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis
import scala.collection.immutable.Map
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

object OffsetManager {


  /**
   * Read the committed offsets for a consumer group / topic from Redis.
   *
   * @param groupId
   * @param topic
   * @return the stored offsets, or null when nothing has been committed yet
   */
  def getOffset(groupId: String, topic: String): Map[TopicPartition, Long] = {
    val jedisClient: Jedis = RedisUtil.getJedisClient
    // read from the same Redis DB that saveOffset writes to
    jedisClient.select(4)

    val redisOffsetMap: util.Map[String, String] = jedisClient.hgetAll("offset:" + groupId + ":" + topic)
    jedisClient.close()
    if (redisOffsetMap != null && redisOffsetMap.isEmpty) {
      null
    } else {

      val redisOffsetList: List[(String, String)] = redisOffsetMap.toList

      val kafkaOffsetList: List[(TopicPartition, Long)] = redisOffsetList.map { case (partition, offset) =>
        (new TopicPartition(topic, partition.toInt), offset.toLong)
      }
      kafkaOffsetList.toMap
    }
  }

  /**
   * Write the offsets of the latest batch to Redis.
   */

  def saveOffset(groupId: String, topic: String, offsetArray: Array[OffsetRange]): Unit = {

    if (offsetArray != null && offsetArray.size > 0) {
      val offsetMap: Map[String, String] = offsetArray.map { offsetRange =>
        val partition: Int = offsetRange.partition
        val untilOffset: Long = offsetRange.untilOffset
        (partition.toString, untilOffset.toString)
      }.toMap

      val jedisClient: Jedis = RedisUtil.getJedisClient
      // select the Redis DB used for offset storage
      jedisClient.select(4)
      jedisClient.hmset("offset:" + groupId + ":" + topic, offsetMap)
      jedisClient.close()
    }


  }


}
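
OffsetManager depends on a RedisUtil.getJedisClient helper that the post does not include. A minimal sketch of what it might look like, assuming a JedisPool built from the redis.* entries in config.properties (the real utility class may differ):

package RealOnline

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {

  @volatile private var jedisPool: JedisPool = _

  // lazily build a shared connection pool from config.properties, then hand out clients
  def getJedisClient: Jedis = {
    if (jedisPool == null) {
      synchronized {
        if (jedisPool == null) {
          val config = PropertiesUtil.load("config.properties")
          val poolConfig = new JedisPoolConfig
          poolConfig.setMaxTotal(100)
          poolConfig.setMaxIdle(20)
          jedisPool = new JedisPool(
            poolConfig,
            config.getProperty("redis.host"),
            config.getProperty("redis.port").toInt,
            config.getProperty("redis.timeout").toInt,
            config.getProperty("redis.password"),
            config.getProperty("redis.database").toInt)
        }
      }
    }
    jedisPool.getResource
  }
}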

3.2 Kafka utilities

package RealOnline

import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object MyKafkaUtil {
  private val properties: Properties = PropertiesUtil.load("config.properties")
  val broker_list = properties.getProperty("kafka.broker.list")

  // Kafka consumer configuration
  var kafkaParam = collection.mutable.Map(
    "bootstrap.servers" -> broker_list, // addresses used to bootstrap the connection to the cluster
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    // identifies which consumer group this consumer belongs to
    "group.id" -> "gmall_consumer_group",
    // where to start when no committed offset exists (or it is no longer on the broker);
    // "latest" resets to the newest offset
    "auto.offset.reset" -> "latest",
    // true: offsets are committed automatically in the background, which can lose data on failure;
    // false: offsets must be maintained manually (as OffsetManager does here)
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Create a DStream that returns the received input data.
  // LocationStrategies: how consumers are placed relative to topic partitions and executors
  // LocationStrategies.PreferConsistent: distribute partitions evenly across all executors
  // ConsumerStrategies: how Kafka consumers are created and configured on the driver and executors
  // ConsumerStrategies.Subscribe: subscribe to a fixed collection of topics


  def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
    dStream
  }


  def getKafkaStream(topic: String, ssc: StreamingContext, groupId: String): InputDStream[ConsumerRecord[String, String]] = {
    kafkaParam("group.id") = groupId
    val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
    dStream
  }

  // consume starting from explicitly supplied offsets
  def getKafkaStream(topic: String, ssc: StreamingContext, offsets: Map[TopicPartition, Long], groupId: String): InputDStream[ConsumerRecord[String, String]] = {
    kafkaParam("group.id") = groupId
    val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam, offsets))
    dStream
  }
}
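
MyKafkaUtil loads the broker list through PropertiesUtil.load("config.properties"), another helper class the post leaves out (the RedisUtil sketch above uses it as well). A minimal sketch, assuming the file sits on the classpath:

package RealOnline

import java.io.InputStreamReader
import java.nio.charset.StandardCharsets
import java.util.Properties

object PropertiesUtil {

  // load a .properties file from the classpath
  def load(propertiesName: String): Properties = {
    val prop = new Properties()
    prop.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName),
      StandardCharsets.UTF_8))
    prop
  }
}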

3.3 config.properties

# Kafka brokers
kafka.broker.list=kafka175.data:6667,kafka176.data:6667,kafka177.data:6667
# Redis (test environment)
#redis.host=***
#redis.port=6379
# Redis (production environment)
redis.host=****
redis.port=6379
redis.password=****
redis.database=3
redis.timeout=2000
# MySQL
mysql.host=*****:3306
mysql.user=****
mysql.password=******
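
Note that the driver in section 2 hard-codes the MySQL URL, user, and password even though config.properties already carries mysql.host, mysql.user, and mysql.password. A small sketch of how those entries could be reused instead (building on the PropertiesUtil sketch above; the database name stays masked, as in the post):

package RealOnline

import java.sql.{Connection, DriverManager}

object MysqlConnUtil {

  // build a JDBC connection from the mysql.* entries in config.properties
  def getConnection(): Connection = {
    val config = PropertiesUtil.load("config.properties")
    val url = "jdbc:mysql://" + config.getProperty("mysql.host") +
      "/***?characterEncoding=utf8&rewriteBatchedStatements=true" // database name is masked in the original
    DriverManager.getConnection(url,
      config.getProperty("mysql.user"),
      config.getProperty("mysql.password"))
  }
}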

Source: https://blog.csdn.net/qq_42422698/article/details/114027202