|NO.Z.00046|——————————|^^ Data ^^|——|Hadoop & PB-Scale Data Warehouse.V04|---------------------------------------|PB Warehouse.v0

[BigDataHadoop: Hadoop & PB-Scale Data Warehouse.V04]                                      [BigDataHadoop. PB-Scale E-commerce Offline Data Warehouse][|Chapter 1|Hadoop|Core Transaction Analysis: Incremental Data Import|——Data|]

1. Incremental Data Import
### --- Overview of incremental data import

~~~     # Three incremental tables:
~~~     Orders table: yanqi_trade_orders
~~~     Order product table: yanqi_order_product
~~~     Product info table: yanqi_product_info
~~~     The initial data load is executed only once; the earlier full load can serve as that initial load.
~~~     After that, incremental data is loaded every day, with each day's data forming its own partition (see the driver-script sketch after this list).
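In production this daily routine is usually wrapped in a small driver script. The sketch below is hypothetical (the script name is invented; it assumes $DATAX_HOME is set and that the three DataX job files live under /data/yanqidw/json, matching the paths used later in this article). It defaults do_date to yesterday so a scheduler such as cron can run it unattended:

#!/bin/bash
# ods_incremental_load.sh (hypothetical name): daily incremental load
# driver for the three tables above.
# Usage: ods_incremental_load.sh [yyyy-mm-dd]; defaults to yesterday.
do_date=${1:-$(date -d "-1 day" +%F)}

for tbl in orders order_product product_info; do
    # each day's data lands in its own dt= partition directory on HDFS
    hdfs dfs -mkdir -p /user/data/trade.db/${tbl}/dt=${do_date}
    # DataX fills the $do_date placeholders in the job JSON via -Ddo_date
    python $DATAX_HOME/bin/datax.py -p "-Ddo_date=${do_date}" \
        /data/yanqidw/json/${tbl}.json
done
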
2. Incremental Import: Orders Table
### --- Orders table

yanqi_trade_orders ====> ods.ods_trade_orders
~~~     # Date/time conversion in MySQL: check how many orders fall on each day

select date_format(createTime, '%Y-%m-%d'), count(*)
from yanqi_trade_orders
group by date_format(createTime, '%Y-%m-%d');
~~~     # Filter condition: use the time field modifiedTime, which also catches orders updated (not just created) on the target day

[root@hadoop02 ~]# vim /data/yanqidw/json/orders.json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "12345678",
            "connection": [
              {
                "querySql": [
                  "select orderId, orderNo, userId, status, productMoney, totalMoney, payMethod, isPay, areaId, tradeSrc, tradeType, isRefund, dataFlag, createTime, payTime, modifiedTime from yanqi_trade_orders where date_format(modifiedTime, '%Y-%m-%d')='$do_date'"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://hadoop05:3306/ebiz"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://hadoop01:9000",
            "fileType": "text",
            "path": "/user/data/trade.db/orders/dt=$do_date",
            "fileName": "orders_$do_date",
            "column": [
              {
                "name": "orderId",
                "type": "INT"
              },
              {
                "name": "orderNo",
                "type": "STRING"
              },
              {
                "name": "userId",
                "type": "BIGINT"
              },
              {
                "name": "status",
                "type": "TINYINT"
              },
              {
                "name": "productMoney",
                "type": "FLOAT"
              },
              {
                "name": "totalMoney",
                "type": "FLOAT"
              },
              {
                "name": "payMethod",
                "type": "TINYINT"
              },
              {
                "name": "isPay",
                "type": "TINYINT"
              },
              {
                "name": "areaId",
                "type": "INT"
              },
              {
                "name": "tradeSrc",
                "type": "TINYINT"
              },
              {
                "name": "tradeType",
                "type": "INT"
              },
              {
                "name": "isRefund",
                "type": "TINYINT"
              },
              {
                "name": "dataFlag",
                "type": "TINYINT"
              },
              {
                "name": "createTime",
                "type": "STRING"
              },
              {
                "name": "payTime",
                "type": "STRING"
              },
              {
                "name": "modifiedTime",
                "type": "STRING"
              }
            ],
            "writeMode": "append",
            "fieldDelimiter": ","
          }
        }
      }
    ]
  }
}
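Note that the writer's writeMode is append: re-running the job for the same do_date writes a second copy of the data into the partition directory. A common safeguard (not part of the original steps) is to clear the target directory before a re-run:

[root@hadoop02 ~]# hdfs dfs -rm -r -f /user/data/trade.db/orders/dt=$do_date
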
### --- Import data from MySQL into HDFS

~~~     # Define the date variable
[root@hadoop02 ~]# do_date='2020-07-12'
~~~     # Create the target directory on HDFS
[root@hadoop02 ~]# hdfs dfs -mkdir -p /user/data/trade.db/orders/dt=$do_date
~~~     # Run the DataX transfer (-p "-Ddo_date=..." fills the $do_date placeholders in the job JSON)

[root@hadoop02 ~]# python $DATAX_HOME/bin/datax.py \
-p "-Ddo_date=$do_date" \
/data/yanqidw/json/orders.json
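Before loading into Hive, it is worth confirming that the export actually landed on HDFS (a routine check, not part of the original steps):

[root@hadoop02 ~]# hdfs dfs -ls /user/data/trade.db/orders/dt=$do_date
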
### --- Load data from HDFS into the Hive ODS layer
~~~     # Create the table in Hive

DROP TABLE IF EXISTS `ods.ods_trade_orders`;

CREATE EXTERNAL TABLE `ods.ods_trade_orders`(
`orderid` int,
`orderno` string,
`userid` bigint,
`status` tinyint,
`productmoney` decimal(10,2),
`totalmoney` decimal(10,2),
`paymethod` tinyint,
`ispay` tinyint,
`areaid` int,
`tradesrc` tinyint,
`tradetype` int,
`isrefund` tinyint,
`dataflag` tinyint,
`createtime` string,
`paytime` string,
`modifiedtime` string)
COMMENT 'Orders table'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/orders/';
~~~     # Load the data by registering the partition

[root@hadoop02 ~]# hive -e "alter table ods.ods_trade_orders \
add partition(dt='$do_date')"
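
Because ods_trade_orders is an external partitioned table, files under a dt= directory stay invisible to queries until the partition is registered. As an alternative to adding partitions one at a time, Hive's msck repair table discovers every partition directory under the table's location in one pass:

[root@hadoop02 ~]# hive -e "msck repair table ods.ods_trade_orders"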
~~~     # Verify that the data was loaded

hive (default)> show partitions ods.ods_trade_orders;
partition
dt=2020-07-12

hive (default)> select count(*) from ods.ods_trade_orders where dt='2020-07-12';
354                
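
As a quick reconciliation against the source (a sketch: it assumes the mysql client is installed on this host and reuses the credentials from the DataX job), the same count taken on the MySQL side should also return 354:

[root@hadoop02 ~]# mysql -h hadoop05 -uroot -p12345678 ebiz \
-e "select count(*) from yanqi_trade_orders \
where date_format(modifiedTime, '%Y-%m-%d')='$do_date';"
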
3. Incremental Import: Order Detail Table
### --- Order detail table

yanqi_order_product ====> ods.ods_trade_order_product
[root@hadoop02 ~]# vim /data/yanqidw/json/order_product.json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "12345678",
            "connection": [
              {
                "querySql": [
                  "select id, orderId, productId, productNum, productPrice, money, extra, createTime from yanqi_order_product where date_format(createTime, '%Y-%m-%d') = '$do_date' "
                ],
                "jdbcUrl": [
                  "jdbc:mysql://hadoop05:3306/ebiz"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://hadoop01:9000",
            "fileType": "text",
            "path": "/user/data/trade.db/order_product/dt=$do_date",
            "fileName": "order_product_$do_date.dat",
            "column": [
              {
                "name": "id",
                "type": "INT"
              },
              {
                "name": "orderId",
                "type": "INT"
              },
              {
                "name": "productId",
                "type": "INT"
              },
              {
                "name": "productNum",
                "type": "INT"
              },
              {
                "name": "productPrice",
                "type": "FLOAT"
              },
              {
                "name": "money",
                "type": "FLOAT"
              },
              {
                "name": "extra",
                "type": "STRING"
              },
              {
                "name": "createTime",
                "type": "STRING"
              }
            ],
            "writeMode": "append",
            "fieldDelimiter": ","
          }
        }
      }
    ]
  }
}
### --- Import data from MySQL into HDFS

~~~     # Define the date variable
[root@hadoop02 ~]# do_date='2020-07-12'
~~~     # Create the target directory on HDFS
[root@hadoop02 ~]# hdfs dfs -mkdir -p \
/user/data/trade.db/order_product/dt=$do_date
~~~     # Run the DataX transfer

[root@hadoop02 ~]# python $DATAX_HOME/bin/datax.py \
-p "-Ddo_date=$do_date" \
/data/yanqidw/json/order_product.json
### --- Load data from HDFS into the Hive ODS layer
~~~     # Create the table in Hive

DROP TABLE IF EXISTS `ods.ods_trade_order_product`;

CREATE EXTERNAL TABLE `ods.ods_trade_order_product`(
`id` string,
`orderid` bigint,
`productid` string,
`productnum` string,
`productprice` string,
`money` string,
`extra` string,
`createtime` string)
COMMENT 'Order detail table'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/order_product/';
~~~     # Load the data by registering the partition

[root@hadoop02 ~]# hive -e "alter table ods.ods_trade_order_product \
add partition(dt='$do_date')"
~~~     # Verify that the data was loaded

hive (default)> show partitions ods.ods_trade_order_product;
partition
dt=2020-07-12

hive (default)> select count(*) from ods.ods_trade_order_product where dt='2020-07-12';
1259               
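
With both orders and order details in ODS, a cross-table sanity check becomes possible. The query below is a sketch; it assumes that an order's productMoney is the sum of its line-item money amounts, which the article does not state explicitly:

select o.orderid, o.productmoney, sum(p.money) as detail_money
from ods.ods_trade_orders o
join ods.ods_trade_order_product p on o.orderid = p.orderid
where o.dt = '2020-07-12' and p.dt = '2020-07-12'
group by o.orderid, o.productmoney
limit 5;
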
4. Incremental Import: Product Info Table
### --- Product info table
yanqi_product_info ====> ods.ods_trade_product_info

[root@hadoop02 ~]# vim /data/yanqidw/json/product_info.json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "12345678",
            "connection": [
              {
                "querySql": [
                  "select productid, productname, shopid, price, issale, status, categoryid, createtime, modifytime from yanqi_product_info where date_format(modifyTime, '%Y-%m-%d') = '$do_date' "
                ],
                "jdbcUrl": [
                  "jdbc:mysql://hadoop05:3306/ebiz"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://hadoop01:9000",
            "fileType": "text",
            "path": "/user/data/trade.db/product_info/dt=$do_date",
            "fileName": "product_info_$do_date.dat",
            "column": [
              {
                "name": "productid",
                "type": "BIGINT"
              },
              {
                "name": "productname",
                "type": "STRING"
              },
              {
                "name": "shopid",
                "type": "STRING"
              },
              {
                "name": "price",
                "type": "FLOAT"
              },
              {
                "name": "issale",
                "type": "TINYINT"
              },
              {
                "name": "status",
                "type": "TINYINT"
              },
              {
                "name": "categoryid",
                "type": "STRING"
              },
              {
                "name": "createTime",
                "type": "STRING"
              },
              {
                "name": "modifytime",
                "type": "STRING"
              }
            ],
            "writeMode": "append",
            "fieldDelimiter": ","
          }
        }
      }
    ]
  }
}
### --- Import data from MySQL into HDFS

~~~     # Define the date variable
[root@hadoop02 ~]# do_date='2020-07-12'
~~~     # Create the target directory on HDFS
[root@hadoop02 ~]# hdfs dfs -mkdir -p \
/user/data/trade.db/product_info/dt=$do_date
~~~     # Run the DataX transfer

[root@hadoop02 ~]# python $DATAX_HOME/bin/datax.py \
-p "-Ddo_date=$do_date" \
/data/yanqidw/json/product_info.json
### --- Load data from HDFS into the Hive ODS layer
~~~     # Create the table in Hive

DROP TABLE IF EXISTS `ods.ods_trade_product_info`;

CREATE EXTERNAL TABLE `ods.ods_trade_product_info`(
`productid` bigint,
`productname` string,
`shopid` string,
`price` decimal(10,2),
`issale` tinyint,
`status` tinyint,
`categoryid` string,
`createtime` string,
`modifytime` string)
COMMENT 'Product info table'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/product_info/';
~~~     # Load the data by registering the partition

[root@hadoop02 ~]# hive -e "alter table ods.ods_trade_product_info \
add partition(dt='$do_date')"
~~~     # Verify that the data was loaded

hive (default)> show partitions ods.ods_trade_product_info;
partition
dt=2020-07-12

hive (default)> select count(*) from ods.ods_trade_product_info where dt='2020-07-12';
15807              
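
One consequence of filtering on modifyTime is that a product modified on several days appears in several dt partitions. Downstream layers (typically a DIM layer, outside the scope of this article) usually keep only the latest record per product; a minimal sketch:

select productid, productname, price, modifytime
from (select productid, productname, price, modifytime,
             row_number() over (partition by productid
                                order by modifytime desc) as rn
      from ods.ods_trade_product_info) t
where rn = 1;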








===============================END===============================

