其他分享
首页 > 其他分享> > (63)ADS 聚合以及可视化

(63)ADS 聚合以及可视化

作者:互联网

第1章 需求分析及实现思路 1.1 需求分析 热门品牌统计 为例,将数据写入到 ads 层,然后根据各种报表及可视化来生成统计数 据。通常这些报表及可视化都是基于某些维度的汇总统计。 热门商品统计(作业) 热门品类统计(作业) 交易用户性别对比(作业) 交易用户年龄段对比(作业) 交易额省市分布(作业) 1.2 业务流程图

1.3 实现思路 1.3.1 功能 1:ADS 层写入 1.3.2 功能 2 :发布查询接口 1.3.3 功能 3 :可视化查询 第2章 功能 1:ADS 层写入 2.1 分析 ads 层,主要是根据各种报表及可视化来生成统计数据。通常这些报表及可视化都是基于某 些维度的汇总统计。 统计表分为三个部分:时间点、维度、度量 时间点:即统计结果产生的时间,或者本批次数据中业务日期最早的时间。 维度:统计维度,比如地区、商品名称、性别 度量:汇总的数据,比如金额、数量 每个批次进行一次聚合,根据数据的及时性要求,可以调整批次的时间长度,将聚合后的结 果一波一波的存放到数据库中。 2.2 数据库的选型与难点 聚合数据本身并不麻烦,利用 reducebykey 或者 groupbykey 都可以聚合,但是麻烦 的是实现精确性一次消费。因为聚合数据不是明细,没有确定的主键,所以没有办法实现幂 等。那么如果想实现精确一次消费,就要考虑利用关系型数据库的事务处理。 用本地事务管理最大的问题是数据保存操作要放在 driver 端变成单线程操作,性能降 低。 但是由于本业务保存的是聚合后的数据所以数据量并不大,即使单线程保存也是可以 接受的,因此 数据库偏移量 选用 mysql 进行保存。 2.3 代码实现 在 gmall2020-realtime 中编写代码 2.3.1 在 pom.xml 文件中添加相关依赖

2.3.2 数据库以及表准备 (1)创建 gmall2020_rs 库

(2)创建保存偏移量的表 offset_2020

 (3)创建保存品牌聚合结果的表

 

2.3.3 创建相关工具类 (1)创建查询 MySQL 数据库的工具类 MySQLUtil
package com.atguigu.gmall.realtime.utils
import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, 
Statement}
import com.alibaba.fastjson.JSONObject
import scala.collection.mutable.ListBuffer
/**
 * Author: Felix
 * Desc: 查询 MySQL 工具类
 */
object MySQLUtil {
 def main(args: Array[String]): Unit = {
 val list: List[ JSONObject] = queryList("select * from offset_2020")
 println(list)
 }
 def queryList(sql:String):List[JSONObject]={
 Class.forName("com.mysql.jdbc.Driver")
 val resultList: ListBuffer[JSONObject] = new ListBuffer[ JSONObject]()
 val conn: Connection = DriverManager.getConnection(
 
"jdbc:mysql://hadoop202:3306/gmall2020_rs?characterEncoding=utf-8&useSSL=fal
se",
 "root",
 "123456")
 val stat: Statement = conn.createStatement
 println(sql)
 val rs: ResultSet = stat.executeQuery(sql)
 val md: ResultSetMetaData = rs.getMetaData
 while ( rs.next ) {

 val rowData = new JSONObject();
 for (i <-1 to md.getColumnCount ) {
 rowData.put(md.getColumnName(i), rs.getObject(i))
 }
 resultList+=rowData
 }
 stat.close()
 conn.close()
 resultList.toList
 } }
(2)读取 MySQL 中偏移量的工具类 OffsetManagerM
package com.atguigu.gmall.realtime.utils
import com.alibaba.fastjson.JSONObject
import org.apache.kafka.common.TopicPartition
/**
 * Author: Felix
 * Desc: 读取 MySQL 中偏移量的工具类
 */
object OffsetManagerM {
 /**
 * 从 Mysql 中读取偏移量
 * @param consumerGroupId
 * @param topic
 * @return
 */
 def getOffset(topic: String, consumerGroupId: String): Map[TopicPartition, 
Long] = {
 val sql=" select group_id,topic,topic_offset,partition_id from offset_2020 
" +
 " where topic='"+topic+"' and group_id='"+consumerGroupId+"'"
 val jsonObjList: List[JSONObject] = MySQLUtil.queryList(sql)
 val topicPartitionList: List[(TopicPartition, Long)] = jsonObjList.map {
 jsonObj =>{
 val topicPartition: TopicPartition = new TopicPartition(topic, 
 
jsonObj.getIntValue("partition_id"))
 val offset: Long = jsonObj.getLongValue("topic_offset")
 (topicPartition, offset)
 }
 }
 val topicPartitionMap: Map[TopicPartition, Long] = topicPartitionList.toMap
 topicPartitionMap
 } }
2.3.4 在 OrderWideApp 中将数据写回 Kafka
import sparkSession.implicits._
orderWideWithSplitDstream.foreachRDD{
 rdd=>{
 rdd.cache()
 //将数据保存到 ClickHouse
 val df: DataFrame = rdd.toDF()
 df.write.mode(SaveMode.Append)
 .option("batchsize", "100")
 .option("isolationLevel", "NONE") // 设置事务
 .option("numPartitions", "4") // 设置并发
 .option("driver","ru.yandex.clickhouse.ClickHouseDriver")
 .jdbc("jdbc:clickhouse://hadoop202:8123/default","t_order_wide_2020",ne
w Properties())
 
 //将数据写回到 Kafka
 rdd.foreach{orderWide=>
 MyKafkaSink.send("dws_order_wide", JSON.toJSONString(orderWide,new 
SerializeConfig(true)))
 }
 //提交偏移量
 
OffsetManagerUtil.saveOffset(orderInfoTopic,orderInfoGroupId,orderInfoOffset
Ranges)
 
OffsetManagerUtil.saveOffset(orderDetailTopic,orderDetailGroupId,orderDetail
OffsetRanges)
 } }

ssc.awaitTermination()
2.3.5 关于本地事务保存 MySql 我们在处理事务的时候引用了一个 scala 的 MySQL 工具:scalikeJdbc (1)读取配置文件 默认从 classpath 下读取 application.conf,获取数据库连接信息,所以我们在 resources 下创建 application.conf 文件

 

(2)程序中加载配置 DBs.setup() (3)本地事务提交数据 凡是在 DB. localTx ( implicit session => { } )中的 SQL 全部被本地事务进行关联,一 条失败全部回滚 DB.localTx( implicit session => { SQL 1 SQL 2 } ) 2.3.6 创建 TrademarkStatApp(ads)

 

 

 

 

2.3.7 测试 ➢ 确保相关服务启动 Hdfs、ZK、Kafka、Redis、Hbase、Maxwell ➢ 运行 BaseDBMaxwellApp、OrderInfoApp、OrderDetailApp、OrderWideApp、 TrademarkStatApp,运行模拟生成业务数据 jar 包 ➢ 抛出异常查看数据结果,注释掉异常查看数据库表结果

 

标签:ADS,val,JSONObject,偏移量,topic,63,可视化,offset,2.3
来源: https://blog.csdn.net/song_quan_/article/details/119344273