Kafka + Spark Streaming + MySQL in Practice
1 Scenario
1.1 Requirement: the teaching-platform product needs per-course view/resource counts updated in real time.
1.2 Data flow: once tracking events land in Kafka, the streaming job updates the corresponding MySQL rows by primary key. A sketch of the message format the job expects follows below.
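Based on the fields the streaming job parses in the main code (section 2), an incoming event looks roughly like the example below; the concrete values for knwlg_id, resource_type and business_data_type are placeholders for illustration, not real data.
{"message_type":"KNWLG_RSRC_STAT","business_data_type":"KNOWLEDGE_RESOURCE","business_operate":"ADD","data":[{"knwlg_id":"3454828321069758004","resource_type":"VIDEO","data_operate":"ADD"}]}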
1.3 Project structure diagram
2 Main job code
package RealOnline
import java.sql.{DriverManager, ResultSet}
import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
/*
Produce a test record into Kafka: echo "{\"code\":\"COURSE_KNOWLEDGE_ADD_BATCH\",\"data\":[{\"courseId\":\"3454828321069758004\",\"id\":\"1\",\"parentId\":\"1\"}],\"time\":1609125827880,\"topic\":\"test\"}" | ./kafka-console-producer.sh --broker-list kafka175.data:6667 --sync --topic edu-log-streaming
// val row3="{\"code\":\"COURSE_KNOWLEDGE_ADD_BATCH\",\"data\":[{\"courseId\":\"3454828321069758003\",\"id\":\"1\",\"parentId\":\"1\"}],\"time\":1609125827880,\"topic\":\"test\"}"
// val row2="[{\"code\":\"COURSE_KNOWLEDGE_ADD_BATCH\",\"data\":[{\"courseId\":\"3454828321069758003\",\"id\":\"1\",\"parentId\":\"1\"}],\"time\":1609125827880,\"topic\":\"test\"}{\"code\":\"COURSE_KNOWLEDGE_ADD_BATCH\",\"data\":[{\"courseId\":\"3454828321069758003\",\"id\":\"1\",\"parentId\":\"1\"}],\"time\":1609125827880,\"topic\":\"test\"}]"
*/
object jxpt_teach_activity_review_status {
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(this.getClass)
logger.warn(s"starting ${this.getClass.getSimpleName}")
val sparkConf: SparkConf = new SparkConf()
sparkConf.setAppName(s"${this.getClass.getSimpleName}")
sparkConf.set("spark.streaming.kafka.maxRetries", "1000")
// Cap the ingestion rate per Kafka partition (records per second)
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "15000")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Number of streaming jobs allowed to run concurrently
sparkConf.set("spark.streaming.concurrentJobs", "8")
sparkConf.set("spark.rdd.compress", "true")
sparkConf.set("spark.speculation.interval", "1000ms")
sparkConf.set("spark.streaming.unpersist", "true")
sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
sparkConf.set("spark.executor.extraJavaOptions","-XX:+UseConcMarkSweepGC");
sparkConf.set("hive.metastore.uris", "thrift://node116.data:9083")
sparkConf.set("spark.sql.shuffle.partitions", "100")
val ssc = new StreamingContext(sparkConf, Seconds(args(0).toInt)) // batch interval in seconds, passed in by the launcher script
// val ssc = new StreamingContext(sparkConf, Seconds(1)) // or hard-code the batch interval here
val groupId = "jxpt_teach_activity_review_status_1"
val topic = "edu-log-streaming"
// Read the last committed offsets from Redis
val startupOffsets: Map[TopicPartition, Long] = OffsetManager.getOffset(groupId, topic)
// Resume from the saved offsets when they exist; otherwise start according to auto.offset.reset
val startupInputDstream: InputDStream[ConsumerRecord[String, String]] =
  if (startupOffsets != null && startupOffsets.nonEmpty) {
    MyKafkaUtil.getKafkaStream(topic, ssc, startupOffsets, groupId)
  } else {
    MyKafkaUtil.getKafkaStream(topic, ssc, groupId)
  }
// Capture each batch's offset ranges so they can be saved back to Redis after the batch is processed
var startupOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
val startupInputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = startupInputDstream.transform { rdd =>
  startupOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}
// MySQL connection settings
val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
prop.setProperty("user", "****")
prop.setProperty("password", "****")
val url="jdbc:mysql://****:3306/***?characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true"
// Write each batch to MySQL
// Target table: ads_jxpt_knowledge_statistics(knowledge_id, resource_type, resource_num)
startupInputGetOffsetDstream.foreachRDD((rdd, batchTime) => {
  if (!rdd.isEmpty()) {
    rdd.foreachPartition(it => {
      // Open one JDBC connection per partition: this block runs on the executor,
      // and a Connection is not serializable, so it cannot be created on the driver
      val conn = DriverManager.getConnection(url, prop)
      try {
        it.foreach(row => {
          try {
            val rowStr = row.value()
            val jsonObj = JSON.parseObject(rowStr)
            val message_type = jsonObj.getString("message_type")
            if ("KNWLG_RSRC_STAT".equals(message_type)) {
              val business_data_type = jsonObj.getString("business_data_type")
              val business_operate = jsonObj.getString("business_operate")
              val data = jsonObj.getJSONArray("data")
              for (index <- 0 until data.size()) {
                val knowledge_id = data.getJSONObject(index).getString("knwlg_id")
                val resource_type = data.getJSONObject(index).getOrDefault("resource_type", "").toString
                val data_operate = data.getJSONObject(index).getOrDefault("data_operate", business_operate).toString
                var resource_num = 0
                if ("ADD".equals(data_operate)) {
                  resource_num = 1
                } else if ("DEL".equals(data_operate)) {
                  resource_num = -1
                }
                val pstm1 = conn.prepareStatement("select resource_num from ads_jxpt_knowledge_statistics where knowledge_id=? and resource_type=?")
                pstm1.setString(1, knowledge_id)
                pstm1.setString(2, resource_type)
                val set: ResultSet = pstm1.executeQuery()
                if (set.next()) {
                  // Key already exists: add the delta to the stored counter and update
                  val hisDat = set.getInt("resource_num")
                  val newDat: Int = hisDat + resource_num
                  val pstm2 = conn.prepareStatement("update ads_jxpt_knowledge_statistics set resource_num=? where knowledge_id=? and resource_type=?")
                  pstm2.setInt(1, newDat)
                  pstm2.setString(2, knowledge_id)
                  pstm2.setString(3, resource_type)
                  pstm2.executeUpdate()
                  pstm2.close()
                } else {
                  // Key not present yet: insert a new row
                  val pstm3 = conn.prepareStatement("insert into ads_jxpt_knowledge_statistics(knowledge_id,resource_type,resource_num) values(?,?,?)")
                  pstm3.setString(1, knowledge_id)
                  pstm3.setString(2, resource_type)
                  pstm3.setInt(3, resource_num)
                  pstm3.executeUpdate()
                  pstm3.close()
                }
                set.close()
                pstm1.close()
              }
            }
          } catch {
            case ex: Exception => println("-- failed to parse or persist record: " + ex.getMessage)
          }
        })
      } finally {
        // Close the connection once per partition, after all rows have been processed
        conn.close()
      }
    })
  }
  // Save the offsets to Redis only after the batch has been written to MySQL
  OffsetManager.saveOffset(groupId, topic, startupOffsetRanges)
})
ssc.start()
ssc.awaitTermination()
// ssc.stop()
}
}
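The main code reads the current counter with a select and then either updates or inserts. If the target table has a unique key on (knowledge_id, resource_type), which is an assumption since the table DDL is not shown in this post, the two round trips can be collapsed into one MySQL statement. The following is a minimal sketch of that alternative, not the original implementation:
package RealOnline
import java.sql.Connection
object UpsertSketch {
  // Insert the per-record delta, or add it to the existing counter when the
  // (knowledge_id, resource_type) key is already present. Assumes a unique key on
  // those two columns; delta is +1 for ADD and -1 for DEL, as in the main code.
  def upsertResourceNum(conn: Connection, knowledgeId: String, resourceType: String, delta: Int): Unit = {
    val pstm = conn.prepareStatement(
      "insert into ads_jxpt_knowledge_statistics(knowledge_id, resource_type, resource_num) " +
        "values(?,?,?) on duplicate key update resource_num = resource_num + values(resource_num)")
    pstm.setString(1, knowledgeId)
    pstm.setString(2, resourceType)
    pstm.setInt(3, delta)
    pstm.executeUpdate()
    pstm.close()
  }
}
Besides saving a round trip per record, this also avoids the read-then-write race when several partitions update the same key in the same batch.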
3 Shared utility classes
3.1 Offset management
package RealOnline
import java.util
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis
import scala.collection.immutable.Map
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
object OffsetManager {
/**
 * Read the last committed offsets from Redis
 *
 * @param groupId consumer group id
 * @param topic   Kafka topic
 * @return map of TopicPartition -> offset, or null when no offsets have been saved yet
 */
def getOffset(groupId: String, topic: String): Map[TopicPartition, Long] = {
  val jedisClient: Jedis = RedisUtil.getJedisClient
  // Use the same Redis database that saveOffset writes to
  jedisClient.select(4)
  val redisOffsetMap: util.Map[String, String] = jedisClient.hgetAll("offset:" + groupId + ":" + topic)
  jedisClient.close()
  if (redisOffsetMap == null || redisOffsetMap.isEmpty) {
    null
  } else {
    val redisOffsetList: List[(String, String)] = redisOffsetMap.toList
    val kafkaOffsetList: List[(TopicPartition, Long)] = redisOffsetList.map { case (partition, offset) =>
      (new TopicPartition(topic, partition.toInt), offset.toLong)
    }
    kafkaOffsetList.toMap
  }
}
/**
 * Persist the offset ranges of the latest batch into Redis
 */
def saveOffset(groupId: String, topic: String, offsetArray: Array[OffsetRange]): Unit = {
if (offsetArray != null && offsetArray.size > 0) {
val offsetMap: Map[String, String] = offsetArray.map { offsetRange =>
val partition: Int = offsetRange.partition
val untilOffset: Long = offsetRange.untilOffset
(partition.toString, untilOffset.toString)
}.toMap
val jedisClient: Jedis = RedisUtil.getJedisClient
// Select the Redis database used for storing offsets
jedisClient.select(4)
jedisClient.hmset("offset:" + groupId + ":" + topic, offsetMap)
jedisClient.close()
}
}
}
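OffsetManager relies on a RedisUtil helper that the post does not include. Below is a minimal sketch of what such a helper might look like, built on a JedisPool and the redis.* keys from config.properties (section 3.3); the object and method names match the calls above, but the pool settings are assumed values, not the original implementation.
package RealOnline
import java.util.Properties
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
object RedisUtil {
  private val properties: Properties = PropertiesUtil.load("config.properties")
  private val host = properties.getProperty("redis.host")
  private val port = properties.getProperty("redis.port", "6379").toInt
  private val timeout = properties.getProperty("redis.timeout", "2000").toInt
  private val database = properties.getProperty("redis.database", "0").toInt
  // An empty password means "no AUTH"; Jedis accepts null in that case
  private val password = Option(properties.getProperty("redis.password")).filter(_.nonEmpty).orNull
  private lazy val jedisPool: JedisPool = {
    val poolConfig = new JedisPoolConfig()
    poolConfig.setMaxTotal(100)      // maximum pooled connections (assumed value)
    poolConfig.setMaxIdle(20)        // maximum idle connections (assumed value)
    poolConfig.setTestOnBorrow(true) // validate a connection before handing it out
    new JedisPool(poolConfig, host, port, timeout, password, database)
  }
  // Callers must close() the returned Jedis so it goes back to the pool
  def getJedisClient: Jedis = jedisPool.getResource
}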
3.2 Kafka consumer utility
package RealOnline
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object MyKafkaUtil {
private val properties: Properties = PropertiesUtil.load("config.properties")
val broker_list = properties.getProperty("kafka.broker.list")
// Kafka consumer configuration
var kafkaParam = collection.mutable.Map(
"bootstrap.servers" -> broker_list, //用于初始化链接到集群的地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
// identifies which consumer group this consumer belongs to
"group.id" -> "gmall_consumer_group",
// what to do when there is no committed offset, or the committed offset no longer exists
// on the brokers: "latest" resets to the newest available offset
"auto.offset.reset" -> "latest",
// true: offsets are committed automatically in the background, which can lose or re-read
// data after a failure; false: offsets must be managed manually (here they are kept in Redis)
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// Create a DStream that returns the records consumed from Kafka
// LocationStrategies.PreferConsistent: distribute partitions evenly across the available executors
// ConsumerStrategies.Subscribe: subscribe to the given list of topics
def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
dStream
}
def getKafkaStream(topic: String, ssc: StreamingContext, groupId: String): InputDStream[ConsumerRecord[String, String]] = {
kafkaParam("group.id") = groupId
val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
dStream
}
// Consume starting from explicitly supplied offsets (used when resuming from offsets stored in Redis)
def getKafkaStream(topic: String, ssc: StreamingContext, offsets: Map[TopicPartition, Long], groupId: String): InputDStream[ConsumerRecord[String, String]] = {
kafkaParam("group.id") = groupId
val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam, offsets))
dStream
}
}
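MyKafkaUtil (and the RedisUtil sketch above) load their settings through a PropertiesUtil helper that the post does not show. A minimal sketch, assuming config.properties is bundled on the classpath (e.g. under src/main/resources):
package RealOnline
import java.io.InputStreamReader
import java.nio.charset.StandardCharsets
import java.util.Properties
object PropertiesUtil {
  // Load the named properties file from the classpath as UTF-8
  def load(propertiesName: String): Properties = {
    val prop = new Properties()
    val in = Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName)
    prop.load(new InputStreamReader(in, StandardCharsets.UTF_8))
    prop
  }
}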
3.3 config.properties
# Kafka brokers
kafka.broker.list=kafka175.data:6667,kafka176.data:6667,kafka177.data:6667
#Redis (test environment)
#redis.host=***
#redis.port=6379
#Redis (production environment)
redis.host=****
redis.port=6379
redis.password=****
redis.database=3
redis.timeout=2000
#MySQL connection
mysql.host=*****:3306
mysql.user=****
mysql.password=******
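Note that the mysql.* keys above are not read by the main job in section 2, where the JDBC URL and credentials are hard-coded. If you want them picked up from this file instead, a small sketch using the PropertiesUtil helper from section 3.2 could look like the following; the database name in the URL is a placeholder, and the key names follow this file.
package RealOnline
import java.util.Properties
object MysqlConfig {
  private val conf: Properties = PropertiesUtil.load("config.properties")
  // Build the JDBC URL from config; the database name is a placeholder
  val url: String = "jdbc:mysql://" + conf.getProperty("mysql.host") +
    "/your_database?characterEncoding=utf8&rewriteBatchedStatements=true"
  // Connection properties read from config instead of being hard-coded
  val props: Properties = {
    val p = new Properties()
    p.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    p.setProperty("user", conf.getProperty("mysql.user"))
    p.setProperty("password", conf.getProperty("mysql.password"))
    p
  }
}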