[BigDataHadoop: Hadoop & PB-Scale Data Warehouse.V04] [BigDataHadoop. PB-Scale Enterprise E-commerce Offline Data Warehouse][|Chapter 1|Hadoop|Core Trade Analysis: Incremental Data Import|Data|]
1. Incremental Data Import
### --- Overview of incremental data import
~~~ # Three incremental tables:
~~~ Orders table: yanqi_trade_orders
~~~ Order-product table: yanqi_order_product
~~~ Product info table: yanqi_product_info
~~~ The initial data load runs only once; the earlier full load can serve as that initial load.
~~~ After that, incremental data is loaded every day, and each day's data forms its own partition (a minimal daily-flow sketch follows this list).
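To make the per-day flow concrete before walking through each table, here is a minimal sketch of a daily wrapper script. The script name, the TABLE argument, and the "default to yesterday" logic are assumptions for illustration; the three commands inside are the same ones this chapter runs by hand.
#!/bin/bash
# daily_incremental.sh -- hypothetical wrapper for one table's daily load
# Usage: ./daily_incremental.sh orders 2020-07-12
TABLE=$1                                  # orders / order_product / product_info
do_date=${2:-$(date -d '-1 day' +%F)}     # assumed default: load yesterday's data

# 1. Create the day's partition directory on HDFS
hdfs dfs -mkdir -p /user/data/trade.db/$TABLE/dt=$do_date

# 2. Pull the day's rows from MySQL with DataX; -p substitutes $do_date into the job JSON
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /data/yanqidw/json/$TABLE.json

# 3. Register the new partition with the Hive ODS table
hive -e "alter table ods.ods_trade_$TABLE add partition(dt='$do_date')"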
2. Incremental Import: Orders Table
### --- Orders table
yanqi_trade_orders ====> ods.ods_trade_orders
~~~ # Date/time formatting in MySQL: count how many rows fall on each day
select date_format(createTime, '%Y-%m-%d'), count(*)
from yanqi_trade_orders
group by date_format(createTime, '%Y-%m-%d');
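To run that check non-interactively, the one-liner below is a small sketch; it reuses the host, database, and credentials that appear in the DataX job JSON, which is an assumption about where it is run from.
# hypothetical one-liner: per-day row counts for yanqi_trade_orders
mysql -hhadoop05 -uroot -p12345678 ebiz -e \
    "select date_format(createTime, '%Y-%m-%d') as dt, count(*) from yanqi_trade_orders group by dt"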
~~~ # Choice of incremental condition: filter on the modifiedTime field, so updated rows are picked up as well as new ones
[root@hadoop02 ~]# vim /data/yanqidw/json/orders.json
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "12345678",
"connection": [
{
"querySql": [
"select orderId, orderNo, userId, status, productMoney, totalMoney, payMethod, isPay, areaId, tradeSrc, tradeType, isRefund, dataFlag, createTime, payTime, modifiedTime from yanqi_trade_orders where date_format(modifiedTime, '%Y-%m-%d')='$do_date'"
],
"jdbcUrl": [
"jdbc:mysql://hadoop05:3306/ebiz"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop01:9000",
"fileType": "text",
"path": "/user/data/trade.db/orders/dt=$do_date",
"fileName": "orders_$do_date",
"column": [
{
"name": "orderId",
"type": "INT"
},
{
"name": "orderNo",
"type": "STRING"
},
{
"name": "userId",
"type": "BIGINT"
},
{
"name": "status",
"type": "TINYINT"
},
{
"name": "productMoney",
"type": "Float"
},
{
"name": "totalMoney",
"type": "Float"
},
{
"name": "payMethod",
"type": "TINYINT"
},
{
"name": "isPay",
"type": "TINYINT"
},
{
"name": "areaId",
"type": "INT"
},
{
"name": "tradeSrc",
"type": "TINYINT"
},
{
"name": "tradeType",
"type": "INT"
},
{
"name": "isRefund",
"type": "TINYINT"
},
{
"name": "dataFlag",
"type": "TINYINT"
},
{
"name": "createTime",
"type": "STRING"
},
{
"name": "payTime",
"type": "STRING"
},
{
"name": "modifiedTime",
"type": "STRING"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
]
}
}
### --- Import data from MySQL into HDFS
~~~ # Define the date variable
[root@hadoop02 ~]# do_date='2020-07-12'
~~~ # Create the target directory
[root@hadoop02 ~]# hdfs dfs -mkdir -p /user/data/trade.db/orders/dt=$do_date
~~~ # Run the data transfer
[root@hadoop02 ~]# python $DATAX_HOME/bin/datax.py \
-p "-Ddo_date=$do_date" \
/data/yanqidw/json/orders.json
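Because the load date reaches the job only through DataX's -p "-Ddo_date=..." substitution, replaying a missed range of days is just a loop; the backfill below is a hypothetical sketch, not part of the original run.
# hypothetical backfill: re-run the orders job for a range of dates
for do_date in 2020-07-10 2020-07-11 2020-07-12; do
    hdfs dfs -mkdir -p /user/data/trade.db/orders/dt=$do_date
    python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /data/yanqidw/json/orders.json
done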
### --- Load the data from HDFS into the Hive ODS layer
~~~ # Create the table in Hive
DROP TABLE IF EXISTS `ods.ods_trade_orders`;
CREATE EXTERNAL TABLE `ods.ods_trade_orders`(
`orderid` int,
`orderno` string,
`userid` bigint,
`status` tinyint,
`productmoney` decimal(10,2),
`totalmoney` decimal(10,2),
`paymethod` tinyint,
`ispay` tinyint,
`areaid` int,
`tradesrc` tinyint,
`tradetype` int,
`isrefund` tinyint,
`dataflag` tinyint,
`createtime` string,
`paytime` string,
`modifiedtime` string)
COMMENT 'orders table'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/orders/';
~~~ # Load the data (register the day's partition)
[root@hadoop02 ~]# hive -e "alter table ods.ods_trade_orders \
add partition(dt='$do_date')"
~~~ # Verify that the data was loaded
hive (default)> show partitions ods.ods_trade_orders;
partition
dt=2020-07-12
hive (default)> select count(*) from ods.ods_trade_orders where dt='2020-07-12' limit 5;
354
3. Incremental Import: Order Detail Table
### --- Order detail table
yanqi_order_product ====> ods.ods_trade_order_product
[root@hadoop02 ~]# vim /data/yanqidw/json/order_product.json
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "12345678",
"connection": [
{
"querySql": [
"select id, orderId, productId, productNum, productPrice, money, extra, createTime from yanqi_order_product where date_format(createTime, '%Y-%m-%d') = '$do_date' "
],
"jdbcUrl": [
"jdbc:mysql://hadoop05:3306/ebiz"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop01:9000",
"fileType": "text",
"path": "/user/data/trade.db/order_product/dt=$do_date",
"fileName": "order_product_$do_date.dat",
"column": [
{
"name": "id",
"type": "INT"
},
{
"name": "orderId",
"type": "INT"
},
{
"name": "productId",
"type": "INT"
},
{
"name": "productNum",
"type": "INT"
},
{
"name": "productPrice",
"type": "Float"
},
{
"name": "money",
"type": "Float"
},
{
"name": "extra",
"type": "STRING"
},
{
"name": "createTime",
"type": "STRING"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
]
}
}
### --- Import data from MySQL into HDFS
~~~ # Define the date variable
[root@hadoop02 ~]# do_date='2020-07-12'
~~~ # Create the target directory
[root@hadoop02 ~]# hdfs dfs -mkdir -p \
/user/data/trade.db/order_product/dt=$do_date
~~~ # Run the data transfer
[root@hadoop02 ~]# python $DATAX_HOME/bin/datax.py \
-p "-Ddo_date=$do_date" \
/data/yanqidw/json/order_product.json
### --- Load the data from HDFS into the Hive ODS layer
~~~ # Create the table in Hive
DROP TABLE IF EXISTS `ods.ods_trade_order_product`;
CREATE EXTERNAL TABLE `ods.ods_trade_order_product`(
`id` int,
`orderid` int,
`productid` int,
`productnum` int,
`productprice` decimal(10,2),
`money` decimal(10,2),
`extra` string,
`createtime` string)
COMMENT 'order detail table'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/order_product/';
~~~ # Load the data (register the day's partition)
[root@hadoop02 ~]# hive -e "alter table ods.ods_trade_order_product \
add partition(dt='$do_date')"
~~~ # Verify that the data was loaded
hive (default)> show partitions ods.ods_trade_order_product;
partition
dt=2020-07-12
hive (default)> select count(*) from ods.ods_trade_order_product where dt='2020-07-12' limit 5;
1259
4. Incremental Import: Product Info Table
### --- Product info table
yanqi_product_info ====> ods.ods_trade_product_info
[root@hadoop02 ~]# vim /data/yanqidw/json/product_info.json
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "12345678",
"connection": [
{
"querySql": [
"select productid, productname, shopid, price, issale, status, categoryid, createtime, modifytime from yanqi_product_info where date_format(modifyTime, '%Y-%m-%d') = '$do_date' "
],
"jdbcUrl": [
"jdbc:mysql://hadoop05:3306/ebiz"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop01:9000",
"fileType": "text",
"path": "/user/data/trade.db/product_info/dt=$do_date",
"fileName": "product_info_$do_date.dat",
"column": [
{
"name": "productid",
"type": "BIGINT"
},
{
"name": "productname",
"type": "STRING"
},
{
"name": "shopid",
"type": "STRING"
},
{
"name": "price",
"type": "FLOAT"
},
{
"name": "issale",
"type": "TINYINT"
},
{
"name": "status",
"type": "TINYINT"
},
{
"name": "categoryid",
"type": "STRING"
},
{
"name": "createTime",
"type": "STRING"
},
{
"name": "modifytime",
"type": "STRING"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
]
}
}
### --- Import data from MySQL into HDFS
~~~ # Define the date variable
[root@hadoop02 ~]# do_date='2020-07-12'
~~~ # Create the target directory
[root@hadoop02 ~]# hdfs dfs -mkdir -p \
/user/data/trade.db/product_info/dt=$do_date
~~~ # Run the data transfer
[root@hadoop02 ~]# python $DATAX_HOME/bin/datax.py \
-p "-Ddo_date=$do_date" \
/data/yanqidw/json/product_info.json
### --- Load the data from HDFS into the Hive ODS layer
~~~ # Create the table in Hive
DROP TABLE IF EXISTS `ods.ods_trade_product_info`;
CREATE EXTERNAL TABLE `ods.ods_trade_product_info`(
`productid` bigint,
`productname` string,
`shopid` string,
`price` decimal(10,2),
`issale` tinyint,
`status` tinyint,
`categoryid` string,
`createtime` string,
`modifytime` string)
COMMENT 'product info table'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/product_info/';
~~~ # Load the data (register the day's partition)
[root@hadoop02 ~]# hive -e "alter table ods.ods_trade_product_info \
add partition(dt='$do_date')"
~~~ # Verify that the data was loaded
hive (default)> show partitions ods.ods_trade_product_info;
partition
dt=2020-07-12
hive (default)> select count(*) from ods.ods_trade_product_info where dt='2020-07-12' limit 5;
15807
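A quick, hedged way to confirm that a day's load is complete is to compare source and target counts; the sketch below uses product_info as the example and assumes the MySQL credentials from the DataX jobs.
do_date='2020-07-12'
# source-side count for the day (MySQL); -N suppresses the column header
mysql -hhadoop05 -uroot -p12345678 ebiz -N -e \
    "select count(*) from yanqi_product_info where date_format(modifyTime, '%Y-%m-%d')='$do_date'"
# target-side count for the same day (Hive ODS partition); the two numbers should match
hive -e "select count(*) from ods.ods_trade_product_info where dt='$do_date'"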
===============================END===============================