其他分享
首页 > 其他分享> > (32)功能 3:把每批次新增的当日日活信息保存到 ES 中

(32)功能 3:把每批次新增的当日日活信息保存到 ES 中

作者:互联网

2.4.1 思路 将去重后的结果保存的 ElasticSearch 中,以便后续业务操作 2.4.2 代码实现 ➢ 在 ES 中创建索引模板

 

 

 ➢ 创建一个样例类,用于封装需要的日志数据(并不是所有采集到的字段都需要)

 ➢ 在 MyESUtil 工具类中提供批量添加 bulkInsert 的方法

 

 ➢ 在 DauApp 类中完成插入的功能

//=============== 向 ES 中保存数据 =================== filteredDStream.foreachRDD{ rdd=>{ // 获取 DS 中的 RDD rdd.foreachPartition{ // 以分区为单位对 RDD 中的数据进行处理,方便批量插入 jsonItr=>{ val dauList: List [DauInfo] = jsonItr.map { jsonObj => { // 每次处理的是一个 json 对象 将 json 对象封装为样例类 val commonJsonObj: JSONObject = jsonObj.getJSONObject( "common" ) DauInfo ( commonJsonObj.getString( "mid" ) , commonJsonObj.getString( "uid" ) , commonJsonObj.getString( "ar" ) , commonJsonObj.getString( "ch" ) , commonJsonObj.getString( "vc" ) , jsonObj.getString( "dt" ) , jsonObj.getString( "hr" ) , "00" , // 分钟我们前面没有转换,默认 00 jsonObj.getLong( "ts" ) ) } }.toList // 对分区的数据进行批量处理 // 获取当前日志字符串 val dt: String = new SimpleDateFormat( "yyyy-MM-dd" ).format( new Date()) MyESUtil. bulkInsert (dauList , "gmall2020_dau_info_" + dt) } } } } 2.4.3 测试 ➢ 启动 Zookeeper ➢ 启动 Kafka ➢ 启动 logger.sh(日志处理服务-Nginx 和 SpringBoot 程序) ➢ 启动 Redis,清空 Redis 中所有数据 127.0.0.1:6379> FLUSHALL ➢ Idea 中运行程序 ➢ 运行模拟生成日志的 jar 注意:因为涉及 classpath 环境变量,要切换到 jar 包所在目录下执行 ➢ 查看输出效果 ◼ 控制台

◼ ES

标签:jsonObj,getString,32,日活,commonJsonObj,日志,dt,ES
来源: https://blog.csdn.net/song_quan_/article/details/118928840