其他分享
首页 > 其他分享> > 网站用户行为分析项目之会话切割(二)

网站用户行为分析项目之会话切割(二)

作者:互联网

教程目录

0x00 教程内容
  1. 项目分析
  2. 编程实现
  3. 结果展示

在上一教程:网站用户行为分析项目之会话切割(一) 我们已经做了很多准备工作,包括最后一步是实现了过滤不合法的数据,现在先来回顾一下我们的数据变化流程。

0x01 项目分析
1. 项目回顾

a. 原始数据

#type|server time|cookie|ip|url
pageview|2017-09-04 12:00:00|cookie1|127.0.0.3|https://www.baidu.com
click|2017-09-04 12:00:02|cookie1|127.0.0.3|https://www.baidu.com
pageview|2017-09-04 12:00:01|cookie2|127.0.0.4|https://www.baidu.com
click|2017-09-04 12:00:04|cookie1|127.0.0.3|https://www.baidu.com
pageview|2017-09-04 12:00:02|cookie2|127.0.0.4|http://news.baidu.com
click|2017-09-04 12:00:03|cookie2|127.0.0.4|http://news.baidu.com
pageview|2017-09-04 12:00:04|cookie2|127.0.0.4|http://music.baidu.com/?fr=tieba
pageview|2017-09-04 12:45:01|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
click|2017-09-04 12:45:02|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
click|2017-09-04 12:45:03|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
hhhh|2017-09-04 12:45:03|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
3333ss|2017-09-04 12:45:03|cookie1|127.0.0.3|https://tieba.baidu.com/index.html

b. 加载数据后,生成了rawRDD,接着尝试将RDD转化成了下面的格式:

None
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "ip": "127.0.0.4", "url": "https://www.baidu.com"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:00:03", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://music.baidu.com/?fr=tieba"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:45:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "hhhh", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "3333ss", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})

c. 但觉得这不是我们想要的格式,所以我们又转换成了parsedLogRDD

{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}
{"log_type": "click", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "ip": "127.0.0.4", "url": "https://www.baidu.com"}
{"log_type": "click", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}
{"log_type": "click", "log_server_time": "2017-09-04 12:00:03", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://music.baidu.com/?fr=tieba"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}
{"log_type": "click", "log_server_time": "2017-09-04 12:45:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}
{"log_type": "click", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}

d. 而此处的parsedLogRDD里面的value部分其实是与trackerLog对象的属性一一相对应的,trackerLog对象格式类似如下:

TrackerLog(pageview,2017-09-04 12:00:00,cookie1,127.0.0.3,https://www.baidu.com)
...
...
...
...
2. 项目目标

a. 会话切割类型

我们是想要进行会话切割,会话切割必定是cookie级别或者user级别的,即我们按cookie、按user进程切成切割,一个cookie或者user可以有多个会话。

如果无法理解,可以先往后面看,再由结果回过头来看。

0x02 编程实现
1. 按cookie进行分组

现在,我们这里采取先用cookie分组,然后再按user切割的方式。即看一下有多少个cookie,类似于有多少个用户,然后再从用户中切成多少个会话,会话默认是每30分钟切一个。

a. 按照cookie进行分组

val cookieGroupRDD: RDD[(String, Iterable[TrackerLog])] = parsedLogRDD.groupBy(trackerLog => trackerLog.getCookie.toString)

分组之后,我们的数据形式类似于如下格式,即按cookie进行了分组:

cookie1 -> Iterator(trackerLog1,trackerLog2.....)
cookie2 -> Iterator(trackerLog4,trackerLog5.....)

此时的每个cookie分成一个key-value,value为装有trackerLog对象的迭代器,cookie均相同。

b. 实际得到的效果如下:

(cookie2,CompactBuffer({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "ip": "127.0.0.4", "url": "https://www.baidu.com"}, {"log_type": "pageview", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}, {"log_type": "click", "log_server_time": "2017-09-04 12:00:03", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}, {"log_type": "pageview", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://music.baidu.com/?fr=tieba"}))
(cookie1,CompactBuffer({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}, {"log_type": "click", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}, {"log_type": "click", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}, {"log_type": "pageview", "log_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}, {"log_type": "click", "log_server_time": "2017-09-04 12:45:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}, {"log_type": "click", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}))

cookie2有4条数据,cookie1有6条数据。

2. 按user进行分组

按cookie进行切割之后,下面还需要进行按user进行切割,即每30分钟切割成一份。

a. 对每个cookie再按user进行会话切割

    //4、按user进行分组
    val sessionRDD: RDD[(String, TrackerSession)] = cookieGroupRDD.flatMapValues { case iter =>

      //处理每个user的日志
      val processor = new OneUserTrackerLogsProcessor(iter.toArray)
        processor.buildSessions()
    }
    sessionRDD.collect().foreach(println)

此处用了一个flatMapValues算子,此处给出一个案例说明,请观察变化:

    >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
    >>> def f(x): return x
    >>> x.flatMapValues(f).collect()
    [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

也就是我们原先的数据:

cookie1 -> Iterator(trackerLog1,trackerLog2.....)
cookie2 -> Iterator(trackerLog4,trackerLog5.....)

会变成类似于这样的数据:

cookie1 -> trackerLog1
cookie1 -> trackerLog2
cookie1 -> trackerLog3
……
cookie2 -> trackerLog10

但是右边trackerLogN这样的数据不是我们想要的,我们的最终目的是要得到TrackerSession,所以我们要想办法让它转化一下。OneUserTrackerLogsProcessor就是让我们进行转化的单独的一个类。

b. 这里抽离出一部分代码,新建OneUserTrackerLogsProcessor,后面再将逻辑结构补上

package com.shaonaiyi.spark.session

/**
  * @Auther: shaonaiyi@163.com
  * @Date: 2019/12/14 20:38
  * @Description: 转化每个user的trackerLogs为trackerSession
  */
class OneUserTrackerLogsProcessor(trackerLogs: Array[TrackerLog]) {

  def buildSessions() : Array[TrackerSession] = {
    //1、会话切割

  	//2、生成会话
    Array()
  }

}

到目前为止,应该清楚sessionRDD的结构,key是cookie,value是TrackerSession,此过程只不过是将value转化了下。

3. 将日志按时间进行排序

a. 用户的日志session间隔超过30分钟,则标记为一个新的session,那就需要对日志进行时间的比较,所以要先将日志进行排序,在OneUserTrackerLogsProcessor添加语句:

  private val sortedTrackerLogs = trackerLogs.sortBy(trackerLog => trackerLog.getLogServerTime.toString)
4. 切割会话

a. 此时OneUserTrackerLogsProcessor的完整代码如下

package com.shaonaiyi.session

import com.shaonaiyi.spark.session.{TrackerLog, TrackerSession}
import org.apache.commons.lang3.time.FastDateFormat

import scala.collection.mutable.ArrayBuffer

/**
  * @Auther: shaonaiyi@163.com
  * @Date: 2019/12/14 20:38
  * @Description: 转化每个user的trackerLogs为trackerSession
  */
class OneUserTrackerLogsProcessor(trackerLogs: Array[TrackerLog]) {

  private val sortedTrackerLogs = trackerLogs.sortBy(trackerLog => trackerLog.getLogServerTime.toString)

  private val dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  //1、会话切割
  val oneCuttingSessionLogs = new ArrayBuffer[TrackerLog]()   //存放正在切割会话的所有日志
  val initBuilder = ArrayBuffer.newBuilder[ArrayBuffer[TrackerLog]]   //存放切割完的会话的所有日志

  def buildSessions() : Array[TrackerSession] = {

    sortedTrackerLogs.foldLeft((initBuilder, Option.empty[TrackerLog])) { case ((builder, preLog), currLog) =>

      val currLogTime = dateFormat.parse(currLog.getLogServerTime.toString).getTime

      if (preLog.nonEmpty &&
        currLogTime - dateFormat.parse(preLog.get.getLogServerTime.toString).getTime >= 30 * 60 * 1000) {

        //切割成新的会话
        builder += oneCuttingSessionLogs.clone()
        oneCuttingSessionLogs.clear()
      }
      oneCuttingSessionLogs += currLog

      (builder, Some(currLog))
    }

    //2、生成会话
    Array()
  }

}

b. 根据时间排好序后,因为要进行时间比较,而日志的格式是无法进行比较的,所以需要将时间转化为时间戳。此处是使用FastDateFormat类,注意引入的类应该是下面这句

import org.apache.commons.lang3.time.FastDateFormat

c. 判断的两条日志,此处定义为preLog,currLog,只需要进行时间的比较即可,如果比较的第一条日志,因为没有得比较,所以就略过,不需要进行操作。如果后一条日志的时间减去前一条时间的相差30分钟,则将当前遍历到的日志(oneCuttingSessionLogs)往builder里写一份,也就是往initBuilder里写一份,initBuilder存放的是切割完的会话的所有日志。

d. 写一份到initBuilder后,删除oneCuttingSessionLogs的内容,将currLog写入到oneCuttingSessionLogs。

e. 最后返回的已经切分好的会话的所有日志,以及当前的日志。

f. 此时可以再新建一个变量cuttedSessionLogsResult来获得想要的结果

val cuttedSessionLogsResult = sortedTrackerLogs.foldLeft((init.......}._1.result()

在这里插入图片描述
g. 最后一个会话也要放进去,如果有的话

    if (oneCuttingSessionLogs.nonEmpty) {
      cuttedSessionLogsResult += oneCuttingSessionLogs
    }

在这里插入图片描述
最后,一组日志里面,又重新切成了一个又一个的会话,一个会话里面,可以有多条日志。

此时cuttedSessionLogsResult返回的类型为:ArrayBuffer[ArrayBuffer[TrackerLog]]

5. 生成会话

目前我们的会话已经切割完成了,现在要将切割后的会话再进行完善,以达到我们想要的TrackerSession,所以我们需要对数据进行整合。回顾上一篇教程:网站用户行为分析项目之会话切割(一) ,我们的目的是得到下面两张表:
TrackerLog表,字段为:

TrackerSession表,字段为:

所以现在需要一个一个拼凑出来。
a. 代码如下:

    //2、生成会话
    cuttedSessionLogsResult.map { case sessionLogs =>
        val session = new TrackerSession()
        session.setSessionId(UUID.randomUUID().toString)
        session.setSessionServerTime(sessionLogs.head.getLogServerTime)
        session.setCookie(sessionLogs.head.getCookie)

        session.setIp(sessionLogs.head.getIp)
        val pageviewLogs = sessionLogs.filter(_.getLogType.toString.equals("pageview"))

        if(pageviewLogs.length == 0) {
          session.setLandingUrl("-")
        } else {
          session.setLandingUrl(pageviewLogs.head.getUrl)
        }
        session.setPageviewCount(pageviewLogs.length)

        val clickLogs = sessionLogs.filter(_.getLogType.toString.equals("click"))
        session.setClickCount(clickLogs.length)

      if (pageviewLogs.length == 0) {
        session.setDomain("-")
      } else {
        val url = new URL(pageviewLogs.head.getUrl.toString)
        session.setDomain(url.getHost)
      }
        session
    }

b. 删除原来的Array(),将buildSessions返回的类型修改为ArrayBuffer

def buildSessions() : ArrayBuffer[TrackerSession] = {
6. 当前结果查看

至此,还有cookie_label、domain_label两个字段没有加进去。

a. 在TrackerSession加上实现序列化接口Serializable。然后执行,得到一下结果。

(cookie2,{"session_id": "38059172-e0aa-4d37-97da-12778a5a6455", "session_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "cookie_label": null, "ip": "127.0.0.4", "landing_url": "https://www.baidu.com", "pageview_count": 3, "click_count": 1, "domain": "www.baidu.com", "domain_label": null})
(cookie1,{"session_id": "218fdf54-8b34-484d-b53b-0769ea5d1421", "session_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "cookie_label": null, "ip": "127.0.0.3", "landing_url": "https://www.baidu.com", "pageview_count": 1, "click_count": 2, "domain": "www.baidu.com", "domain_label": null})
(cookie1,{"session_id": "ec2f3a38-3335-45f5-99f8-c27947bca687", "session_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "cookie_label": null, "ip": "127.0.0.3", "landing_url": "https://tieba.baidu.com/index.html", "pageview_count": 1, "click_count": 2, "domain": "tieba.baidu.com", "domain_label": null})

b. 结果讲解
parsedLogRDD一共有十条数据,现在是得到3个会话。观察每个会话的pageview_count、click_count两个字段,(3+1)+(1+2)+(1+2)=10条。也就是cookie2有一个会话,此会话里面有3个pageview事件,1个click事件。而cookie1因为它的日志里面,时间间隔有超过30分钟的,所以进行了切分,切分成了两个会话。

7. 实现domain_label字段

a. 观察前面的会话结果,其实cookie_label、domain_label两个字段都还是NULL的,现在我们需要统计一下,先完成domain_label,我们的domain_label数据量比较小,所以我们可以存放在传统数据库里面。因为这里只是演示,所以我就直接在代码中写死了。在SessionCutETL中添加代码

    //网站域名标签数据,此处只是演示,其实可以存放在数据库里
    val domainLabelMap = Map(
      "www.baidu.com" -> "level1",
      "www.taobao.com" -> "level2",
      "jd.com" -> "level3",
      "youku.com" -> "level4"
    )

在这里插入图片描述
b. 因为数据量比较小,所以,还可以将此数据广播出去

    //广播
    val domainLabelMapB = sc.broadcast(domainLabelMap)

c. 将domainLabelMapB传进buildSessions函数,以参数的形式传,修改两行代码为:

processor.buildSessions(domainLabelMapB.value)
def buildSessions(domainLabelMap:Map[String, String]) : ArrayBuffer[TrackerSession] = {

d. 设置domainLabel,根据domain获得相对应的domainLabel,没有就用“-”

      val domainLabel = domainLabelMap.getOrElse(session.getDomain.toString, "-")
      session.setDomainLabel(domainLabel)

在这里插入图片描述
e. 执行,查看结果,发现标签已经有了
在这里插入图片描述

8. 实现cookie_label字段

a. 获取cookie_label数据

    //5、给会话的cookie打标签
    val cookieLabelRDD: RDD[(String, String)] = sc.textFile("data/cookie_label.txt").map { case line =>
      val temp = line.split("\\|")
      (temp(0), temp(1)) // (cookie, cookie_label)
    }

b. sessionRDD、cookieLabelRDD的key都是cookie,所以可以进行关联,sessionRDD的数据肯定是要的,只不过是加入cookieLabelRDD的数据而已

    val joinRDD: RDD[(String,(TrackerSession, Option[String]))] = sessionRDD.leftOuterJoin(cookieLabelRDD)

    val cookieLabeledSessionRDD: RDD[TrackerSession] = joinRDD.map {
      case (cookie, (session, cookieLabelOpt)) =>
        if (cookieLabelOpt.nonEmpty) {
          session.setCookieLabel(cookieLabelOpt.get)
        } else {
          session.setCookieLabel("-")
        }
        session
    }

    cookieLabeledSessionRDD.collect().foreach(println)

因为左关联后,cookieLabelRDD所对应的value可能是空的,所以对应的应该是Option[String]。

c. 执行,查看结果,发现cookie标签已经有了
在这里插入图片描述

9. 保存统计结果

a. 因为是以parquet方式保存,所以需要引入一个jar包,勿忘!

   <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-avro</artifactId>
       <version>1.8.1</version>
   </dependency>

b. 保存parsedLogRDD

    //6、保存数据
    //6.1、保存TrackerLog,对应的是parsedLogRDD
    val trackerLogOutputPath = "data/output/trackerLog"
    AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerLog.SCHEMA$)
    parsedLogRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerLogOutputPath,
      classOf[Void], classOf[TrackerLog], classOf[AvroParquetOutputFormat[TrackerLog]]
    )

c. 保存cookieLabeledSessionRDD

    //6.2、保存TrackerSession,对应的是cookieLabeledSessionRDD
    val trackerSessionOutputPath = "data/output/trackerSession"
    AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerSession.SCHEMA$)
    cookieLabeledSessionRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerSessionOutputPath,
      classOf[Void], classOf[TrackerSession], classOf[AvroParquetOutputFormat[TrackerSession]]
    )

d. 然后执行,发现报错,第一个错是一直都有的,第二个错是新的。
在这里插入图片描述

10. 解决报错

a. 请查看本博主另一篇文章:
Windows本地安装Hadoop

0x03 结果展示

a. 删除报错时所生成的文件夹,不然会报错
在这里插入图片描述
b. 删除data/output/trackerLog文件夹,然后重新执行,即可得到想要的答案
在这里插入图片描述

0xFF 总结
  1. 数据转化的过程比较繁琐,想要自己多动手尝试,了解其来龙去脉,反复看多几遍。
  2. 网站用户行为分析项目系列:
    网站用户行为分析项目之会话切割(一)
    网站用户行为分析项目之会话切割(二)
    网站用户行为分析项目之会话切割(三)

作者简介:邵奈一
全栈工程师、市场洞察者、专栏编辑
| 公众号 | 微信 | 微博 | CSDN | 简书 |

福利:
邵奈一的技术博客导航
邵奈一 原创不易,如转载请标明出处。


标签:baidu,12,log,04,用户,之会话,cookie,com,切割
来源: https://blog.51cto.com/u_12564104/2896705