
1. Putting Spark to Work: Analyzing German Loan Data - Top 3 Loan Purposes per Occupation Group


Contents of this article:

Case 1: Analyzing the German Credit Population

1.1 Data Preparation

1.1.1 Preparing the Database Tables

0: jdbc:hive2://hadoop100:10000> create database xinge;

Note: the German credit dataset can be downloaded from the UCI Machine Learning Repository; look for the Statlog (German Credit Data) dataset if you need it.

CREATE TABLE `german_credit`(
	`balance` string,
	`duration` int,
	`history` string,
	`purpose` string,
	`amount` bigint,
	`savings` string,
	`employment` string,
	`instPercent` int,
	`sexMarried` string,
	`guarantors` string,
	`residenceDuration` int,
	`assets` string,
	`age` int,
	`concCredit` string,
	`apartment` string,
	`credits` int,
	`occupation` string,
	`dependents` int,
	`hasPhone` string,
	`foreign` string,
	`con1` int)
row format delimited fields terminated by ' ';
load data local inpath 'datas2/german_credit.txt' into table xinge.german_credit;
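Each record in german_credit.txt is a single space-separated line with 21 fields, which is why the table uses fields terminated by ' '. For illustration, the first record of the raw UCI german.data file looks like this (assuming the file keeps the raw UCI attribute codes):

A11 6 A34 A43 1169 A65 A75 4 A93 A101 4 A121 67 A143 A152 2 A173 1 A192 A201 1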

CREATE TABLE `purpose_info`(
	`purpose` string,
	`purpose_name` string)
row format delimited fields terminated by '\t';
load data local inpath 'datas2/purpose_info.txt' into table xinge.purpose_info;

CREATE TABLE `occupation_info`(
	`occupation` string,
	`occupation_name` string)
row format delimited fields terminated by '\t';
load data local inpath 'datas2/occupation_info.txt' into table xinge.occupation_info;
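The two mapping files are small tab-separated dictionaries from code to display name. A hypothetical excerpt of purpose_info.txt (the codes follow the UCI purpose attribute; the names are illustrative translations):

A40	new car
A41	used car
A42	furniture/equipment
A43	radio/TV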

1.1.2 Table Column Descriptions

Attribute          Type    Meaning
balance            string  current account balance
duration           int     term of the loan applied for
history            string  whether there is a history of bad loans
purpose            string  loan purpose
amount             bigint  amount applied for
savings            string  monthly savings balance
employment         string  employment status
instPercent        int     interest rate percentage
sexMarried         string  sex and marital status
guarantors         string  whether there are guarantors
residenceDuration  int     time lived at the current address
assets             string  net assets
age                int     applicant's age
concCredit         string  concurrent credits
apartment          string  residence status
credits            int     current credits
occupation         string  occupation
dependents         int     number of dependents
hasPhone           string  whether the applicant has a telephone
foreign            string  whether the applicant is a foreign national
con1               int     other

1.1.3 Creating the Tables and Loading the Data from IDEA

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  System.setProperty("HADOOP_USER_NAME", "xqzhao")

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
  val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

  spark.sql("use xinge")

  // Prepare the data
  spark.sql(
    """
      |CREATE TABLE `german_credit`(
      |	`balance` string,
      |	`duration` int,
      |	`history` string,
      |	`purpose` string,
      |	`amount` bigint,
      |	`savings` string,
      |	`employment` string,
      |	`instPercent` int,
      |	`sexMarried` string,
      |	`guarantors` string,
      |	`residenceDuration` int,
      |	`assets` string,
      |	`age` int,
      |	`concCredit` string,
      |	`apartment` string,
      |	`credits` int,
      |	`occupation` string,
      |	`dependents` int,
      |	`hasPhone` string,
      |	`foreign` string,
      |	`con1` int)
      |row format delimited fields terminated by ' '
    """.stripMargin
  )

  spark.sql(
    """
      |load data local inpath 'datas2/german_credit.txt' into table xinge.german_credit
    """.stripMargin
  )

  spark.sql(
    """
      |CREATE TABLE `purpose_info`(
      |	`purpose` string,
      |	`purpose_name` string)
      |row format delimited fields terminated by '\t'
    """.stripMargin
  )

  spark.sql(
    """
      |load data local inpath 'datas2/purpose_info.txt' into table xinge.purpose_info
    """.stripMargin
  )

  spark.sql(
    """
      |CREATE TABLE `occupation_info`(
      |	`occupation` string,
      |	`occupation_name` string)
      |row format delimited fields terminated by '\t'
    """.stripMargin
  )

  spark.sql(
    """
      |load data local inpath 'datas2/occupation_info.txt' into table xinge.occupation_info
    """.stripMargin
  )

  spark.sql("""select * from german_credit""").show

  spark.close()
}
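Note that for spark.sql to reach the Hive metastore on hadoop100 from IDEA, a hive-site.xml must be on the classpath, and the spark-hive module must be on the build path. A minimal sbt sketch (the Spark version here is an assumption):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.0",  // version is an assumption
  "org.apache.spark" %% "spark-sql"  % "3.0.0",
  "org.apache.spark" %% "spark-hive" % "3.0.0"   // required by enableHiveSupport()
)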

Note: show displays at most 20 rows by default and truncates long cell values; passing false, as in show(false), disables the truncation so long content is not cut off.
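For example, to print up to 50 rows with no truncation:

spark.sql("select * from german_credit").show(50, truncate = false)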


1.2 Requirement 1: Top 3 Loan Purposes per Occupation Group

1.2.1 Requirement Overview

For example:

Occupation                                 Borrowers  Loan purpose remark
skilled worker / official                  630        radio/TV 30%, new car 20%, furniture/equipment 20%, others 30%
unemployed/unskilled - permanent resident  200        new car 29%, radio/TV 28%, furniture/equipment 16%, others 27%
manager/self-employed/highly qualified     148        used car 23%, new car 22%, radio/TV 17%, others 38%
unemployed/unskilled - immigrant           22         new car 50%, repairs 9%, business 9%, others 32%

1.2.2 Requirement Analysis

The computation breaks down into three steps:

1. Join german_credit with occupation_info and purpose_info to map the occupation and purpose codes to readable names.
2. Group the joined rows by occupation_name and count the borrowers in each group.
3. Within each group, aggregate the purpose names into a remark string (the top 3 purposes with their percentage shares, plus an "others" bucket); this needs a custom aggregate function.

1.2.3 Implementation

Steps 1 and 2 above map directly onto the following SQL query (the remark from step 3 is added afterwards with a custom UDAF):

select
	occupation_name,
	count(*) as buyCnt
from (
	select
		a.*,
		b.occupation_name,
		c.purpose_name
	from german_credit a
	join occupation_info b on a.occupation = b.occupation
	join purpose_info c on a.purpose = c.purpose
) t1 group by occupation_name order by buyCnt desc;
The complete Spark implementation adds the remark column via a custom aggregate function:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

def main(args: Array[String]): Unit = {
  System.setProperty("HADOOP_USER_NAME", "xqzhao")

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
  val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

  spark.sql("use xinge")

  // Query the base data, joined with the two name-mapping tables
  spark.sql(
    """
      |select
      |	a.*,
      |	b.occupation_name,
      |	c.purpose_name
      |from german_credit a
      |join occupation_info b on a.occupation = b.occupation
      |join purpose_info c on a.purpose = c.purpose
    """.stripMargin).createOrReplaceTempView("t1")

  // Aggregate per occupation and build the purpose remark with a custom UDAF
  spark.udf.register("purposeRemark", functions.udaf(new PurposeRemarkUDAF()))
  spark.sql(
    """
      |select
      |  occupation_name,
      |  count(*) as buyCnt,
      |  purposeRemark(purpose_name) as purpose_remark
      |from t1 group by occupation_name order by buyCnt desc
      """.stripMargin).show(false)

  spark.close()
}

case class Buffer(var total: Long, var purposeMap: mutable.Map[String, Long])
// Custom aggregate function: builds the loan-purpose remark string
// 1. Extend Aggregator and define the type parameters
//    IN : loan purpose name
//    BUF: [total count, Map[(purpose, cnt), (purpose, cnt)]]
//    OUT: remark string
// 2. Override the six required methods
class PurposeRemarkUDAF extends Aggregator[String, Buffer, String] {
  // Initialize the buffer
  override def zero: Buffer = {
    Buffer(0, mutable.Map[String, Long]())
  }

  // Update the buffer with one input row
  override def reduce(buff: Buffer, purpose: String): Buffer = {
    buff.total += 1
    val newCount = buff.purposeMap.getOrElse(purpose, 0L) + 1
    buff.purposeMap.update(purpose, newCount)
    buff
  }

  // Merge two buffers coming from different partitions
  override def merge(buff1: Buffer, buff2: Buffer): Buffer = {
    buff1.total += buff2.total
    val map1 = buff1.purposeMap
    val map2 = buff2.purposeMap

//      buff1.purposeMap = map1.foldLeft(map2) {
//        case (map, (purpose, cnt)) => {
//          val newCount = map.getOrElse(purpose, 0L) + cnt
//          map.update(purpose, newCount)
//          map
//        }
//      }
    // The foldLeft version above is hard to follow, so merge explicitly instead
    map2.foreach {
      case (purpose, cnt) => {
        val newCount = map1.getOrElse(purpose, 0L) + cnt
        map1.update(purpose, newCount)
      }
    }
    buff1.purposeMap = map1
    buff1
  }

  // Render the aggregated statistics as the remark string
  override def finish(buff: Buffer): String = {
    val remarkList = ListBuffer[String]()
    val totalCnt = buff.total
    val purposeMap = buff.purposeMap

    // Sort by count, descending, and keep the top 3 purposes
    val purposeCntList = purposeMap.toList.sortWith(
      (left, right) => {
        left._2 > right._2
      }
    ).take(3)

    val hasMore = purposeMap.size > 3
    var rsum = 0L
    purposeCntList.foreach {
      case (purpose, cnt) => {
        // Integer division truncates, so the shares are floor percentages
        val r = cnt * 100 / totalCnt
        remarkList.append(s"${purpose} ${r}%")
        rsum += r
      }
    }
    if (hasMore) {
      // Everything outside the top 3 (plus truncation loss) goes to "others"
      remarkList.append(s"others ${100 - rsum}%")
    }

    remarkList.mkString(",  ")
  }

  override def bufferEncoder: Encoder[Buffer] = Encoders.product

  override def outputEncoder: Encoder[String] = Encoders.STRING
}
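The UDAF can be sanity-checked locally without Hive. Below is a minimal sketch (the object name and sample purposes are made up for illustration; it reuses the imports from the listing above):

object PurposeRemarkSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("udafTest").getOrCreate()
    import spark.implicits._

    // Four rows, three distinct purposes
    Seq("new car", "new car", "radio/TV", "repairs")
      .toDF("purpose_name")
      .createOrReplaceTempView("t")

    spark.udf.register("purposeRemark", functions.udaf(new PurposeRemarkUDAF()))
    // Only 3 distinct purposes, so no "others" bucket is appended:
    // expect "new car 50%,  radio/TV 25%,  repairs 25%"
    spark.sql("select purposeRemark(purpose_name) as remark from t").show(false)

    spark.close()
  }
}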

Output:

+------------------------------------------+------+------------------------------------------------------------------+
|occupation_name                           |buyCnt|purpose_remark                                                    |
+------------------------------------------+------+------------------------------------------------------------------+
|skilled worker / official                 |630   |radio/TV 30%,  new car 20%,  furniture/equipment 20%,  others 30% |
|unemployed/unskilled - permanent resident |200   |new car 29%,  radio/TV 28%,  furniture/equipment 16%,  others 27% |
|manager/self-employed/highly qualified    |148   |used car 23%,  new car 22%,  radio/TV 17%,  others 38%            |
|unemployed/unskilled - immigrant          |22    |new car 50%,  repairs 9%,  business 9%,  others 32%               |
+------------------------------------------+------+------------------------------------------------------------------+

1.3 Requirement 2: Top 3 Loan Purposes per Occupation Group (II)

1.3.1 Requirement Overview

For example:

Occupation                                 Loan purpose         Loan purpose remark
skilled worker / official                  radio/TV             radio/TV 30%, new car 20%, furniture/equipment 20%, others 30%
skilled worker / official                  new car              new car 29%, radio/TV 28%, furniture/equipment 16%, others 27%
skilled worker / official                  furniture/equipment  used car 23%, new car 22%, radio/TV 17%, others 38%
unemployed/unskilled - permanent resident  new car              new car 50%, repairs 9%, business 9%, others 32%

1.3.2 Requirement Analysis

The analysis follows requirement 1, except that the counting is per (occupation, purpose) pair, and a rank() window function partitioned by occupation then keeps the top 3 purposes within each occupation.

1.3.3 Implementation

This translates into the following nested SQL query:

select 
	*
from (
	select 
		*,
		rank() over( distribute by occupation_name order by buyCnt desc ) as rank
	from (
		select
			occupation_name,
			purpose_name,
			count(*) as buyCnt
		from (
			select
				a.*,
				b.occupation_name,
				c.purpose_name
			from german_credit a
			join occupation_info b on a.occupation = b.occupation
			join purpose_info c on a.purpose = c.purpose
		) t1 group by occupation_name, purpose_name
	) t2
) t3 where rank <= 3;
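A driver in the same style as requirement 1 can wrap this query directly (a sketch, assuming the same xinge database and tables as above):

def main(args: Array[String]): Unit = {
  System.setProperty("HADOOP_USER_NAME", "xqzhao")

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
  val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

  spark.sql("use xinge")

  // Rank purposes within each occupation and keep the top 3
  spark.sql(
    """
      |select * from (
      |  select *,
      |    rank() over( distribute by occupation_name order by buyCnt desc ) as rank
      |  from (
      |    select occupation_name, purpose_name, count(*) as buyCnt
      |    from (
      |      select a.*, b.occupation_name, c.purpose_name
      |      from german_credit a
      |      join occupation_info b on a.occupation = b.occupation
      |      join purpose_info c on a.purpose = c.purpose
      |    ) t1 group by occupation_name, purpose_name
      |  ) t2
      |) t3 where rank <= 3
    """.stripMargin).show(false)

  spark.close()
}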

Disclaimer: these are notes taken while studying; if anything here infringes, please let me know and it will be removed.
Original video: https://www.bilibili.com/video/BV11A411L7CK

Source: https://blog.csdn.net/affluent6/article/details/120296899