其他分享
首页 > 其他分享> > sparkcore案例三:获取每一种状态码对应的访问量

sparkcore案例三:获取每一种状态码对应的访问量

作者:互联网

/**
 * 清洗完成的数据中包含一个用户的响应状态码,获取每一种状态码对应的访问量
 *   1、读取清洗完成的数据成为RDD[String]
 *   2、可以把上一步得到的RDD通过map算子转换成一个键值对类型的RDD,以状态码为key  以不同用户的访问日志为value的数据
 *   3、键值对类型的RDD通过countByKey行动算子计算每种状态码的访问量(不灵活)。也可以使用aggregateByKey或者combineByKey实现访问量的统计
 */
object A3StatusCount {
  def main(args: Array[String]): Unit = {
    val sparkConf:SparkConf = new SparkConf().setAppName("demo02").setMaster("local[3]")
    val sc:SparkContext = new SparkContext(sparkConf)

    val value: RDD[String] = sc.textFile("hdfs://node1:9000/dc")
    val map = value.map((line: String) => {
      val str: String = line.split(" ")(8)
      (str, 1)
    })
    val count = map.reduceByKey(_ + _)
    count.foreach(println(_))

    sc.stop()
  }
}
object A3StatusCount {
  def main(args: Array[String]): Unit = {
    val sparkConf:SparkConf = new SparkConf().setAppName("demo02").setMaster("local[3]")
    val sc:SparkContext = new SparkContext(sparkConf)

    val value: RDD[String] = sc.textFile("hdfs://node1:9000/dc")

    val map: RDD[(String, String)] = value.map((line: String) => {
      val strs = line.split(" ")
      (strs(8), line)
    })
    val agg = map.aggregateByKey(0L)(
      (a: Long, b: String) => {
        a + 1L
      },
      (a: Long, b: Long) => {
        a + b
      }
    )
    agg.foreach(println(_))

    sc.stop()
  }
}

标签:map,String,val,RDD,sparkcore,案例,访问量,sc,line
来源: https://www.cnblogs.com/jsqup/p/16630182.html