日活需求
作者:互联网
1、需求分析&实现思路
1.1、用户首次登录趋势图
从项目的日志中获取用户的启动日志,如果是当日第一次启动,纳入统计。将统计结果保存到ES中,利用Kibana进行分析展示
1.2、实现思路
第一步:SparkStreaming 消费Kafka数据:Kafka作为数据来源,从kafka中获取日志,kafka中的日志类型有两种,启动和事件,我们这里统计日活,只获取启动日志即可;
第二步:使用redis 对以及完成首次登录的数据进行剔重:每个用户每天可能启动多次。要想计算日活,我们只需要把当前用户每天的第一次启动日志获取即可,所以要对启动日志进行去重,相当于做了一次清洗。
第三步:对剔重Jon过后的明细数据保存到ES中
第四步、利用 Kibana 进行展示
2、功能实现
2.1、创建maven 工程,导入相关依赖
<properties> <spark.version>3.0.0</spark.version> <scala.version>2.12.11</scala.version> <kafka.version>2.4.1</kafka.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>io.searchbox</groupId> <artifactId>jest</artifactId> <version>5.3.3</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.11.0</version> </dependency> <dependency> <groupId>net.java.dev.jna</groupId> <artifactId>jna</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>commons-compiler</artifactId> <version>3.0.16</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.6.0</version> </dependency> </dependencies> <build> <plugins> <!-- 该插件用于将Scala代码编译成class文件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.4.6</version> <executions> <execution> <!-- 声明绑定到maven的compile阶段 --> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>View Code
相关工具类参见之前的文章
2.2、SparkStreaming 消费kafka 数据
- 模拟日志程序运行生成启动和事件日志
- 请求交给Nginx进行处理
- Nginx反向代理三台处理日志的服务器
- 日志处理服务将日志写到Kafka的主题中
- 编写基本业务类,使用SparkStreming从Kafka主题中消费数据
- 目前只做打印输出
代码实现
import java.lang import java.text.SimpleDateFormat import java.util.Date import com.alibaba.fastjson.{JSON, JSONObject} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.wdh01.gmall.realtime.bean.DauInfo import org.wdh01.gmall.realtime.util.{MyESutil, MyKafkaUtil, MyRedisUtil, OffsetManagerUtil} import redis.clients.jedis.Jedis import scala.collection.mutable.ListBuffer /** * 日活 */ object DauAPP { def main(args: Array[String]): Unit = { //使用 SparkStreaming 消费 kafka 数据 val conf: SparkConf = new SparkConf().setAppName("DauAPP").setMaster("local[4]") val ssc: StreamingContext = new StreamingContext(conf, Seconds(5)) var topic = "gmall_start_0423" var groupId = "gmall_dau_0423" //从 redis 获取跑偏移量 val offsetMap: Map[TopicPartition, Long] = OffsetManagerUtil.getOffset(topic, groupId) var recordDStream: InputDStream[ConsumerRecord[String, String]] = null if (offsetMap != null && offsetMap.size > 0) { // //offsetMap!=null && offsetMap.size>0 表示非首次消费 //redis 存在当前消费者组的偏移量信息,那么从指定偏移量位置开始消费 recordDStream = MyKafkaUtil.getKafkaStream(topic, ssc, offsetMap, groupId) } else { // 如果redsi 没有存放偏移量信息,从开始最新位置开始消费 recordDStream = MyKafkaUtil.getKafkaStream(topic, ssc, groupId) } var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange] //获取采集周期消费 kafka 的起始偏移量和结束偏移量 val offsetDStream: DStream[ConsumerRecord[String, String]] = recordDStream.transform { rdd => { // recordDStream 底层封装的是 KafkaRDD,混入了 HasOffsetRange 特质, // 其底层提供了 可以获取偏移量范围的方法 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd } } // jsonStr.print() //对读取的数据进行处理 val jsonDStream: DStream[JSONObject] = offsetDStream.map { record => { val str: String = record.value() //转换为对象 val jsonObject: JSONObject = JSON.parseObject(str) //获取时间戳 val ts: lang.Long = jsonObject.getLong("ts") //将 ts 转换为日期 小时 val dateStr: String = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(ts)) //切分日期 和 小时 val date: Array[String] = dateStr.split(" ") val dt: String = date(0) val hr: String = date(1) //向原有 json 增加两个字段,即 日期 & 小时 jsonObject.put("dt", dt) jsonObject.put("hr", hr) //返回新的 json jsonObject } }
测试:启动 zk,kafka,Nginx,log 处理jar 程序,运行idea,在启动 模拟日志即可。
2.3、使用Redis进行剔重
- 利用Redis保存今天访问过系统的用户清单 即SparkStreaming从Kafka中读取到用户的启动日志之后,将用户的启动日志保存到Redis中,进行去重
- 根据保存反馈得到用户是否已存在 Redis的五大数据类型中,String和Set都可以完成去重功能,但是String管理不适合整体操作,比如设置失效时间或者获取当天用户等操作,所以我们项目中使用的是Set类型,处理批量管理以外,还可以根据saddAPI的返回结果判断用户是否已经存在
Key |
Value |
dau:2019-01-22 |
设备id |
/** * 对采集到的数据进行剔重 方式 1 :此处使用 redis Set进行天然剔重 * Redis 使用 set key:dau:2022-04-23 value:mid expire:3600*24 * 方式1 缺点 :采集周期每一条数据都要获取 jedis 连接,浪费资源 */ /* val filterDStream: DStream[JSONObject] = jsonDStream.filter { jsonObj => { //获取登录日期 val dt: String = jsonObj.getString("dt") //获取设备ID val mid: String = jsonObj.getJSONObject("common").getString("mid") //拼接 key val dauKey: String = "dau:" + dt //链接 redis 获取jedis val jedis: Jedis = MyRedisUtil.getJedisClient() //判断是否已经存在 :isFirst 表示是否添加成功:成功 1 表示第一次登录 失败 0 非第一次登录 val isFirst: lang.Long = jedis.sadd(dauKey, mid) if (jedis.ttl(dauKey) < 0) { //jedis.ttl(dauKey)<0 key 永久有效 //设置有效时间 一天 jedis.expire(dauKey, 3600 * 24) } //关闭链接 jedis.close() if (isFirst == 1L) { true } else { false } } }*/ /** * 对采集到的数据进行剔重 方式 2 :此处使用 redis Set进行天然剔重 * * Redis 使用 set key:dau:2022-04-23 value:mid expire:3600*24 * * 方式2 使用 mapPartition ,每一个分区获取一个 jedis 连接 */ val filterDStream1: DStream[JSONObject] = jsonDStream.mapPartitions { jsonObjItr => { //以分区为单位处理数据 val jedis: Jedis = MyRedisUtil.getJedisClient() //声明一个集合;存放当前分区首次登录的数据 val listBuffer: ListBuffer[JSONObject] = new ListBuffer[JSONObject] //对分区数据进行遍历 for (jsonObj <- jsonObjItr) { //获取 jsonObj 相关属性 //获取日期 val dt: String = jsonObj.getString("dt") //获取设备ID val mid: String = jsonObj.getJSONObject("common").getString("mid") //拼接key val dauKey: String = "dau:" + dt //判断是否首次登录 :isFirst 表示是否添加成功:成功 1 表示第一次登录 失败 0 非第一次登录 val isFirst: lang.Long = jedis.sadd(dauKey, mid) if (jedis.ttl(dauKey) < 0) { //jedis.ttl(dauKey)<0 key 永久有效 //设置有效时间 一天 jedis.expire(dauKey, 3600 * 24) } if (isFirst == 1L) { listBuffer.append(jsonObj) } } //关闭链接 jedis.close() listBuffer.toIterator } }
测试时需要在启动 redis 即可,此处建议使用方案2实现,必将频频繁创建redis 连接,过多消耗资源。
2.4、批量保存 ES
将去重后的结果保存的ElasticSearch中,以便后续业务操作
首先创建 ES 模板
PUT _template/gmall0423_dau_info_template { "index_patterns": ["gmall0423_dau_info*"], "settings": { "number_of_shards": 3 }, "aliases" : { "{index}-query": {}, "gmall0423_dau_info-query":{} }, "mappings": { "_doc":{ "properties":{ "mid":{ "type":"keyword" }, "uid":{ "type":"keyword" }, "ar":{ "type":"keyword" }, "ch":{ "type":"keyword" }, "vc":{ "type":"keyword" }, "dt":{ "type":"keyword" }, "hr":{ "type":"keyword" }, "mi":{ "type":"keyword" }, "ts":{ "type":"date" } } } } }
封装实体类
package org.wdh01.gmall.realtime.bean /** * 样例类 * @param mid * @param uid * @param ar * @param ch * @param vc * @param dt * @param hr * @param mi * @param ts */ case class DauInfo( mid: String, //设备id uid: String, //用户id ar: String, //地区 ch: String, //渠道 vc: String, //版本 var dt: String, //日期 var hr: String, //小时 var mi: String, //分钟 ts: Long //时间戳 ) {}
保存ES
//将数据批量保存 ES filterDStream1.foreachRDD { rdd => { //以分区为单位对数据进行处理 rdd.foreachPartition { jsonObjItr => { val dauInfolist: List[(String, DauInfo)] = jsonObjItr.map { jsonObj => { //每次处理的是一个json对象 将json对象封装为样例类 val commonJsonObj: JSONObject = jsonObj.getJSONObject("common") val dauInfo: DauInfo = DauInfo( commonJsonObj.getString("mid"), commonJsonObj.getString("uid"), commonJsonObj.getString("ar"), commonJsonObj.getString("ch"), commonJsonObj.getString("vc"), jsonObj.getString("dt"), jsonObj.getString("hr"), "00", //分钟我们前面没有转换,默认00 jsonObj.getLong("ts") ) (dauInfo.mid, dauInfo) } }.toList //批量保存ES val dt: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) MyESutil.bulkInsert(dauInfolist, "gmall0423_dau_info_" + dt) } } //提交偏移量到 Redis OffsetManagerUtil.saveOffset(topic, groupId, offsetRanges) } } filterDStream1.count().print() ssc.start() ssc.awaitTermination() } } /** * 查看所有topic * bin/kafka-topic.sh --bootstrap-server hadoop201:9092 --list * 消费数据测试 * bin/kafka-console-consumer.sh --bootstrap-server hadoop201:9092 --topic gmall_start_0423 * jsonObject 样例数据 * {"dt":"2022-04-23","common":{"ar":"230000","uid":"149","os":"Android 10.0","ch":"xiaomi","md":"Xiaomi 9","mid":"mid_50","vc":"v2.1.134","ba":"Xiaomi"},"start":{"entry":"notice","open_ad_skip_ms":0,"open_ad_ms":5519,"loading_time":1737,"open_ad_id":20},"hr":"03","ts":1650656885000} * redis * /usr/local/bin/redis-server /etc/redis.conf * /usr/local/bin/redis-cli * 127.0.0.1:6379> keys * * 1) "dau:2022-04-23" * 127.0.0.1:6379> Smembers dau:2022-04-23 * redis 格式化所有数据 * flushall * ES 查看所有模板 * #查看所有idnex * GET /_cat/indices * #查看指定 idnex 内容 * GET /gmall0423_dau_info_2022-05-01/_search * */
启动 es & kiban 进行测试即可。
标签:需求,String,val,redis,kafka,日活,import,org 来源: https://www.cnblogs.com/wdh01/p/16217871.html