网站用户行为分析项目之会话切割(二)
作者:互联网
教程目录
0x00 教程内容- 项目分析
- 编程实现
- 结果展示
在上一教程:网站用户行为分析项目之会话切割(一) 我们已经做了很多准备工作,包括最后一步是实现了过滤不合法的数据,现在先来回顾一下我们的数据变化流程。
0x01 项目分析1. 项目回顾
- 数据流程回顾(
原始数据
=>rawRDD
=>parsedLogRDD
)
a. 原始数据
:
#type|server time|cookie|ip|url
pageview|2017-09-04 12:00:00|cookie1|127.0.0.3|https://www.baidu.com
click|2017-09-04 12:00:02|cookie1|127.0.0.3|https://www.baidu.com
pageview|2017-09-04 12:00:01|cookie2|127.0.0.4|https://www.baidu.com
click|2017-09-04 12:00:04|cookie1|127.0.0.3|https://www.baidu.com
pageview|2017-09-04 12:00:02|cookie2|127.0.0.4|http://news.baidu.com
click|2017-09-04 12:00:03|cookie2|127.0.0.4|http://news.baidu.com
pageview|2017-09-04 12:00:04|cookie2|127.0.0.4|http://music.baidu.com/?fr=tieba
pageview|2017-09-04 12:45:01|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
click|2017-09-04 12:45:02|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
click|2017-09-04 12:45:03|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
hhhh|2017-09-04 12:45:03|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
3333ss|2017-09-04 12:45:03|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
b. 加载数据后,生成了rawRDD
,接着尝试将RDD转化成了下面的格式:
None
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "ip": "127.0.0.4", "url": "https://www.baidu.com"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:00:03", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://music.baidu.com/?fr=tieba"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:45:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "hhhh", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "3333ss", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
c. 但觉得这不是我们想要的格式,所以我们又转换成了parsedLogRDD
:
{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}
{"log_type": "click", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "ip": "127.0.0.4", "url": "https://www.baidu.com"}
{"log_type": "click", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}
{"log_type": "click", "log_server_time": "2017-09-04 12:00:03", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://music.baidu.com/?fr=tieba"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}
{"log_type": "click", "log_server_time": "2017-09-04 12:45:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}
{"log_type": "click", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}
d. 而此处的parsedLogRDD里面的value部分其实是与trackerLog对象的属性一一相对应的,trackerLog对象格式类似如下:
TrackerLog(pageview,2017-09-04 12:00:00,cookie1,127.0.0.3,https://www.baidu.com)
...
...
...
...
2. 项目目标
a. 会话切割类型
我们是想要进行会话切割,会话切割必定是cookie级别或者user级别的,即我们按cookie、按user进程切成切割,一个cookie或者user可以有多个会话。
如果无法理解,可以先往后面看,再由结果回过头来看。
0x02 编程实现1. 按cookie进行分组
现在,我们这里采取先用cookie分组,然后再按user切割的方式。即看一下有多少个cookie,类似于有多少个用户,然后再从用户中切成多少个会话,会话默认是每30分钟切一个。
a. 按照cookie进行分组
val cookieGroupRDD: RDD[(String, Iterable[TrackerLog])] = parsedLogRDD.groupBy(trackerLog => trackerLog.getCookie.toString)
分组之后,我们的数据形式类似于如下格式,即按cookie进行了分组:
cookie1 -> Iterator(trackerLog1,trackerLog2.....)
cookie2 -> Iterator(trackerLog4,trackerLog5.....)
此时的每个cookie分成一个key-value,value为装有trackerLog对象的迭代器,cookie均相同。
b. 实际得到的效果如下:
(cookie2,CompactBuffer({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "ip": "127.0.0.4", "url": "https://www.baidu.com"}, {"log_type": "pageview", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}, {"log_type": "click", "log_server_time": "2017-09-04 12:00:03", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}, {"log_type": "pageview", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://music.baidu.com/?fr=tieba"}))
(cookie1,CompactBuffer({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}, {"log_type": "click", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}, {"log_type": "click", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}, {"log_type": "pageview", "log_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}, {"log_type": "click", "log_server_time": "2017-09-04 12:45:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}, {"log_type": "click", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}))
cookie2有4条数据,cookie1有6条数据。
2. 按user进行分组
按cookie进行切割之后,下面还需要进行按user进行切割,即每30分钟切割成一份。
a. 对每个cookie再按user进行会话切割
//4、按user进行分组
val sessionRDD: RDD[(String, TrackerSession)] = cookieGroupRDD.flatMapValues { case iter =>
//处理每个user的日志
val processor = new OneUserTrackerLogsProcessor(iter.toArray)
processor.buildSessions()
}
sessionRDD.collect().foreach(println)
此处用了一个flatMapValues算子,此处给出一个案例说明,请观察变化:
>>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
>>> def f(x): return x
>>> x.flatMapValues(f).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
也就是我们原先的数据:
cookie1 -> Iterator(trackerLog1,trackerLog2.....)
cookie2 -> Iterator(trackerLog4,trackerLog5.....)
会变成类似于这样的数据:
cookie1 -> trackerLog1
cookie1 -> trackerLog2
cookie1 -> trackerLog3
……
cookie2 -> trackerLog10
但是右边trackerLogN
这样的数据不是我们想要的,我们的最终目的是要得到TrackerSession
,所以我们要想办法让它转化一下。OneUserTrackerLogsProcessor
就是让我们进行转化的单独的一个类。
b. 这里抽离出一部分代码,新建OneUserTrackerLogsProcessor
,后面再将逻辑结构补上
package com.shaonaiyi.spark.session
/**
* @Auther: shaonaiyi@163.com
* @Date: 2019/12/14 20:38
* @Description: 转化每个user的trackerLogs为trackerSession
*/
class OneUserTrackerLogsProcessor(trackerLogs: Array[TrackerLog]) {
def buildSessions() : Array[TrackerSession] = {
//1、会话切割
//2、生成会话
Array()
}
}
到目前为止,应该清楚sessionRDD
的结构,key是cookie,value是TrackerSession,此过程只不过是将value转化了下。
3. 将日志按时间进行排序
a. 用户的日志session间隔超过30分钟,则标记为一个新的session,那就需要对日志进行时间的比较,所以要先将日志进行排序,在OneUserTrackerLogsProcessor
添加语句:
private val sortedTrackerLogs = trackerLogs.sortBy(trackerLog => trackerLog.getLogServerTime.toString)
4. 切割会话
a. 此时OneUserTrackerLogsProcessor
的完整代码如下
package com.shaonaiyi.session
import com.shaonaiyi.spark.session.{TrackerLog, TrackerSession}
import org.apache.commons.lang3.time.FastDateFormat
import scala.collection.mutable.ArrayBuffer
/**
* @Auther: shaonaiyi@163.com
* @Date: 2019/12/14 20:38
* @Description: 转化每个user的trackerLogs为trackerSession
*/
class OneUserTrackerLogsProcessor(trackerLogs: Array[TrackerLog]) {
private val sortedTrackerLogs = trackerLogs.sortBy(trackerLog => trackerLog.getLogServerTime.toString)
private val dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
//1、会话切割
val oneCuttingSessionLogs = new ArrayBuffer[TrackerLog]() //存放正在切割会话的所有日志
val initBuilder = ArrayBuffer.newBuilder[ArrayBuffer[TrackerLog]] //存放切割完的会话的所有日志
def buildSessions() : Array[TrackerSession] = {
sortedTrackerLogs.foldLeft((initBuilder, Option.empty[TrackerLog])) { case ((builder, preLog), currLog) =>
val currLogTime = dateFormat.parse(currLog.getLogServerTime.toString).getTime
if (preLog.nonEmpty &&
currLogTime - dateFormat.parse(preLog.get.getLogServerTime.toString).getTime >= 30 * 60 * 1000) {
//切割成新的会话
builder += oneCuttingSessionLogs.clone()
oneCuttingSessionLogs.clear()
}
oneCuttingSessionLogs += currLog
(builder, Some(currLog))
}
//2、生成会话
Array()
}
}
b. 根据时间排好序后,因为要进行时间比较,而日志的格式是无法进行比较的,所以需要将时间转化为时间戳。此处是使用FastDateFormat
类,注意引入的类应该是下面这句
import org.apache.commons.lang3.time.FastDateFormat
c. 判断的两条日志,此处定义为preLog,currLog,只需要进行时间的比较即可,如果比较的第一条日志,因为没有得比较,所以就略过,不需要进行操作。如果后一条日志的时间减去前一条时间的相差30分钟,则将当前遍历到的日志(oneCuttingSessionLogs)往builder里写一份,也就是往initBuilder里写一份,initBuilder存放的是切割完的会话的所有日志。
d. 写一份到initBuilder后,删除oneCuttingSessionLogs的内容,将currLog写入到oneCuttingSessionLogs。
e. 最后返回的已经切分好的会话的所有日志,以及当前的日志。
f. 此时可以再新建一个变量cuttedSessionLogsResult来获得想要的结果
val cuttedSessionLogsResult = sortedTrackerLogs.foldLeft((init.......}._1.result()
g. 最后一个会话也要放进去,如果有的话
if (oneCuttingSessionLogs.nonEmpty) {
cuttedSessionLogsResult += oneCuttingSessionLogs
}
最后,一组日志里面,又重新切成了一个又一个的会话,一个会话里面,可以有多条日志。
此时cuttedSessionLogsResult
返回的类型为:ArrayBuffer[ArrayBuffer[TrackerLog]]
5. 生成会话
目前我们的会话已经切割完成了,现在要将切割后的会话再进行完善,以达到我们想要的TrackerSession,所以我们需要对数据进行整合。回顾上一篇教程:网站用户行为分析项目之会话切割(一) ,我们的目的是得到下面两张表:TrackerLog
表,字段为:
TrackerSession
表,字段为:
所以现在需要一个一个拼凑出来。
a. 代码如下:
//2、生成会话
cuttedSessionLogsResult.map { case sessionLogs =>
val session = new TrackerSession()
session.setSessionId(UUID.randomUUID().toString)
session.setSessionServerTime(sessionLogs.head.getLogServerTime)
session.setCookie(sessionLogs.head.getCookie)
session.setIp(sessionLogs.head.getIp)
val pageviewLogs = sessionLogs.filter(_.getLogType.toString.equals("pageview"))
if(pageviewLogs.length == 0) {
session.setLandingUrl("-")
} else {
session.setLandingUrl(pageviewLogs.head.getUrl)
}
session.setPageviewCount(pageviewLogs.length)
val clickLogs = sessionLogs.filter(_.getLogType.toString.equals("click"))
session.setClickCount(clickLogs.length)
if (pageviewLogs.length == 0) {
session.setDomain("-")
} else {
val url = new URL(pageviewLogs.head.getUrl.toString)
session.setDomain(url.getHost)
}
session
}
b. 删除原来的Array(),将buildSessions
返回的类型修改为ArrayBuffer
def buildSessions() : ArrayBuffer[TrackerSession] = {
6. 当前结果查看
至此,还有cookie_label、domain_label两个字段没有加进去。
a. 在TrackerSession
加上实现序列化接口Serializable。然后执行,得到一下结果。
(cookie2,{"session_id": "38059172-e0aa-4d37-97da-12778a5a6455", "session_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "cookie_label": null, "ip": "127.0.0.4", "landing_url": "https://www.baidu.com", "pageview_count": 3, "click_count": 1, "domain": "www.baidu.com", "domain_label": null})
(cookie1,{"session_id": "218fdf54-8b34-484d-b53b-0769ea5d1421", "session_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "cookie_label": null, "ip": "127.0.0.3", "landing_url": "https://www.baidu.com", "pageview_count": 1, "click_count": 2, "domain": "www.baidu.com", "domain_label": null})
(cookie1,{"session_id": "ec2f3a38-3335-45f5-99f8-c27947bca687", "session_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "cookie_label": null, "ip": "127.0.0.3", "landing_url": "https://tieba.baidu.com/index.html", "pageview_count": 1, "click_count": 2, "domain": "tieba.baidu.com", "domain_label": null})
b. 结果讲解parsedLogRDD
一共有十条数据,现在是得到3个会话。观察每个会话的pageview_count、click_count两个字段,(3+1)+(1+2)+(1+2)=10条。也就是cookie2有一个会话,此会话里面有3个pageview事件,1个click事件。而cookie1因为它的日志里面,时间间隔有超过30分钟的,所以进行了切分,切分成了两个会话。
7. 实现domain_label字段
a. 观察前面的会话结果,其实cookie_label、domain_label两个字段都还是NULL的,现在我们需要统计一下,先完成domain_label,我们的domain_label数据量比较小,所以我们可以存放在传统数据库里面。因为这里只是演示,所以我就直接在代码中写死了。在SessionCutETL中添加代码
//网站域名标签数据,此处只是演示,其实可以存放在数据库里
val domainLabelMap = Map(
"www.baidu.com" -> "level1",
"www.taobao.com" -> "level2",
"jd.com" -> "level3",
"youku.com" -> "level4"
)
b. 因为数据量比较小,所以,还可以将此数据广播出去
//广播
val domainLabelMapB = sc.broadcast(domainLabelMap)
c. 将domainLabelMapB传进buildSessions函数,以参数的形式传,修改两行代码为:
processor.buildSessions(domainLabelMapB.value)
def buildSessions(domainLabelMap:Map[String, String]) : ArrayBuffer[TrackerSession] = {
d. 设置domainLabel,根据domain获得相对应的domainLabel,没有就用“-”
val domainLabel = domainLabelMap.getOrElse(session.getDomain.toString, "-")
session.setDomainLabel(domainLabel)
e. 执行,查看结果,发现标签已经有了
8. 实现cookie_label字段
a. 获取cookie_label数据
//5、给会话的cookie打标签
val cookieLabelRDD: RDD[(String, String)] = sc.textFile("data/cookie_label.txt").map { case line =>
val temp = line.split("\\|")
(temp(0), temp(1)) // (cookie, cookie_label)
}
b. sessionRDD、cookieLabelRDD的key都是cookie,所以可以进行关联,sessionRDD的数据肯定是要的,只不过是加入cookieLabelRDD的数据而已
val joinRDD: RDD[(String,(TrackerSession, Option[String]))] = sessionRDD.leftOuterJoin(cookieLabelRDD)
val cookieLabeledSessionRDD: RDD[TrackerSession] = joinRDD.map {
case (cookie, (session, cookieLabelOpt)) =>
if (cookieLabelOpt.nonEmpty) {
session.setCookieLabel(cookieLabelOpt.get)
} else {
session.setCookieLabel("-")
}
session
}
cookieLabeledSessionRDD.collect().foreach(println)
因为左关联后,cookieLabelRDD所对应的value可能是空的,所以对应的应该是Option[String]。
c. 执行,查看结果,发现cookie标签已经有了
9. 保存统计结果
a. 因为是以parquet方式保存,所以需要引入一个jar包,勿忘!
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.8.1</version>
</dependency>
b. 保存parsedLogRDD
//6、保存数据
//6.1、保存TrackerLog,对应的是parsedLogRDD
val trackerLogOutputPath = "data/output/trackerLog"
AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerLog.SCHEMA$)
parsedLogRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerLogOutputPath,
classOf[Void], classOf[TrackerLog], classOf[AvroParquetOutputFormat[TrackerLog]]
)
c. 保存cookieLabeledSessionRDD
//6.2、保存TrackerSession,对应的是cookieLabeledSessionRDD
val trackerSessionOutputPath = "data/output/trackerSession"
AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerSession.SCHEMA$)
cookieLabeledSessionRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerSessionOutputPath,
classOf[Void], classOf[TrackerSession], classOf[AvroParquetOutputFormat[TrackerSession]]
)
d. 然后执行,发现报错,第一个错是一直都有的,第二个错是新的。
10. 解决报错
a. 请查看本博主另一篇文章:
Windows本地安装Hadoop
a. 删除报错时所生成的文件夹,不然会报错
b. 删除data/output/trackerLog文件夹,然后重新执行,即可得到想要的答案
- 数据转化的过程比较繁琐,想要自己多动手尝试,了解其来龙去脉,反复看多几遍。
- 网站用户行为分析项目系列:
网站用户行为分析项目之会话切割(一)
网站用户行为分析项目之会话切割(二)
网站用户行为分析项目之会话切割(三)
作者简介:邵奈一
全栈工程师、市场洞察者、专栏编辑
| 公众号 | 微信 | 微博 | CSDN | 简书 |
福利:
邵奈一的技术博客导航
邵奈一 原创不易,如转载请标明出处。
标签:baidu,12,log,04,用户,之会话,cookie,com,切割 来源: https://blog.51cto.com/u_12564104/2896705