编程语言
首页 > 编程语言> > JAVA调用Kettle脚本

JAVA调用Kettle脚本

作者:互联网

一、Kettle简介

1、ETL简介

ETL(Extract-Transform-Load的缩写,即数据抽取、转换、装载的过程),对于开发或者运维人员来说,我们经常会遇到各种数据的处理,转换,迁移,所以了解并掌握一种ETL工具的使用,必不可少,这里我们要学习的ETL工具就是Kettle!

2、Kettle简介

Kettle是一款国外开源的ETL工具,纯Java编写,可以在Window、Linux、Unix上运行,绿色无需安装数据抽取高效稳定。Kettle 中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个壶里,然后以一种指定的格式流出。Kettle这个ETL工具集,它允许你管理来自不同数据库的数据,通过提供一个图形化的用户环境来描述你想做什么而不是你想怎么做。Kettle中有两种脚本文件,transformationjob,transformation完成针对数据的基础转换,job则完成整个工作流的控制。

Kettle(现在已经更名为PDI,Pentaho Data Integration-Pentaho数据集成)。

二、Java调用Kettle

引入依赖包(此处用的7.1.0.0)

        <!-- kettle -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-vfs2</artifactId>
            <version>2.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>27.0.1-jre</version>
        </dependency>

        <dependency>
            <groupId>org.scannotation</groupId>
            <artifactId>scannotation</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.kettle</groupId>
            <artifactId>pentaho-vfs-browser</artifactId>
            <version>7.1.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.kettle</groupId>
            <artifactId>kettle-engine</artifactId>
            <version>7.1.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.kettle</groupId>
            <artifactId>kettle-core</artifactId>
            <version>7.1.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.kettle</groupId>
            <artifactId>metastore</artifactId>
            <version>7.1.0.0</version>
        </dependency>
        <!-- kettle -->

1、ktr(转换)

/**
     * kettle脚本执行 - 转换
     * @param initKettleParam kettle脚本入参
     * @param crontabMissionLog 输出日志
     * @param ktrFilePath 脚本路径(/User/xxx/xxx.ktr)
     * @return true/flase
     */
    public static boolean runKettleTransfer(Map<String, String> initKettleParam,CrontabMissionLog crontabMissionLog, String ktrFilePath) {
        Trans trans;
        String uuid = UUID.randomUUID().toString();
        logger_info.info("ExecKettleUtil@runKettleTransfer:" + uuid + " {ktrFilePath:" + ktrFilePath + "}");
        try {
            // 初始化
            KettleEnvironment.init();
            //EnvUtil.environmentInit();
            TransMeta transMeta = new TransMeta(ktrFilePath);
            // 转换
            trans = new Trans(transMeta);
            // 初始化trans参数,脚本中获取参数值:${variableName}
            if (initKettleParam != null) {
                for (String variableName : initKettleParam.keySet()) {
                     trans.setVariable(variableName, initKettleParam.get(variableName));
                     //trans.setParameterValue(variableName, initKettleParam.get(variableName));
                     //transMeta.setParameterValue(variableName, initKettleParam.get(variableName));
                }
            }

            //监听kettle执行日志
            KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
                boolean flag = true;
                @Override
                public void eventAdded(KettleLoggingEvent logs) {
                    crontabMissionLog.setExecLog(crontabMissionLog.getExecLog()+logs.getMessage().toString());
                }
            });

            // 执行转换
            trans.execute(null);
            // 等待转换执行结束
            trans.waitUntilFinished();
            if (trans.getErrors() > 0) {
                crontabMissionLog.setExecStatus(ExecStatus.执行失败.getValue());
            } else {
                crontabMissionLog.setExecStatus(ExecStatus.执行成功.getValue());
                crontabMissionLog.setSuccess(true);
            }
            return true;
        } catch (Exception e) {
            crontabMissionLog.setExecStatus(ExecStatus.执行失败.getValue());
            return false;
        }
    }

2、kjb(任务)

/**
     * kettle脚本执行 - 作业
     * @param initKettleParam kettle脚本入参
     * @param crontabMissionLog 输出日志
     * @param ktrFilePath 脚本路径(/User/xxx/xxx.kjb)
     */
    public static void runKettlJob(Map<String, String> initKettleParam, CrontabMissionLog crontabMissionLog, String ktrFilePath) {
        try {
            KettleEnvironment.init();
            // jobname 是Job脚本的路径及名称
            JobMeta jobMeta = new JobMeta(ktrFilePath, null);
            Job job = new Job(null, jobMeta);
            // 向Job 脚本传递参数,脚本中获取参数值:${参数名}
            if (initKettleParam != null) {
                for (String variableName : initKettleParam.keySet()) {
                    job.setVariable(variableName, initKettleParam.get(variableName));
                }
            }
            //监听kettle执行日志
            KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
                boolean flag = true;
                @Override
                public void eventAdded(KettleLoggingEvent logs) {
                    crontabMissionLog.setExecLog(crontabMissionLog.getExecLog()+logs.getMessage().toString());
                }
            });
            job.start();
            job.waitUntilFinished();
            if (job.getErrors() > 0) {
                throw new Exception(
                        "There are errors during job exception!(执行job发生异常)");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

3、调用

 /**
     * kettle脚本执行 (.ktr;.kjb)
     * @param crontabMissionLog 日志
     * @param crontabMission  调度实体类
     */
    public static void runKettleScript(CrontabMissionLog crontabMissionLog, CrontabMission crontabMission,String businessDate) {
        crontabMissionLog.setExecBeginTime(DateEnum.yyyyMMddHHmmssSSS.now());
        String ktrFile =crontabMission.getKettleScript();
        Map<String,String> initKettleParam = new HashMap<>();
        initKettleParam.put("date",businessDate);
//        String[] params = new String[0];
//        if(StringUtils.hasText(crontabMission.getKettleScriptParam())){
//            params = crontabMission.getKettleScriptParam().split(";");
//            for(String param : params){
//                String[] strings = param.split(":");
//                initKettleParam.put(strings[0],strings[1]);
//            }
//        }
        crontabMissionLog.setExecLog(DateEnum.yyyyMMddHHmmssSSS.now()+"-kettle脚本开始执行:\n脚本执行结果:");
        String ktrFilePath = fileLocal + "/kettle/"+ crontabMission.getSid() + File.separator + ktrFile;
        if (ktrFile.endsWith(".ktr")){
            runKettleTransfer(initKettleParam,crontabMissionLog,ktrFilePath);
        }else if (ktrFile.endsWith(".kjb")){
            runKettlJob(initKettleParam,crontabMissionLog,ktrFilePath);
        }else {
            crontabMissionLog.setExecLog("无法执行非kettle脚本格式文件");
        }
        crontabMissionLog.setExecLog(crontabMissionLog.getExecLog()+"\n"+DateEnum.yyyyMMddHHmmssSSS.now()+"-kettle脚本完成;");
        crontabMissionLog.setExecEndTime(DateEnum.yyyyMMddHHmmssSSS.now());


    }

 

标签:Kettle,JAVA,String,kettle,ktrFilePath,调用,crontabMissionLog,initKettleParam,varia
来源: https://blog.csdn.net/BUGdeQD/article/details/117733258