JAVA调用Kettle脚本
作者:互联网
一、Kettle简介
1、ETL简介
ETL(Extract-Transform-Load的缩写,即数据抽取、转换、装载的过程),对于开发或者运维人员来说,我们经常会遇到各种数据的处理,转换,迁移,所以了解并掌握一种ETL工具的使用,必不可少,这里我们要学习的ETL工具就是Kettle!
2、Kettle简介
Kettle是一款国外开源的ETL工具,纯Java编写,可以在Window、Linux、Unix上运行,绿色无需安装,数据抽取高效稳定。Kettle 中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个壶里,然后以一种指定的格式流出。Kettle这个ETL工具集,它允许你管理来自不同数据库的数据,通过提供一个图形化的用户环境来描述你想做什么,而不是你想怎么做。Kettle中有两种脚本文件,transformation和job,transformation完成针对数据的基础转换,job则完成整个工作流的控制。
Kettle(现在已经更名为PDI,Pentaho Data Integration-Pentaho数据集成)。
二、Java调用Kettle
引入依赖包(此处用的7.1.0.0)
<!-- kettle -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-vfs2</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
<dependency>
<groupId>org.scannotation</groupId>
<artifactId>scannotation</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.kettle</groupId>
<artifactId>pentaho-vfs-browser</artifactId>
<version>7.1.0.0</version>
</dependency>
<dependency>
<groupId>com.kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>7.1.0.0</version>
</dependency>
<dependency>
<groupId>com.kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>7.1.0.0</version>
</dependency>
<dependency>
<groupId>com.kettle</groupId>
<artifactId>metastore</artifactId>
<version>7.1.0.0</version>
</dependency>
<!-- kettle -->
1、ktr(转换)
/**
* kettle脚本执行 - 转换
* @param initKettleParam kettle脚本入参
* @param crontabMissionLog 输出日志
* @param ktrFilePath 脚本路径(/User/xxx/xxx.ktr)
* @return true/flase
*/
public static boolean runKettleTransfer(Map<String, String> initKettleParam,CrontabMissionLog crontabMissionLog, String ktrFilePath) {
Trans trans;
String uuid = UUID.randomUUID().toString();
logger_info.info("ExecKettleUtil@runKettleTransfer:" + uuid + " {ktrFilePath:" + ktrFilePath + "}");
try {
// 初始化
KettleEnvironment.init();
//EnvUtil.environmentInit();
TransMeta transMeta = new TransMeta(ktrFilePath);
// 转换
trans = new Trans(transMeta);
// 初始化trans参数,脚本中获取参数值:${variableName}
if (initKettleParam != null) {
for (String variableName : initKettleParam.keySet()) {
trans.setVariable(variableName, initKettleParam.get(variableName));
//trans.setParameterValue(variableName, initKettleParam.get(variableName));
//transMeta.setParameterValue(variableName, initKettleParam.get(variableName));
}
}
//监听kettle执行日志
KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
boolean flag = true;
@Override
public void eventAdded(KettleLoggingEvent logs) {
crontabMissionLog.setExecLog(crontabMissionLog.getExecLog()+logs.getMessage().toString());
}
});
// 执行转换
trans.execute(null);
// 等待转换执行结束
trans.waitUntilFinished();
if (trans.getErrors() > 0) {
crontabMissionLog.setExecStatus(ExecStatus.执行失败.getValue());
} else {
crontabMissionLog.setExecStatus(ExecStatus.执行成功.getValue());
crontabMissionLog.setSuccess(true);
}
return true;
} catch (Exception e) {
crontabMissionLog.setExecStatus(ExecStatus.执行失败.getValue());
return false;
}
}
2、kjb(任务)
/**
* kettle脚本执行 - 作业
* @param initKettleParam kettle脚本入参
* @param crontabMissionLog 输出日志
* @param ktrFilePath 脚本路径(/User/xxx/xxx.kjb)
*/
public static void runKettlJob(Map<String, String> initKettleParam, CrontabMissionLog crontabMissionLog, String ktrFilePath) {
try {
KettleEnvironment.init();
// jobname 是Job脚本的路径及名称
JobMeta jobMeta = new JobMeta(ktrFilePath, null);
Job job = new Job(null, jobMeta);
// 向Job 脚本传递参数,脚本中获取参数值:${参数名}
if (initKettleParam != null) {
for (String variableName : initKettleParam.keySet()) {
job.setVariable(variableName, initKettleParam.get(variableName));
}
}
//监听kettle执行日志
KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
boolean flag = true;
@Override
public void eventAdded(KettleLoggingEvent logs) {
crontabMissionLog.setExecLog(crontabMissionLog.getExecLog()+logs.getMessage().toString());
}
});
job.start();
job.waitUntilFinished();
if (job.getErrors() > 0) {
throw new Exception(
"There are errors during job exception!(执行job发生异常)");
}
} catch (Exception e) {
e.printStackTrace();
}
}
3、调用
/**
* kettle脚本执行 (.ktr;.kjb)
* @param crontabMissionLog 日志
* @param crontabMission 调度实体类
*/
public static void runKettleScript(CrontabMissionLog crontabMissionLog, CrontabMission crontabMission,String businessDate) {
crontabMissionLog.setExecBeginTime(DateEnum.yyyyMMddHHmmssSSS.now());
String ktrFile =crontabMission.getKettleScript();
Map<String,String> initKettleParam = new HashMap<>();
initKettleParam.put("date",businessDate);
// String[] params = new String[0];
// if(StringUtils.hasText(crontabMission.getKettleScriptParam())){
// params = crontabMission.getKettleScriptParam().split(";");
// for(String param : params){
// String[] strings = param.split(":");
// initKettleParam.put(strings[0],strings[1]);
// }
// }
crontabMissionLog.setExecLog(DateEnum.yyyyMMddHHmmssSSS.now()+"-kettle脚本开始执行:\n脚本执行结果:");
String ktrFilePath = fileLocal + "/kettle/"+ crontabMission.getSid() + File.separator + ktrFile;
if (ktrFile.endsWith(".ktr")){
runKettleTransfer(initKettleParam,crontabMissionLog,ktrFilePath);
}else if (ktrFile.endsWith(".kjb")){
runKettlJob(initKettleParam,crontabMissionLog,ktrFilePath);
}else {
crontabMissionLog.setExecLog("无法执行非kettle脚本格式文件");
}
crontabMissionLog.setExecLog(crontabMissionLog.getExecLog()+"\n"+DateEnum.yyyyMMddHHmmssSSS.now()+"-kettle脚本完成;");
crontabMissionLog.setExecEndTime(DateEnum.yyyyMMddHHmmssSSS.now());
}
标签:Kettle,JAVA,String,kettle,ktrFilePath,调用,crontabMissionLog,initKettleParam,varia 来源: https://blog.csdn.net/BUGdeQD/article/details/117733258