kettle实战-巧用合并记录组件完成业务对账
作者:互联网
kettle实战-巧用合并记录组件轻松完成业务对账
应用场景:
对于互联网公司而言,无论是支付还是其他产品的交易操作都难免会涉及到对账的环节,这是保证公司与支付平台或者公司与渠道之间交易对等的关键。kettle作为开源的ETL工具为我们提供了强大的组件支持,在实际的开发中数据处理应用到的地方也是非常的广泛,之前在实际的开发中碰巧遇到了这么一个对账的场景,便借此机会运用kettle的合并记录组件完成了这一功能的开发,相当的方便快捷,极大地降低了开发的工作量。
技术
kettle:pdi-ce-8.3.0.0-371
组件简介-合并记录
1.功能介绍:
该步骤用于将两个不同来源的数据合并,这两个来源的数据分别为旧数据和新数据,该步骤将旧数据和新数据按照指定的关键字匹配、比较、合并。
2.设置参数:
旧数据来源:旧数据来源的步骤
新数据来源:新数据来源的步骤
关键字段:用于定位两个数据源中的同一条记录
比较字段:对于两个数据源中的同一条记录中,指定需要比较的字段
3.运行结果分析:
合并后的数据将包括旧数据来源和新数据来源里的所有数据,对于变化的数据,使用新数据代替旧数据,同时在结果里用一个标示字段,来指定新旧数据的比较结果:
第一列 | 第二列 |
---|---|
identical | 旧数据和新数据一样 |
changed | 数据发生了变化 |
new | 新数据中有而旧数据中没有的记录 |
deleted | 旧数据中有而新数据中没有的记录 |
4.关键点:
旧数据和新数据需要事先按照关键字段排序
旧数据和新数据要有相同的字段名称
业务场景
每天下午五点从SFTP上获取渠道交易数据(交易时间:昨日下午三点至今日下午三点,交易类型:转入、转出、退保等),文件名为 交易类型_当前日期.zip(例如:5_20201124.zip),加压缩后包含:交易类型_当前日期.csv(例如:5_20201124.csv)和交易类型_当前日期.csv.md5(例如:5_20201124.csv.md5)文件,解析时对csv文件+key进行MD5获取签名进行核对,然后进行业务对账。
简要流程
SFTP下载渠道侧对账文件——合并记录——结果汇总(落库)——异常数据报表(告警)
整体流程
流程拆解
第一步:设置变量
设置文件地址,SFTP配置信息,当然常用的配置也可选择配到kettle.properties文件中
第二步:设置系统时间变量
该步骤主要将系统时间格式化为自己需要使用的格式,例如:文件目录(2020/11/24,或者业务开始时间:2020-11-23 15:00:00,供sql查询使用)
第三步:下载SFTP上的文件到本地
第四步:解压缩文件
第五步:MD5验签(验签代码见下方)
第六步:留存渠道交易数据
第七步:核对数据(关键步骤)
该步骤主要利用合并记录组件来进行数据的核对,需要注意:sql查询时字段的顺序、命名、个数保持、格式保持一致,并且按照相同的排序规则进行排序,然后设置关键字段进行对比将对比结果输出,将验证结果进行处理入库
第七步:汇总对账结果
该步骤可以将当前日期对账的结果进行汇总形成报表,或者是直接进行告警(比如企业微信机器人告警等)通知运维进行相关问题的核查及时处理。示例中打印日志的地方往往会被设置为异常告警便于及时发现。
运行结果分析
模拟数据
渠道端交易数据:
我方交易数据:
运行日志:
2020/11/23 23:42:09 - verify - 开始项[检查文件存放目录]
2020/11/23 23:42:09 - verify - 开始项[SFTP 下载]
2020/11/23 23:42:09 - verify - 开始项[解压缩文件]
2020/11/23 23:42:09 - verify - 开始项[验签]
2020/11/23 23:42:09 - 验签 - Using run configuration [Pentaho local]
2020/11/23 23:42:09 - 验签 - Using legacy execution engine
2020/11/23 23:42:09 - Md5 - 为了转换解除补丁开始 [Md5]
2020/11/23 23:42:09 - 获取全局变量.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:09 - Java 代码.0 - >>>>>MD5:3a9715f0fa2081119665d82ccf8c72b7
2020/11/23 23:42:10 - Java 代码.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:10 - 结束文件输出Y/N .0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[判断验签是否正确]
2020/11/23 23:42:10 - verify - 开始项[同步交易数据(入库)]
2020/11/23 23:42:10 - 同步交易数据(入库) - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 同步交易数据(入库) - Using legacy execution engine
2020/11/23 23:42:10 - synchRefundData - 为了转换解除补丁开始 [synchRefundData]
2020/11/23 23:42:10 - 读取csv文件.0 - Header row skipped in file 'D:/kettle/verify/download/2020/11/23/5_20201123.csv'
2020/11/23 23:42:10 - 读取csv文件.0 - 完成处理 (I=3, O=0, R=0, W=2, U=0, E=0)
2020/11/23 23:42:10 - 获取变量.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 插入 / 更新.0 - 完成处理 (I=2, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[核对数据一致性]
2020/11/23 23:42:10 - 核对数据一致性 - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 核对数据一致性 - Using legacy execution engine
2020/11/23 23:42:10 - verify - 为了转换解除补丁开始 [verify]
2020/11/23 23:42:10 - 京东退保数据.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 公司交易数据.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 处理单位-渠道.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 排序-渠道侧数据.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 处理单位-公司.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 排序-公司数据.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 京东退保数据.0 - 完成处理 (I=2, O=0, R=0, W=2, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 1------------------------------
2020/11/23 23:42:10 - 写日志.0 - 不一致记录
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - policyNo = 86000020201600385734
2020/11/23 23:42:10 - 写日志.0 - refundNo = 2005112940872742984
2020/11/23 23:42:10 - 写日志.0 - amount = 101399.0
2020/11/23 23:42:10 - 写日志.0 - refundTime = 2020/11/23 09:21:12.000000000
2020/11/23 23:42:10 - 写日志.0 - flagfield = changed
2020/11/23 23:42:10 - 写日志.0 - flag = 1
2020/11/23 23:42:10 - 写日志.0 - channelCode = dd
2020/11/23 23:42:10 - 写日志.0 - tradeType = 5
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 公司交易数据.0 - 完成处理 (I=3, O=0, R=0, W=3, U=0, E=0)
2020/11/23 23:42:10 - 正向对比.0 - 完成处理 (I=0, O=0, R=5, W=3, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 2------------------------------
2020/11/23 23:42:10 - 写日志.0 - 不一致记录
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - policyNo = 86000020201600385736
2020/11/23 23:42:10 - 写日志.0 - refundNo = 2005112940872742985
2020/11/23 23:42:10 - 写日志.0 - amount = 101399.0
2020/11/23 23:42:10 - 写日志.0 - refundTime = 2020/11/23 09:21:12.000000000
2020/11/23 23:42:10 - 写日志.0 - flagfield = new
2020/11/23 23:42:10 - 写日志.0 - flag = 3
2020/11/23 23:42:10 - 写日志.0 - channelCode = dd
2020/11/23 23:42:10 - 写日志.0 - tradeType = 5
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 字段选择.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 对账状态映射.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 过滤对账一致记录.0 - 完成处理 (I=0, O=0, R=3, W=2, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 插入对账明细.0 - 完成处理 (I=2, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[汇总对账结果]
2020/11/23 23:42:10 - 汇总对账结果 - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 汇总对账结果 - Using legacy execution engine
2020/11/23 23:42:10 - gatherVerifyData - 为了转换解除补丁开始 [gatherVerifyData]
2020/11/23 23:42:10 - 表输入.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 1------------------------------
2020/11/23 23:42:10 - 写日志.0 - 汇总对账信息
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - result = 不一致的交易数量为:1条
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 2------------------------------
2020/11/23 23:42:10 - 写日志.0 - 汇总对账信息
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - result = 保险公司多出的交易数量为:1条
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ====================
代码解析
MD5验签代码片
.
import com.aliyun.openservices.shade.org.apache.commons.codec.digest.DigestUtils;
import com.aliyun.openservices.shade.org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.compress.utils.IOUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Calendar;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException{
//获取到上一个步骤的输入行
Object[] r = getRow();
if (r == null) {
setOutputDone();
return false;
}
r = createOutputRow(r, data.outputRowMeta.size());
//读取出參数变量值
String key = getVariable("key", "");
String fileDir = getVariable("fileDir", "");
String todayF = get(Fields.In, "todayF").getString(r);
String today = get(Fields.In, "today").getString(r);
String tradeType = getVariable("tradeType", "");
//获取MD5摘要
String Md5Dir = fileDir + "/" +todayF + "/" + tradeType +"_"+ today + ".csv.md5";
//拼接文件地址
String csvFileDir = fileDir + "/" + todayF + "/"+ tradeType +"_"+ today + ".csv";
File file = new File(csvFileDir);
FileInputStream fis = null;
File channelfile = new File(Md5Dir);
FileInputStream channelfis = null;
String md5code = null;
String channelmd5code = null;
boolean flag = true;
try {
fis = new FileInputStream(file);
byte[] csvFileNameBytes = IOUtils.toByteArray(fis);
md5code = DigestUtils.md5Hex(ArrayUtils.addAll(key.getBytes(), csvFileNameBytes));
//logBasic(">>>>>MD5:"+md5code);
//读取渠道验签
channelfis = new FileInputStream(channelfile);
byte[]bytes=new byte[1024];
int len =-1;
while ((len=channelfis.read(bytes))!=-1){
channelmd5code=new String(bytes);
}
if(channelmd5code!=null){
channelmd5code=channelmd5code.trim();
}
if(!md5code.equals(channelmd5code)){
flag=false;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
channelfis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//把计算好的值放入到输出记录中
get(Fields.Out, "flag").setValue(r, flag);
//输出到下一个节点做处理
putRow(data.outputRowMeta, r);
return true;
}
demo
标签:11,10,23,kettle,42,对账,2020,日志,巧用 来源: https://blog.csdn.net/weixin_45598170/article/details/109952983