其他分享
首页 > 其他分享> > kettle实战-巧用合并记录组件完成业务对账

kettle实战-巧用合并记录组件完成业务对账

作者:互联网

kettle实战-巧用合并记录组件轻松完成业务对账

应用场景:

对于互联网公司而言,无论是支付还是其他产品的交易操作都难免会涉及到对账的环节,这是保证公司与支付平台或者公司与渠道之间交易对等的关键。kettle作为开源的ETL工具为我们提供了强大的组件支持,在实际的开发中数据处理应用到的地方也是非常的广泛,之前在实际的开发中碰巧遇到了这么一个对账的场景,便借此机会运用kettle的合并记录组件完成了这一功能的开发,相当的方便快捷,极大地降低了开发的工作量。

技术

kettle:pdi-ce-8.3.0.0-371

组件简介-合并记录

1.功能介绍
该步骤用于将两个不同来源的数据合并,这两个来源的数据分别为旧数据和新数据,该步骤将旧数据和新数据按照指定的关键字匹配、比较、合并。
2.设置参数
旧数据来源:旧数据来源的步骤
新数据来源:新数据来源的步骤
关键字段:用于定位两个数据源中的同一条记录
比较字段:对于两个数据源中的同一条记录中,指定需要比较的字段
3.运行结果分析
合并后的数据将包括旧数据来源和新数据来源里的所有数据,对于变化的数据,使用新数据代替旧数据,同时在结果里用一个标示字段,来指定新旧数据的比较结果:

第一列第二列
identical旧数据和新数据一样
changed数据发生了变化
new新数据中有而旧数据中没有的记录
deleted旧数据中有而新数据中没有的记录

4.关键点
旧数据和新数据需要事先按照关键字段排序
旧数据和新数据要有相同的字段名称

业务场景

每天下午五点从SFTP上获取渠道交易数据(交易时间:昨日下午三点至今日下午三点,交易类型:转入、转出、退保等),文件名为 交易类型_当前日期.zip(例如:5_20201124.zip),加压缩后包含:交易类型_当前日期.csv(例如:5_20201124.csv)和交易类型_当前日期.csv.md5(例如:5_20201124.csv.md5)文件,解析时对csv文件+key进行MD5获取签名进行核对,然后进行业务对账。

简要流程

SFTP下载渠道侧对账文件——合并记录——结果汇总(落库)——异常数据报表(告警)

整体流程

在这里插入图片描述

流程拆解

第一步:设置变量
设置文件地址,SFTP配置信息,当然常用的配置也可选择配到kettle.properties文件中
在这里插入图片描述

第二步:设置系统时间变量
该步骤主要将系统时间格式化为自己需要使用的格式,例如:文件目录(2020/11/24,或者业务开始时间:2020-11-23 15:00:00,供sql查询使用)
在这里插入图片描述

第三步:下载SFTP上的文件到本地
在这里插入图片描述

第四步:解压缩文件
在这里插入图片描述

第五步:MD5验签(验签代码见下方)
在这里插入图片描述
在这里插入图片描述
第六步:留存渠道交易数据
在这里插入图片描述

第七步:核对数据(关键步骤)
该步骤主要利用合并记录组件来进行数据的核对,需要注意:sql查询时字段的顺序、命名、个数保持、格式保持一致,并且按照相同的排序规则进行排序,然后设置关键字段进行对比将对比结果输出,将验证结果进行处理入库
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

第七步:汇总对账结果
该步骤可以将当前日期对账的结果进行汇总形成报表,或者是直接进行告警(比如企业微信机器人告警等)通知运维进行相关问题的核查及时处理。示例中打印日志的地方往往会被设置为异常告警便于及时发现。
在这里插入图片描述

运行结果分析

模拟数据

渠道端交易数据:
在这里插入图片描述

我方交易数据:
在这里插入图片描述

运行日志:

2020/11/23 23:42:09 - verify - 开始项[检查文件存放目录]
2020/11/23 23:42:09 - verify - 开始项[SFTP 下载]
2020/11/23 23:42:09 - verify - 开始项[解压缩文件]
2020/11/23 23:42:09 - verify - 开始项[验签]
2020/11/23 23:42:09 - 验签 - Using run configuration [Pentaho local]
2020/11/23 23:42:09 - 验签 - Using legacy execution engine
2020/11/23 23:42:09 - Md5 - 为了转换解除补丁开始  [Md5]
2020/11/23 23:42:09 - 获取全局变量.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:09 - Java 代码.0 - >>>>>MD5:3a9715f0fa2081119665d82ccf8c72b7
2020/11/23 23:42:10 - Java 代码.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:10 - 结束文件输出Y/N .0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[判断验签是否正确]
2020/11/23 23:42:10 - verify - 开始项[同步交易数据(入库)]
2020/11/23 23:42:10 - 同步交易数据(入库) - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 同步交易数据(入库) - Using legacy execution engine
2020/11/23 23:42:10 - synchRefundData - 为了转换解除补丁开始  [synchRefundData]
2020/11/23 23:42:10 - 读取csv文件.0 - Header row skipped in file 'D:/kettle/verify/download/2020/11/23/5_20201123.csv'
2020/11/23 23:42:10 - 读取csv文件.0 - 完成处理 (I=3, O=0, R=0, W=2, U=0, E=0)
2020/11/23 23:42:10 - 获取变量.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 插入 / 更新.0 - 完成处理 (I=2, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[核对数据一致性]
2020/11/23 23:42:10 - 核对数据一致性 - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 核对数据一致性 - Using legacy execution engine
2020/11/23 23:42:10 - verify - 为了转换解除补丁开始  [verify]
2020/11/23 23:42:10 - 京东退保数据.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 公司交易数据.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 处理单位-渠道.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 排序-渠道侧数据.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 处理单位-公司.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 排序-公司数据.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 京东退保数据.0 - 完成处理 (I=2, O=0, R=0, W=2, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 1------------------------------
2020/11/23 23:42:10 - 写日志.0 - 不一致记录
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - policyNo = 86000020201600385734
2020/11/23 23:42:10 - 写日志.0 - refundNo = 2005112940872742984
2020/11/23 23:42:10 - 写日志.0 - amount = 101399.0
2020/11/23 23:42:10 - 写日志.0 - refundTime = 2020/11/23 09:21:12.000000000
2020/11/23 23:42:10 - 写日志.0 - flagfield = changed
2020/11/23 23:42:10 - 写日志.0 - flag = 1
2020/11/23 23:42:10 - 写日志.0 - channelCode = dd
2020/11/23 23:42:10 - 写日志.0 - tradeType = 5
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 公司交易数据.0 - 完成处理 (I=3, O=0, R=0, W=3, U=0, E=0)
2020/11/23 23:42:10 - 正向对比.0 - 完成处理 (I=0, O=0, R=5, W=3, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 2------------------------------
2020/11/23 23:42:10 - 写日志.0 - 不一致记录
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - policyNo = 86000020201600385736
2020/11/23 23:42:10 - 写日志.0 - refundNo = 2005112940872742985
2020/11/23 23:42:10 - 写日志.0 - amount = 101399.0
2020/11/23 23:42:10 - 写日志.0 - refundTime = 2020/11/23 09:21:12.000000000
2020/11/23 23:42:10 - 写日志.0 - flagfield = new
2020/11/23 23:42:10 - 写日志.0 - flag = 3
2020/11/23 23:42:10 - 写日志.0 - channelCode = dd
2020/11/23 23:42:10 - 写日志.0 - tradeType = 5
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 字段选择.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 对账状态映射.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 过滤对账一致记录.0 - 完成处理 (I=0, O=0, R=3, W=2, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 插入对账明细.0 - 完成处理 (I=2, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[汇总对账结果]
2020/11/23 23:42:10 - 汇总对账结果 - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 汇总对账结果 - Using legacy execution engine
2020/11/23 23:42:10 - gatherVerifyData - 为了转换解除补丁开始  [gatherVerifyData]
2020/11/23 23:42:10 - 表输入.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 1------------------------------
2020/11/23 23:42:10 - 写日志.0 - 汇总对账信息
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - result = 不一致的交易数量为:1条
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 2------------------------------
2020/11/23 23:42:10 - 写日志.0 - 汇总对账信息
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - result = 保险公司多出的交易数量为:1条
2020/11/23 23:42:10 - 写日志.0 - 
2020/11/23 23:42:10 - 写日志.0 - ====================

代码解析

MD5验签代码片.

import com.aliyun.openservices.shade.org.apache.commons.codec.digest.DigestUtils;
import com.aliyun.openservices.shade.org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.compress.utils.IOUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Calendar;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException{
        //获取到上一个步骤的输入行
        Object[] r = getRow();
        if (r == null) {
        setOutputDone();
        return false;
        }
        r = createOutputRow(r, data.outputRowMeta.size());
        //读取出參数变量值
        String key = getVariable("key", "");
        String fileDir = getVariable("fileDir", "");
        String todayF = get(Fields.In, "todayF").getString(r);
        String today = get(Fields.In, "today").getString(r);
        String tradeType = getVariable("tradeType", "");
        //获取MD5摘要
        String Md5Dir = fileDir  + "/" +todayF + "/" + tradeType +"_"+ today + ".csv.md5";
        //拼接文件地址
        String csvFileDir = fileDir  + "/" + todayF  + "/"+ tradeType +"_"+ today + ".csv";
        File file = new File(csvFileDir);
        FileInputStream fis = null;
        File channelfile = new File(Md5Dir);
        FileInputStream channelfis = null;
        String md5code = null;
        String channelmd5code = null;
        boolean flag = true;
        try {
        fis = new FileInputStream(file);
        byte[] csvFileNameBytes  = IOUtils.toByteArray(fis);
        md5code = DigestUtils.md5Hex(ArrayUtils.addAll(key.getBytes(), csvFileNameBytes));
        //logBasic(">>>>>MD5:"+md5code);
        //读取渠道验签
        channelfis = new FileInputStream(channelfile);
        byte[]bytes=new byte[1024];
        int len =-1;
        while ((len=channelfis.read(bytes))!=-1){
        channelmd5code=new String(bytes);
        }
        if(channelmd5code!=null){
        channelmd5code=channelmd5code.trim();
        }
        if(!md5code.equals(channelmd5code)){
        flag=false;
        }
        } catch (Exception e) {
        e.printStackTrace();
        } finally {
        try {
        fis.close();
        } catch (IOException e) {
        e.printStackTrace();
        }
        try {
        channelfis.close();
        } catch (IOException e) {
        e.printStackTrace();
        }
        }

        //把计算好的值放入到输出记录中
        get(Fields.Out, "flag").setValue(r, flag);
        //输出到下一个节点做处理
        putRow(data.outputRowMeta, r);
        return true;
        }

demo

demo示例

标签:11,10,23,kettle,42,对账,2020,日志,巧用
来源: https://blog.csdn.net/weixin_45598170/article/details/109952983