|NO.Z.00045| Data | Hadoop & PB-Level Data Warehouse V03

[BigDataHadoop: Hadoop & PB-Level Data Warehouse V03]  [BigDataHadoop: PB-Level Enterprise E-commerce Offline Data Warehouse | Chapter 1 | Hadoop | Core Trade Analysis: Full Data Import | Data]


1. Full Data Import
### --- Full data import

~~~     MySQL => HDFS => Hive
~~~     Load the full data set every day and write it into a new partition (the ODS table DDLs are given below); a sketch of the overall pattern follows.
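~~~     Every table below follows the same three steps: create the dated HDFS directory, run the DataX job with the date passed in through -p, and register the new partition in Hive. A minimal sketch of the pattern (table_name is a placeholder; the concrete commands for each table are given in the sections below):

do_date='2020-07-01'
# 1. create the HDFS partition directory for the target table
hdfs dfs -mkdir -p /user/data/trade.db/table_name/dt=$do_date
# 2. run the DataX job, passing the date in as a variable
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /data/yanqidw/json/table_name.json
# 3. register the new partition on the Hive ODS table
hive -e "alter table ods.ods_trade_table_name add partition(dt='$do_date')"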
2. Full Data Import: Product Category Table
### --- DataX json config for the product category table
~~~     A small table does not need more than one channel; using multiple channels only produces many small files.
~~~     MySQLReader ===> HdfsWriter: ebiz.yanqi_product_category ===> ods.ods_trade_product_category

[root@hadoop02 ~]# vim /data/yanqidw/json/product_category.json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "12345678",
            "column": [
              "catId",
              "parentId",
              "catName",
              "isShow",
              "sortNum",
              "isDel",
              "createTime",
              "level"
            ],
            "connection": [
              {
                "table": [
                  "yanqi_product_category"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://hadoop05:3306/ebiz"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://hadoop01:9000",
            "fileType": "text",
            "path": "/user/data/trade.db/product_category/dt=$do_date",
            "fileName": "product_category_$do_date",
            "column": [
              {
                "name": "catId",
                "type": "INT"
              },
              {
                "name": "parentId",
                "type": "INT"
              },
              {
                "name": "catName",
                "type": "STRING"
              },
              {
                "name": "isShow",
                "type": "TINYINT"
              },
              {
                "name": "sortNum",
                "type": "INT"
              },
              {
                "name": "isDel",
                "type": "TINYINT"
              },
              {
                "name": "createTime",
                "type": "STRING"
              },
              {
                "name": "level",
                "type": "TINYINT"
              }
            ],
            "writeMode": "append",
            "fieldDelimiter": ","
          }
        }
      }
    ]
  }
}
### --- Import the data from MySQL into HDFS
~~~     Before running the command, create the corresponding directory on HDFS: /user/data/trade.db/product_category/dt=yyyy-mm-dd

~~~     # Define a date variable
[root@hadoop02 ~]# do_date='2020-07-01'
 
~~~     # Create the directory
[root@hadoop02 ~]# hdfs dfs -mkdir -p \
/user/data/trade.db/product_category/dt=$do_date
~~~     # Run the data migration

[root@hadoop02 ~]# python $DATAX_HOME/bin/datax.py \
-p "-Ddo_date=$do_date" \
/data/yanqidw/json/product_category.json
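~~~     # Optional check: confirm that DataX actually wrote files into the target directory (same path as above) before loading it into Hive
[root@hadoop02 ~]# hdfs dfs -ls /user/data/trade.db/product_category/dt=$do_date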
### --- Load the data from HDFS into the Hive ODS layer

~~~     # Create the table in Hive
DROP TABLE IF EXISTS `ods.ods_trade_product_category`;

CREATE EXTERNAL TABLE `ods.ods_trade_product_category`(
`catid` int,
`parentid` int,
`catname` string,
`isshow` tinyint,
`sortnum` int,
`isdel` tinyint,
`createtime` string,
`level` tinyint)
COMMENT 'product category table'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/product_category';
~~~     # Load the data

[root@hadoop02 ~]#  hive -e "alter table ods.ods_trade_product_category \
add partition(dt='$do_date')"
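~~~     # Alternative: since the directories already follow Hive's dt=... partition naming, Hive can discover the partitions itself instead of adding them one by one (a sketch; msck scans the whole table location)
[root@hadoop02 ~]# hive -e "msck repair table ods.ods_trade_product_category"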
~~~     # Check whether the data has been loaded

hive (default)> show partitions ods.ods_trade_product_category;
partition
dt=2020-07-01

hive (default)> select count(*) from ods.ods_trade_product_category  where dt='2020-07-01' limit 5;
571                    
3. Full Data Import: Merchant Shop Table
### --- Merchant shop table
yanqi_shops ====> ods.ods_trade_shops

[root@hadoop02 ~]# vim /data/yanqidw/json/shops.json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "12345678",
            "column": [
              "shopId",
              "userId",
              "areaId",
              "shopName",
              "shopLevel",
              "status",
              "createTime",
              "modifyTime"
            ],
            "connection": [
              {
                "table": [
                  "yanqi_shops"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://hadoop05:3306/ebiz"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://hadoop01:9000",
            "fileType": "text",
            "path": "/user/data/trade.db/shops/dt=$do_date",
            "fileName": "shops_$do_date",
            "column": [
              {
                "name": "shopId",
                "type": "INT"
              },
              {
                "name": "userId",
                "type": "INT"
              },
              {
                "name": "areaId",
                "type": "INT"
              },
              {
                "name": "shopName",
                "type": "STRING"
              },
              {
                "name": "shopLevel",
                "type": "TINYINT"
              },
              {
                "name": "status",
                "type": "TINYINT"
              },
              {
                "name": "createTime",
                "type": "STRING"
              },
              {
                "name": "modifyTime",
                "type": "STRING"
              }
            ],
            "writeMode": "append",
            "fieldDelimiter": ","
          }
        }
      }
    ]
  }
}
### --- Import the data from MySQL into HDFS
[root@hadoop02 ~]# do_date='2020-07-02'
 
~~~     # Create the directory
[root@hadoop02 ~]# hdfs dfs -mkdir -p /user/data/trade.db/shops/dt=$do_date
~~~     # Run the data migration

[root@hadoop02 ~]# python $DATAX_HOME/bin/datax.py \
-p "-Ddo_date=$do_date" \
/data/yanqidw/json/shops.json
### --- Load the data from HDFS into the Hive ODS layer
~~~     # Create the table in Hive

DROP TABLE IF EXISTS `ods.ods_trade_shops`;

CREATE EXTERNAL TABLE `ods.ods_trade_shops`(
`shopid` int,
`userid` int,
`areaid` int,
`shopname` string,
`shoplevel` tinyint,
`status` tinyint,
`createtime` string,
`modifytime` string)
COMMENT 'merchant shop table'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/shops';
~~~     # Load the data

[root@hadoop02 ~]# hive -e "alter table ods.ods_trade_shops \
add partition(dt='$do_date')"
~~~     # Check whether the data has been loaded

hive (default)> show partitions ods.ods_trade_shops;
partition
dt=2020-07-02

hive (default)> select count(*) from ods.ods_trade_shops  where dt='2020-07-02' limit 5;
5266                   
4. Full Data Import: Merchant Regional Organization Table
### --- Merchant regional organization table
yanqi_shop_admin_org ====> ods.ods_trade_shop_admin_org

[root@hadoop02 ~]# vim /data/yanqidw/json/shop_org.json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "12345678",
            "column": [
              "id",
              "parentId",
              "orgName",
              "orgLevel",
              "isDelete",
              "createTime",
              "updateTime",
              "isShow",
              "orgType"
            ],
            "connection": [
              {
                "table": [
                  "yanqi_shop_admin_org"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://hadoop05:3306/ebiz"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://hadoop01:9000",
            "fileType": "text",
            "path": "/user/data/trade.db/shop_org/dt=$do_date",
            "fileName": "shop_admin_org_$do_date.dat",
            "column": [
              {
                "name": "id",
                "type": "INT"
              },
              {
                "name": "parentId",
                "type": "INT"
              },
              {
                "name": "orgName",
                "type": "STRING"
              },
              {
                "name": "orgLevel",
                "type": "TINYINT"
              },
              {
                "name": "isDelete",
                "type": "TINYINT"
              },
              {
                "name": "createTime",
                "type": "STRING"
              },
              {
                "name": "updateTime",
                "type": "STRING"
              },
              {
                "name": "isShow",
                "type": "TINYINT"
              },
              {
                "name": "orgType",
                "type": "TINYINT"
              }
            ],
            "writeMode": "append",
            "fieldDelimiter": ","
          }
        }
      }
    ]
  }
}
### --- Import the data from MySQL into HDFS
[root@hadoop02 ~]# do_date='2020-07-01'
 
~~~     # Create the directory
[root@hadoop02 ~]# hdfs dfs -mkdir -p /user/data/trade.db/shop_org/dt=$do_date
~~~     # Run the data migration

[root@hadoop02 ~]# python $DATAX_HOME/bin/datax.py \
-p "-Ddo_date=$do_date" \
/data/yanqidw/json/shop_org.json
### --- Load the data from HDFS into the Hive ODS layer
~~~     # Create the table in Hive

DROP TABLE IF EXISTS `ods.ods_trade_shop_admin_org`;

CREATE EXTERNAL TABLE `ods.ods_trade_shop_admin_org`(
`id` int,
`parentid` int,
`orgname` string,
`orglevel` tinyint,
`isdelete` tinyint,
`createtime` string,
`updatetime` string,
`isshow` tinyint,
`orgtype` tinyint)
COMMENT 'merchant regional organization table'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/shop_org/';
~~~     # Load the data

[root@hadoop02 ~]# hive -e "alter table ods.ods_trade_shop_admin_org \
add partition(dt='$do_date')"
~~~     # Check whether the data has been loaded

hive (default)> show partitions ods.ods_trade_shop_admin_org;
partition
dt=2020-07-01

hive (default)> select count(*) from ods.ods_trade_shop_admin_org  where dt='2020-07-01' limit 5;
332                   
5. Full Data Import: Payment Method Table
### --- Payment method table

yanqi_payments ====> ods.ods_trade_payments
[root@hadoop02 ~]# vim /data/yanqidw/json/payments.json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "12345678",
            "column": [
              "id",
              "payMethod",
              "payName",
              "description",
              "payOrder",
              "online"
            ],
            "connection": [
              {
                "table": [
                  "yanqi_payments"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://hadoop05:3306/ebiz"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://hadoop01:9000",
            "fileType": "text",
            "path": "/user/data/trade.db/payments/dt=$do_date",
            "fileName": "payments_$do_date.dat",
            "column": [
              {
                "name": "id",
                "type": "INT"
              },
              {
                "name": "payMethod",
                "type": "STRING"
              },
              {
                "name": "payName",
                "type": "STRING"
              },
              {
                "name": "description",
                "type": "STRING"
              },
              {
                "name": "payOrder",
                "type": "INT"
              },
              {
                "name": "online",
                "type": "TINYINT"
              }
            ],
            "writeMode": "append",
            "fieldDelimiter": ","
          }
        }
      }
    ]
  }
}
### --- Import the data from MySQL into HDFS
[root@hadoop02 ~]# do_date='2020-07-01'
 
~~~     # Create the directory
[root@hadoop02 ~]# hdfs dfs -mkdir -p /user/data/trade.db/payments/dt=$do_date
~~~     # Run the data migration

[root@hadoop02 ~]# python $DATAX_HOME/bin/datax.py \
-p "-Ddo_date=$do_date" \
/data/yanqidw/json/payments.json
### --- Load the data from HDFS into the Hive ODS layer
~~~     # Create the table in Hive

DROP TABLE IF EXISTS `ods.ods_trade_payments`;

CREATE EXTERNAL TABLE `ods.ods_trade_payments`(
`id` string,
`paymethod` string,
`payname` string,
`description` string,
`payorder` int,
`online` tinyint)
COMMENT 'payment method table'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/payments/';
~~~     # Load the data

[root@hadoop02 ~]# hive -e "alter table ods.ods_trade_payments \
add partition(dt='$do_date')"
~~~     # Check whether the data has been loaded

hive (default)> show partitions ods.ods_trade_payments;
partition
dt=2020-07-01

hive (default)> select count(*) from ods.ods_trade_payments  where dt='2020-07-01' limit 5;
6                 
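~~~     For a daily run, the four full-load jobs above can be wrapped in one script. A minimal sketch, assuming the json configs and ODS table names used in this section (the script name daily_full_import.sh is hypothetical):

#!/bin/bash
# daily_full_import.sh [date] -- hypothetical wrapper around the four full-load jobs above
do_date=${1:-$(date -d "-1 day" +%F)}    # default to yesterday when no date is passed

# json config name => ODS table name, matching the pairs used in this section
declare -A ods_tables=(
  [product_category]=ods.ods_trade_product_category
  [shops]=ods.ods_trade_shops
  [shop_org]=ods.ods_trade_shop_admin_org
  [payments]=ods.ods_trade_payments
)

for tbl in product_category shops shop_org payments; do
  # 1. create the dated HDFS directory
  hdfs dfs -mkdir -p /user/data/trade.db/$tbl/dt=$do_date
  # 2. run the DataX job for this table
  python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /data/yanqidw/json/$tbl.json
  # 3. register the partition in Hive
  hive -e "alter table ${ods_tables[$tbl]} add partition(dt='$do_date')"
done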







