Java定时调用.ktr文件的示例代码(解决方案)

作者:ACGkaka_ 时间:2021-12-29 13:21:49 

1.Maven依赖


<!-- Kettle -->
<dependency>
   <groupId>pentaho-kettle</groupId>
   <artifactId>kettle-core</artifactId>
   <version>7.1.0.0-12</version>
</dependency>
<dependency>
   <groupId>pentaho-kettle</groupId>
   <artifactId>kettle-engine</artifactId>
   <version>7.1.0.0-12</version>
</dependency>
<dependency>
   <groupId>pentaho-kettle</groupId>
   <artifactId>metastore</artifactId>
   <version>7.1.0.0-12</version>
</dependency>
<dependency>
   <groupId>commons-io</groupId>
   <artifactId>commons-io</artifactId>
   <version>1.4</version>
</dependency>

<!-- connector -->
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>5.1.49</version>
</dependency>

注意:kettle的jar包依赖会拉不下来,需要将jar包install到本地,命令:

创建 0_install.bat 文件


:: 本地 install kettle-core.jar包
CALL mvn install:install-file -Dfile=kettle-core-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=kettle-core  -Dversion=7.1.0.0-12  -Dpackaging=jar

:: 本地 install kettle-engine.jar包
CALL mvn install:install-file -Dfile=kettle-engine-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=kettle-engine  -Dversion=7.1.0.0-12  -Dpackaging=jar

:: 本地 install metastore.jar包
CALL mvn install:install-file -Dfile=metastore-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=metastore  -Dversion=7.1.0.0-12  -Dpackaging=jar

pause

或者deploy到内网 * 上,命令:

创建 1_deploy.bat 文件


:: * deploy kettle-core.jar包
CALL mvn deploy:deploy-file -Dfile=kettle-core-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=kettle-core  -Dversion=7.1.0.0-12  -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID

:: * deploy kettle-engine.jar包
CALL mvn deploy:deploy-file -Dfile=kettle-engine-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=kettle-engine  -Dversion=7.1.0.0-12  -Dpackaging=jar  -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID

:: * deploy metastore.jar包
CALL mvn deploy:deploy-file -Dfile=metastore-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=metastore  -Dversion=7.1.0.0-12  -Dpackaging=jar  -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID

pause

(脚本创建在jar包目录下,创建好之后双击运行即可)

jar包、脚本文件下载地址

https://share.weiyun.com/eaOSjqP7

Java定时调用.ktr文件的示例代码(解决方案) 

2.执行.ktr/.kjb工具类

KettleReadUtils.java


import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

import java.io.InputStream;

/**
* <p> @Title KettleReadUtils
* <p> @Description Kettle工具包
*
* @author zhj
* @date 2021/4/8 10:50
*/
public class KettleReadUtils {

/**
    * 调用 kettle ktr
    *
    * @param path 文件路径
    */
   public static void runKtr(String path) {
       try {
           KettleEnvironment.init();
           EnvUtil.environmentInit();
           TransMeta transMeta = new TransMeta(path);
           Trans trans = new Trans(transMeta);
           trans.execute(null);
           trans.waitUntilFinished();
           if (trans.getErrors() > 0) {
               throw new Exception("Errors during transformation execution!");
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

/**
    * 以流的方式调用 kettle ktr
    *
    * @param in 文件流
    */
   public static void runKtrByStream(InputStream in) {
       try {
           KettleEnvironment.init();
           TransMeta transMeta = new TransMeta(in, null, true, null, null);
           Trans trans = new Trans(transMeta);
           trans.execute(null);
           trans.waitUntilFinished();
           if (trans.getErrors() > 0) {
               throw new Exception("Errors during transformation execution!");
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

/**
    * 调用 kettle job
    *
    * @param paraNames  多个参数名
    * @param paraValues 多个参数值
    * @param jobPath    如: String fName= "D:\\kettle\\aaa.kjb";
    */
   public static void runJob(String[] paraNames, String[] paraValues, String jobPath) {
       try {
           KettleEnvironment.init();
           JobMeta jobMeta = new JobMeta(jobPath, null);
           Job job = new Job(null, jobMeta);
           // 向Job 脚本传递参数,脚本中获取参数值:${参数名}
           if (paraNames != null && paraValues != null) {
               for (int i = 0; i < paraNames.length && i < paraValues.length; i++) {
                   job.setVariable(paraNames[i], paraValues[i]);
               }
           }
           job.start();
           job.waitUntilFinished();
           if (job.getErrors() > 0) {
               throw new Exception("Errors during job execution!");
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

}

3.创建.ktr/.kjb工具类

(此处只是提供java创建途径,可以直接使用Spoon.bat创建好的文件)


import org.apache.commons.io.FileUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;

import java.io.File;

/**
* <p> @Title KettleReadUtils
* <p> @Description Kettle工具包
*
* @author zhj
* @date 2021/4/8 10:50
*/
public class KettleWriteUtils {

/**
    * 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml)
    */
   private static final String DATABASE_XML_1 =
           "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
                   "<connection>" +
                   "<name>db1</name>" +
                   "<server>127.0.0.1</server>" +
                   "<type>MYSQL</type>" +
                   "<access>Native</access>" +
                   "<database>test</database>" +
                   "<port>3306</port>" +
                   "<username>root</username>" +
                   "<password>root</password>" +
                   "</connection>";
   private static final String DATABASE_XML_2 =
           "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
                   "<connection>" +
                   "<name>db2</name>" +
                   "<server>127.0.0.1</server>" +
                   "<type>MYSQL</type>" +
                   "<access>Native</access>" +
                   "<database>test</database>" +
                   "<port>3306</port>" +
                   "<username>root</username>" +
                   "<password>root</password>" +
                   "</connection>";
   /**
    * 创建ktr文件
    *
    * @param args args
    */
   public static void main(String[] args) {
       try {
           KettleEnvironment.init();
           KettleWriteUtils kettleWriteUtils = new KettleWriteUtils();
           TransMeta transMeta = kettleWriteUtils.generateMyOwnTrans();
           String transXml = transMeta.getXML();
           String transName = "update_insert_Trans.ktr";
           File file = new File(transName);
           FileUtils.writeStringToFile(file, transXml, "UTF-8");
       } catch (Exception e) {
           e.printStackTrace();
           return;
       }

}

/**
    * 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是表输入,第二个是表插入与更新操作
    * @return 元数据
    * @throws KettleXMLException 生成XML异常
    */
   private TransMeta generateMyOwnTrans() throws KettleXMLException {
       TransMeta transMeta = new TransMeta();
       //设置转化的名称
       transMeta.setName("insert_update");
       //添加转换的数据库连接
       DatabaseMeta databaseMeta1 = new DatabaseMeta(DATABASE_XML_1);
       transMeta.addDatabase(databaseMeta1);
       DatabaseMeta databaseMeta2 = new DatabaseMeta(DATABASE_XML_2);
       transMeta.addDatabase(databaseMeta2);

//registry是给每个步骤生成一个标识Id用
       PluginRegistry registry = PluginRegistry.getInstance();
       //第一个表输入步骤(TableInputMeta)
       TableInputMeta tableInput = new TableInputMeta();
       String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput);
       //给表输入添加一个DatabaseMeta连接数据库
       DatabaseMeta db1 = transMeta.findDatabase("db1");
       tableInput.setDatabaseMeta(db1);
       String sql = "SELECT USER_ID,USER_NAME FROM t_manager_user";
       tableInput.setSQL(sql);
       //添加TableInputMeta到转换中
       StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId, "table input", tableInput);
       //给步骤添加在spoon工具中的显示位置
       tableInputMetaStep.setDraw(true);
       tableInputMetaStep.setLocation(100, 100);
       transMeta.addStep(tableInputMetaStep);

//第二个步骤插入与更新
       InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
       String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta);
       //添加数据库连接
       DatabaseMeta db2 = transMeta.findDatabase("db2");
       insertUpdateMeta.setDatabaseMeta(db2);
       //设置操作的表
       insertUpdateMeta.setTableName("t_stat_user_info");
       //设置用来查询的关键字
       insertUpdateMeta.setKeyLookup(new String[]{"USER_ID"});
       insertUpdateMeta.setKeyStream(new String[]{"USER_ID"});
       insertUpdateMeta.setKeyStream2(new String[]{""});
       insertUpdateMeta.setKeyCondition(new String[]{"="});
       //设置要更新的字段
       String[] updatelookup = {"USER_ID","USER_NAME"} ;
       String[] updateStream = {"USER_ID","USER_NAME"} ;
       Boolean[] updateOrNot = {false,true};
       insertUpdateMeta.setUpdateLookup(updatelookup);
       insertUpdateMeta.setUpdateStream(updateStream);
       insertUpdateMeta.setUpdate(updateOrNot);
       //添加步骤到转换中
       StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId, "insert_update", insertUpdateMeta);
       insertUpdateStep.setDraw(true);
       insertUpdateStep.setLocation(250, 100);
       transMeta.addStep(insertUpdateStep);
       //添加hop把两个步骤关联起来
       transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep));
       return transMeta;
   }
}

4.测试执行.ktr文件

执行用例:


public static void main(String[] args) {
   InputStream inputStream = KettleReadUtils.class.getResourceAsStream("/etl/test.ktr");
   runKtrByStream(inputStream);
}

.ktr文件位置:

Java定时调用.ktr文件的示例代码(解决方案)

执行结果:

Java定时调用.ktr文件的示例代码(解决方案) 

5.Kettle所使用的mysql-connector 5.1.49 和 8 版本不兼容问题

  • mysql-connector-java 5.1.49 版本中,支持连接驱动,org.gjt.mm.mysql.Driver

  • mysql-connector-java 8.* 版本中,连接驱动,com.mysql.cj.jdbc.Driver

  • 如果直接使用 8.* 版本 去连接 MySQL 数据库的话会出现"错误连接数据库"问题:

Driver class ‘org.gjt.mm.mysql.Driver' could not be found, make sure the ‘MySQL' driver (jar file) is installed.
org.gjt.mm.mysql.Driver

Java定时调用.ktr文件的示例代码(解决方案)

解决方案:

1.关闭Kettle;

2.将/data-integration/lib/ 下面的 mysql-connector-java-5.1.49.jar 替换为 mysql-connector-java-8.*.jar

3.打开Kettle,修改连接类型为 Generic database ,配置驱动名称为 com.mysql.cj.jdbc.Driver;

4.重新导出为.ktr/.kjb文件;

5.再用java调用即可解决问题。

Java定时调用.ktr文件的示例代码(解决方案)

整理完毕,完结撒花~

来源:https://blog.csdn.net/qq_33204709/article/details/115833635

标签:Java,定时调用,.ktr
0
投稿

猜你喜欢

  • 一篇文章让你弄懂Java运算符

    2023-12-02 20:16:41
  • Java集合中的fail-fast(快速失败)机制详解

    2023-05-10 16:31:33
  • SpringBoot中定时任务@Scheduled注解的使用解读

    2022-11-24 17:20:11
  • java使用@Transactional时常犯的N种错误

    2021-08-16 01:58:44
  • Spring之spring-context-indexer依赖详解

    2023-11-23 12:21:41
  • SpringBoot+Spring Security+JWT实现RESTful Api权限控制的方法

    2022-07-18 03:38:36
  • 对Java中传值调用的理解分析

    2023-05-03 15:22:14
  • 在Eclipse中运行Solr 基础知识

    2021-07-06 22:51:04
  • maven中profile的使用

    2022-03-31 10:43:53
  • mybatis-plus update更新操作的三种方式(小结)

    2023-10-08 14:05:08
  • java 交换两个数据的方法实例详解

    2021-12-06 00:56:04
  • Spring boot2.0 日志集成方法分享(1)

    2023-05-12 20:10:25
  • Java多线程之ThreadLocal浅析

    2023-06-19 19:55:37
  • Java实战入门之双色球彩票小游戏

    2023-05-12 04:07:13
  • 基于MapReduce实现决策树算法

    2023-10-20 16:05:40
  • SpringBoot在一定时间内限制接口请求次数的实现示例

    2021-10-12 04:28:52
  • 利用Java计算某个日期是星期几

    2023-11-17 05:49:42
  • SpringCloud eureka(server)微服务集群搭建过程

    2023-05-22 15:08:55
  • java图形界面之加法计算器

    2023-08-31 02:27:43
  • Java堆&优先级队列示例讲解(上)

    2023-04-09 11:09:59
  • asp之家 软件编程 m.aspxhome.com