其他分享
首页 > 其他分享> > qwererrwedbnbb

qwererrwedbnbb

作者:互联网



package com.njbdqn

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object Exam {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[*]").appName("country").getOrCreate()
    val rdd = sparkSession.sparkContext.textFile("hdfs://192.168.126.200:9000/app/data/exam/countrydata.csv").map(a=>{
      val arr = a.split(",")
      (arr(0),arr(1),arr(2),arr(3),arr(4),arr(5),arr(6))
    })
    //统计每个国家在数据截止统计时的累计确诊人数。
//    val confirmedCount = rdd.map(a=>{
//          (a._2,a._4,a._5)
//        }).groupBy(_._3).flatMap(x=>{
//          x._2.toArray.sortBy(x => x._2).reverse.take(1)
//        })

    //统计全世界在数据截止统计时的总感染人数
//    val sumPeople = confirmedCount.map(_._1.toInt).sum().toInt
//    print(sumPeople)

//    统计每个大洲中每日累计确诊人数最多的国家及确诊人数,
//    并输出 20200607 这一天各大洲当日累计确诊人数最多的国家及确诊人数
//      val dayPeople = rdd.map(a=>{
//        (a._3.toInt,a._7+a._4,a._5,a._4)
//      }).groupBy(_._2).flatMap(x=>{
//        x._2.toArray.sortBy(x => x._1).reverse.take(1)
//      })
//    dayPeople.filter(_._4.equals("20200408")).map(x=>(x._1,x._3)).foreach(println)

    //统计每个大洲中每日累计确诊人数最多的国家及确诊人数,并输出 20200607 这一天各
    //大洲当日累计确诊人数最多的国家及确诊人数。
//    val daySumPeople = rdd.map(a=>{
//              (a._2.toInt,a._7+a._4,a._5,a._4)
//            }).groupBy(_._2).flatMap(x=>{
//              x._2.toArray.sortBy(x => x._1).reverse.take(1)
//            })
//    daySumPeople.filter(_._4.equals("20200607")).map(x=>(x._1,x._3)).foreach(println)

    //统计每个大洲每月累计确诊人数,显示 202006 这个月每个大洲的累计确诊人数。
    val mouthSumPeople = rdd.map(a=>{
                    (a._2.toInt,a._7+a._4.substring(0,6),a._7,a._4.substring(0,6))
                  }).groupBy(_._2).map(x=>{
                      (x._2.toArray.map(t=>t._1.toInt).max,
                        x._2.toArray.map(t=>t._4).distinct.mkString(""),
                        x._2.toArray.map(t=>t._3).distinct.mkString(""))
                  })
    mouthSumPeople.filter(_._2.equals("202006")).map(x=>(x._1,x._3)).foreach(println)
    sparkSession.stop()
  }

}


1.数据准备(共 10 分) 
请在 HDFS 中创建目录/app/data/exam,并将 countrydata.csv 传到该目录。 
hdfs dfs -mkdir -p /app/data/exam
hdfs dfs -put /root/countrydata.csv /app/data/exam





2.在 Spark-Shell 中,加载 HDFS 文件系统 countrydata.csv 文件,并使用 RDD 完成以下 
统计计算。(共 45 分) 
①统计每个国家在数据截止统计时的累计确诊人数。(9 分) 
package com.njbdqn

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object Exam {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[*]").appName("country").getOrCreate()
    val rdd = sparkSession.sparkContext.textFile("hdfs://192.168.126.200:9000/app/data/exam/countrydata.csv").map(a=>{
      val arr = a.split(",")
      (arr(0),arr(1),arr(2),arr(3),arr(4),arr(5),arr(6))
    })
    //统计每个国家在数据截止统计时的累计确诊人数。
    val confirmedCount = rdd.map(a=>{
          (a._2,a._4,a._5)
        }).groupBy(_._3).flatMap(x=>{
          x._2.toArray.sortBy(x => x._2).reverse.take(1)
        }).foreach(println)


    sparkSession.stop()
  }

}




②统计全世界在数据截止统计时的总感染人数。(9 分) 
val sumPeople = confirmedCount.map(_._1.toInt).sum().toInt
print(sumPeople)



③统计每个大洲中每日新增确诊人数最多的国家及确诊人数,并输出 20200408 这一天各 
大洲当日新增确诊人数最多的国家及确诊人数。(9 分) 
val dayPeople = rdd.map(a=>{
    (a._3.toInt,a._7+a._4,a._5,a._4)
  }).groupBy(_._2).flatMap(x=>{
    x._2.toArray.sortBy(x => x._1).reverse.take(1)
  })
dayPeople.filter(_._4.equals("20200408")).map(x=>(x._1,x._3)).foreach(println)


④统计每个大洲中每日累计确诊人数最多的国家及确诊人数,并输出 20200607 这一天各 
大洲当日累计确诊人数最多的国家及确诊人数。(9 分) 
val daySumPeople = rdd.map(a=>{
          (a._2.toInt,a._7+a._4,a._5,a._4)
        }).groupBy(_._2).flatMap(x=>{
          x._2.toArray.sortBy(x => x._1).reverse.take(1)
        })
daySumPeople.filter(_._4.equals("20200607")).map(x=>(x._1,x._3)).foreach(println)


⑤统计每个大洲每月累计确诊人数,显示 202006 这个月每个大洲的累计确诊人数。(9分) 


3.创建 HBase 数据表(共 5 分) 
在 HBase 中创建命名空间(namespace)exam,在该命名空间下创建 covid19_world 表, 
使用大洲和统计日期的组合作为 RowKey(如“亚洲 20200520”),该表下有 1 个列族 record。record 列族用于统计疫情数据(每个大洲当日新增确诊人数最多的国家 record:maxIncreaseCountry 及其新增确诊人数 record:maxIncreaseCount)。 

create 'exam:covid19_world','record'


4.请在 Hive 中创建数据库 exam,在该数据库中创建外部表 ex_exam_record 指向 
/app/data/exam 下的疫情数据 ;创建外部表 ex_exam_covid19_record 映射至 HBase 中的 
exam:covid19_world 表的 record 列族(共 15 分) 
create database exam;
create external table ex_exam_record(
id string,
confirmedCount int,
confirmedIncr int,
recordDate string,
countryName string,
countryShortCode string,
continent string
)
row format delimited fields terminated by ','
location '/app/data/exam';

create external table ex_exam_covid19_record(
key string,
maxIncreaseCountry string,
maxIncreaseCount int
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
With serdeproperties ("hbase.columns.mapping"=":key,
record:maxIncreaseCountry,record:maxIncreaseCount")
tblproperties("hbase.table.name"="exam:covid19_world");

5. 使用 ex_exam_record 表中的数据(共 25 分) 
①统计每个大洲中每日新增确诊人数最多的国家,将 continent 和 recordDate 合并成 
rowkey,并保存到 ex_exam_covid19_record 表中。(20 分) 
insert into table ex_exam_covid19_record 
select rowkey,countryName,confirmedIncr from (
select concat(continent,recordDate) as rowkey,
rank()over(partition by continent,recordDate order by 	confirmedIncr desc) as rank,countryName,confirmedIncr
from ex_exam_record)a 
where rank=1;


②完成统计后,在 HBase Shell 中遍历 exam:covid19_world 表中的前 20 条数据。(5 分)
scan 'exam:covid19_world',{LIMIT=>20}





































标签:map,确诊,exam,arr,record,._,qwererrwedbnbb
来源: https://blog.csdn.net/Of_the_new/article/details/109956114