(63)ADS 聚合以及可视化
作者:互联网
第1章 需求分析及实现思路
1.1 需求分析
以
热门品牌统计
为例,将数据写入到 ads 层,然后根据各种报表及可视化来生成统计数
据。通常这些报表及可视化都是基于某些维度的汇总统计。
热门商品统计(作业)
热门品类统计(作业)
交易用户性别对比(作业)
交易用户年龄段对比(作业)
交易额省市分布(作业)
1.2 业务流程图
1.3 实现思路
1.3.1 功能 1:ADS 层写入
1.3.2 功能 2
:发布查询接口
1.3.3 功能 3
:可视化查询
第2章 功能 1:ADS 层写入
2.1 分析
ads 层,主要是根据各种报表及可视化来生成统计数据。通常这些报表及可视化都是基于某
些维度的汇总统计。
统计表分为三个部分:时间点、维度、度量
时间点:即统计结果产生的时间,或者本批次数据中业务日期最早的时间。
维度:统计维度,比如地区、商品名称、性别
度量:汇总的数据,比如金额、数量
每个批次进行一次聚合,根据数据的及时性要求,可以调整批次的时间长度,将聚合后的结
果一波一波的存放到数据库中。
2.2 数据库的选型与难点
聚合数据本身并不麻烦,利用 reducebykey 或者 groupbykey 都可以聚合,但是麻烦
的是实现精确性一次消费。因为聚合数据不是明细,没有确定的主键,所以没有办法实现幂
等。那么如果想实现精确一次消费,就要考虑利用关系型数据库的事务处理。
用本地事务管理最大的问题是数据保存操作要放在 driver 端变成单线程操作,性能降
低。 但是由于本业务保存的是聚合后的数据所以数据量并不大,即使单线程保存也是可以
接受的,因此
数据库
和
偏移量
选用 mysql 进行保存。
2.3 代码实现
在 gmall2020-realtime 中编写代码
2.3.1 在 pom.xml 文件中添加相关依赖
2.3.2 数据库以及表准备
(1)创建 gmall2020_rs 库
(2)创建保存偏移量的表 offset_2020
(3)创建保存品牌聚合结果的表
2.3.3 创建相关工具类 (1)创建查询 MySQL 数据库的工具类 MySQLUtil
package com.atguigu.gmall.realtime.utils
import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData,
Statement}
import com.alibaba.fastjson.JSONObject
import scala.collection.mutable.ListBuffer
/**
* Author: Felix
* Desc: 查询 MySQL 工具类
*/
object MySQLUtil {
def main(args: Array[String]): Unit = {
val list: List[ JSONObject] = queryList("select * from offset_2020")
println(list)
}
def queryList(sql:String):List[JSONObject]={
Class.forName("com.mysql.jdbc.Driver")
val resultList: ListBuffer[JSONObject] = new ListBuffer[ JSONObject]()
val conn: Connection = DriverManager.getConnection(
"jdbc:mysql://hadoop202:3306/gmall2020_rs?characterEncoding=utf-8&useSSL=fal
se",
"root",
"123456")
val stat: Statement = conn.createStatement
println(sql)
val rs: ResultSet = stat.executeQuery(sql)
val md: ResultSetMetaData = rs.getMetaData
while ( rs.next ) {
val rowData = new JSONObject();
for (i <-1 to md.getColumnCount ) {
rowData.put(md.getColumnName(i), rs.getObject(i))
}
resultList+=rowData
}
stat.close()
conn.close()
resultList.toList
} }
(2)读取 MySQL 中偏移量的工具类 OffsetManagerM
package com.atguigu.gmall.realtime.utils
import com.alibaba.fastjson.JSONObject
import org.apache.kafka.common.TopicPartition
/**
* Author: Felix
* Desc: 读取 MySQL 中偏移量的工具类
*/
object OffsetManagerM {
/**
* 从 Mysql 中读取偏移量
* @param consumerGroupId
* @param topic
* @return
*/
def getOffset(topic: String, consumerGroupId: String): Map[TopicPartition,
Long] = {
val sql=" select group_id,topic,topic_offset,partition_id from offset_2020
" +
" where topic='"+topic+"' and group_id='"+consumerGroupId+"'"
val jsonObjList: List[JSONObject] = MySQLUtil.queryList(sql)
val topicPartitionList: List[(TopicPartition, Long)] = jsonObjList.map {
jsonObj =>{
val topicPartition: TopicPartition = new TopicPartition(topic,
jsonObj.getIntValue("partition_id"))
val offset: Long = jsonObj.getLongValue("topic_offset")
(topicPartition, offset)
}
}
val topicPartitionMap: Map[TopicPartition, Long] = topicPartitionList.toMap
topicPartitionMap
} }
2.3.4 在 OrderWideApp 中将数据写回 Kafka
import sparkSession.implicits._
orderWideWithSplitDstream.foreachRDD{
rdd=>{
rdd.cache()
//将数据保存到 ClickHouse
val df: DataFrame = rdd.toDF()
df.write.mode(SaveMode.Append)
.option("batchsize", "100")
.option("isolationLevel", "NONE") // 设置事务
.option("numPartitions", "4") // 设置并发
.option("driver","ru.yandex.clickhouse.ClickHouseDriver")
.jdbc("jdbc:clickhouse://hadoop202:8123/default","t_order_wide_2020",ne
w Properties())
//将数据写回到 Kafka
rdd.foreach{orderWide=>
MyKafkaSink.send("dws_order_wide", JSON.toJSONString(orderWide,new
SerializeConfig(true)))
}
//提交偏移量
OffsetManagerUtil.saveOffset(orderInfoTopic,orderInfoGroupId,orderInfoOffset
Ranges)
OffsetManagerUtil.saveOffset(orderDetailTopic,orderDetailGroupId,orderDetail
OffsetRanges)
} }
ssc.awaitTermination()
2.3.5 关于本地事务保存 MySql
我们在处理事务的时候引用了一个 scala 的 MySQL 工具:scalikeJdbc
(1)读取配置文件
默认从 classpath 下读取 application.conf,获取数据库连接信息,所以我们在
resources 下创建 application.conf 文件
(2)程序中加载配置 DBs.setup() (3)本地事务提交数据 凡是在 DB. localTx ( implicit session => { } )中的 SQL 全部被本地事务进行关联,一 条失败全部回滚 DB.localTx( implicit session => { SQL 1 SQL 2 } ) 2.3.6 创建 TrademarkStatApp(ads)
2.3.7 测试 ➢ 确保相关服务启动 Hdfs、ZK、Kafka、Redis、Hbase、Maxwell ➢ 运行 BaseDBMaxwellApp、OrderInfoApp、OrderDetailApp、OrderWideApp、 TrademarkStatApp,运行模拟生成业务数据 jar 包 ➢ 抛出异常查看数据结果,注释掉异常查看数据库表结果
标签:ADS,val,JSONObject,偏移量,topic,63,可视化,offset,2.3 来源: https://blog.csdn.net/song_quan_/article/details/119344273