Calling DataX from Java and returning job execution details

Author: 沉梦杨志  Date: 2023-11-28 21:26:45

DataX

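DataX is an offline data synchronization tool/platform widely used inside Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SqlServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS) and DRDS.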

A detailed introduction to DataX

See DataX-Introduction.

Introduction

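Our business needed DataX to load data from text files into a database. Until now we had run these jobs from Python through datax.py. To get better control over DataX jobs, A-Wen asked us to modify DataX so that it is called as an integrated Java component and returns detailed information about each job's execution.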

Tracing the DataX source code

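After downloading the source code from GitHub, we started the modifications. DataX is launched through entry, a static method of the Engine class in the datax-core module: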


public static void entry(final String[] args) throws Throwable {
    Options options = new Options();
    options.addOption("job", true, "Job config.");
    options.addOption("jobid", true, "Job unique id.");
    options.addOption("mode", true, "Job runtime mode.");

    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args);

    String jobPath = cl.getOptionValue("job");

    // If the user does not specify jobid explicitly, datax.py passes the default value -1
    String jobIdString = cl.getOptionValue("jobid");
    RUNTIME_MODE = cl.getOptionValue("mode");

    Configuration configuration = ConfigParser.parse(jobPath);

    long jobId;
    if (!"-1".equalsIgnoreCase(jobIdString)) {
        jobId = Long.parseLong(jobIdString);
    } else {
        // only for dsc & ds & datax 3 update
        String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
        String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
        String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
        List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                dsJobUrlPatternString, dsTaskGroupUrlPatternString);
        jobId = parseJobIdFromUrl(patternStringList, jobPath);
    }

    boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
    if (!isStandAloneMode && jobId == -1) {
        // Outside standalone mode, jobId must not be -1
        // (message: "A valid jobId must be provided in the URL in non-standalone mode.")
        throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");
    }
    configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

    // Print VM info
    VMInfo vmInfo = VMInfo.getVmInfo();
    if (vmInfo != null) {
        LOG.info(vmInfo.toString());
    }

    LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");

    LOG.debug(configuration.toJSON());

    ConfigurationValidate.doValidate(configuration);
    Engine engine = new Engine();
    engine.start(configuration);
}
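The jobId handling in entry is plain string logic, so it can be tried out in isolation. The sketch below is a hypothetical re-implementation for illustration only. JobIdSketch, resolveJobId and checkMode are made-up names, and the regex loop only approximates DataX's parseJobIdFromUrl. The three URL patterns are copied from entry above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical re-implementation of the jobId handling in Engine.entry, for illustration only.
class JobIdSketch {

    // URL patterns copied from Engine.entry
    static final List<String> PATTERNS = Arrays.asList(
            "/instance/(\\d{1,})/config.xml",
            "/inner/job/(\\d{1,})/config",
            "/inner/job/(\\d{1,})/taskGroup/");

    // An explicit jobid wins; for "-1", try to extract the id from the job path; otherwise -1.
    static long resolveJobId(String jobIdString, String jobPath) {
        if (!"-1".equalsIgnoreCase(jobIdString)) {
            return Long.parseLong(jobIdString); // throws NumberFormatException for null
        }
        for (String patternString : PATTERNS) {
            Matcher matcher = Pattern.compile(patternString).matcher(jobPath);
            if (matcher.find()) {
                return Long.parseLong(matcher.group(1));
            }
        }
        return -1L;
    }

    // Same rule as entry: only standalone mode may run with jobId == -1.
    static void checkMode(String mode, long jobId) {
        if (!"standalone".equalsIgnoreCase(mode) && jobId == -1) {
            throw new IllegalArgumentException("non-standalone mode requires a valid jobId");
        }
    }

    public static void main(String[] args) {
        String jobPath = "/opt/datax/job/job2.json"; // hypothetical local job file
        long jobId = resolveJobId("-1", jobPath);
        checkMode("standalone", jobId);
        System.out.println("jobId = " + jobId); // prints: jobId = -1
    }
}
```

As in entry, if -jobid is omitted entirely, jobIdString is null and Long.parseLong throws a NumberFormatException. For that reason, callers should pass "-jobid", "-1" explicitly.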

entry ends by calling engine.start(configuration). If we step into that method, we find that it eventually calls JobContainer's start() method:


@Override
public void start() {
    LOG.info("DataX jobContainer starts job.");

    boolean hasException = false;
    boolean isDryRun = false;
    try {
        this.startTimeStamp = System.currentTimeMillis();
        isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
        if (isDryRun) {
            LOG.info("jobContainer starts to do preCheck ...");
            this.preCheck();
        } else {
            userConf = configuration.clone();
            LOG.debug("jobContainer starts to do preHandle ...");
            this.preHandle();

            LOG.debug("jobContainer starts to do init ...");
            this.init();
            LOG.info("jobContainer starts to do prepare ...");
            this.prepare();
            LOG.info("jobContainer starts to do split ...");
            this.totalStage = this.split();
            LOG.info("jobContainer starts to do schedule ...");
            this.schedule();
            LOG.debug("jobContainer starts to do post ...");
            this.post();

            LOG.debug("jobContainer starts to do postHandle ...");
            this.postHandle();
            LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

            this.invokeHooks();
        }
    } catch (Throwable e) {
        LOG.error("Exception when job run", e);

        hasException = true;

        if (e instanceof OutOfMemoryError) {
            this.destroy();
            System.gc();
        }

        if (super.getContainerCommunicator() == null) {
            // containerCollector is initialized in scheduler(), so if an exception occurs
            // before scheduler() runs, it has to be initialized here instead

            AbstractContainerCommunicator tempContainerCollector;
            // standalone
            tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);

            super.setContainerCommunicator(tempContainerCollector);
        }

        Communication communication = super.getContainerCommunicator().collect();
        // The state as of before reporting; it does not need to be set manually
        // communication.setState(State.FAILED);
        communication.setThrowable(e);
        communication.setTimestamp(this.endTimeStamp);

        Communication tempComm = new Communication();
        tempComm.setTimestamp(this.startTransferTimeStamp);

        Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
        super.getContainerCommunicator().report(reportCommunication);

        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    } finally {
        if (!isDryRun) {

            this.destroy();
            this.endTimeStamp = System.currentTimeMillis();
            if (!hasException) {
                // Finally, log the average CPU usage and GC statistics
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }

                LOG.info(PerfTrace.getInstance().summarizeNoException());
                this.logStatistics();
            }
        }
    }
}
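Without the logging and error reporting, start() reduces to a fixed sequence of phases with a dry-run shortcut. The statistics are logged only when no exception occurred. The sketch below models just that control flow. LifecycleSketch is hypothetical, and its phases only record their own names. None of DataX's real work is done.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the control flow in JobContainer.start(); phases only record their names.
class LifecycleSketch {
    final List<String> calls = new ArrayList<>();
    private final boolean dryRun;
    private final String failAt; // name of the phase that throws, or null

    LifecycleSketch(boolean dryRun, String failAt) {
        this.dryRun = dryRun;
        this.failAt = failAt;
    }

    private void phase(String name) {
        calls.add(name);
        if (name.equals(failAt)) {
            throw new IllegalStateException("failed in " + name);
        }
    }

    void start() {
        boolean hasException = false;
        try {
            if (dryRun) {
                phase("preCheck");
            } else {
                for (String p : new String[] {"preHandle", "init", "prepare", "split",
                        "schedule", "post", "postHandle", "invokeHooks"}) {
                    phase(p);
                }
            }
        } catch (RuntimeException e) {
            hasException = true;
            // JobContainer catches Throwable, reports it and rethrows it wrapped in a
            // DataXException; this sketch simply rethrows
            throw e;
        } finally {
            if (!dryRun) {
                calls.add("destroy");
                if (!hasException) {
                    calls.add("logStatistics"); // statistics only on success
                }
            }
        }
    }
}
```

This is why the job statistics only exist on the success path. On failure, the exception propagates instead.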

The job information we need is produced in this.logStatistics():


private void logStatistics() {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }

    if (super.getContainerCommunicator() == null) {
        return;
    }

    Communication communication = super.getContainerCommunicator().collect();
    communication.setTimestamp(this.endTimeStamp);

    Communication tempComm = new Communication();
    tempComm.setTimestamp(this.startTransferTimeStamp);

    Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);

    // Byte rate (bytes per second)
    long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
            / transferCosts;

    // Record rate (records per second)
    long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
            / transferCosts;

    reportCommunication.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeedPerSecond);
    reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);

    super.getContainerCommunicator().report(reportCommunication);

    LOG.info(String.format(
            "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n",
            "任务启动时刻", // job start time
            dateFormat.format(startTimeStamp),

            "任务结束时刻", // job end time
            dateFormat.format(endTimeStamp),

            "任务总计耗时", // total job duration
            String.valueOf(totalCosts) + "s",
            "任务平均流量", // average throughput
            StrUtil.stringify(byteSpeedPerSecond)
                    + "/s",
            "记录写入速度", // record write speed
            String.valueOf(recordSpeedPerSecond)
                    + "rec/s",
            "读出记录总数", // total records read
            String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
            "读写失败总数", // total read/write failures
            String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    ));

    LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|" +
            dateFormat.format(endTimeStamp) + "|" +
            String.valueOf(totalCosts) + "|" +
            StrUtil.stringify(byteSpeedPerSecond) + "|" +
            String.valueOf(recordSpeedPerSecond) + "|" +
            String.valueOf(CommunicationTool.getTotalReadRecords(communication)) + "|" +
            String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    );

    if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
        LOG.info(String.format(
                "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
                "Transformer成功记录总数", // Transformer: records succeeded
                communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),

                "Transformer失败记录总数", // Transformer: records failed
                communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),

                "Transformer过滤记录总数", // Transformer: records filtered
                communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)
        ));
    }
}
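The same figures are also written to the log as a single pipe-delimited task-total-info line, in the field order of the LOG.info call above. That line can be parsed to cross-check the rewritten method, or in cases where changing the DataX source is not an option. The sketch below is hypothetical: TaskTotalInfo is not a DataX class. It keeps the two dates and the byte speed as strings because dateFormat and StrUtil.stringify have already formatted them.

```java
// Hypothetical parser for the "task-total-info:" log line written by logStatistics.
// Field order follows that LOG.info call: start|end|totalCosts|byteSpeed|recordSpeed|read|errors.
class TaskTotalInfo {
    static final String PREFIX = "task-total-info:";

    final String startTime;
    final String endTime;
    final long totalCostsSeconds;
    final String byteSpeed;          // already human-readable (StrUtil.stringify)
    final long recordSpeed;          // records per second
    final long totalReadRecords;
    final long totalErrorRecords;

    private TaskTotalInfo(String[] f) {
        startTime = f[0];
        endTime = f[1];
        totalCostsSeconds = Long.parseLong(f[2]);
        byteSpeed = f[3];
        recordSpeed = Long.parseLong(f[4]);
        totalReadRecords = Long.parseLong(f[5]);
        totalErrorRecords = Long.parseLong(f[6]);
    }

    // Returns null when the line does not contain the marker.
    static TaskTotalInfo parse(String logLine) {
        int at = logLine.indexOf(PREFIX);
        if (at < 0) {
            return null;
        }
        String[] fields = logLine.substring(at + PREFIX.length()).trim().split("\\|");
        if (fields.length != 7) {
            throw new IllegalArgumentException("expected 7 fields, got " + fields.length);
        }
        return new TaskTotalInfo(fields);
    }
}
```

Because parse returns null for lines without the marker, it can be applied to every line of a job log.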

Making the changes

Add a result entity, DataxResult (getters and setters omitted):


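public class DataxResult {
    // Job start time (ms)
    private long startTimeStamp;
    // Job end time (ms)
    private long endTimeStamp;
    // Total job duration (s)
    private long totalCosts;
    // Average throughput (bytes/s)
    private long byteSpeedPerSecond;
    // Record write speed (records/s)
    private long recordSpeedPerSecond;
    // Total records read
    private long totalReadRecords;
    // Total read/write failures
    private long totalErrorRecords;
    // Transformer: records succeeded
    private long transformerSucceedRecords;
    // Transformer: records failed
    private long transformerFailedRecords;
    // Transformer: records filtered
    private long transformerFilterRecords;
    // Bytes read successfully
    private long readSucceedBytes;
    // Transfer end time (ms)
    private long endTransferTimeStamp;
    // Transfer start time (ms)
    private long startTransferTimeStamp;
    // Total transfer duration (s)
    private long transferCosts;

    // getters and setters omitted
}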
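The logStatistics rewrite that follows calls resultMsg.getResultMsg(...) with fourteen values, but the post never shows that method. The sketch below shows one plausible shape. It assumes the method assigns the fields in declaration order and returns this, and it includes only a few getters. The method body is an assumption, not the author's code.

```java
// Hypothetical completion of DataxResult. Assumption: getResultMsg assigns every
// field in declaration order and returns this, so logStatistics can return it directly.
class DataxResult {
    private long startTimeStamp;          // job start time (ms)
    private long endTimeStamp;            // job end time (ms)
    private long totalCosts;              // total duration (s)
    private long byteSpeedPerSecond;      // bytes/s
    private long recordSpeedPerSecond;    // records/s
    private long totalReadRecords;
    private long totalErrorRecords;
    private long transformerSucceedRecords;
    private long transformerFailedRecords;
    private long transformerFilterRecords;
    private long readSucceedBytes;
    private long endTransferTimeStamp;    // transfer end time (ms)
    private long startTransferTimeStamp;  // transfer start time (ms)
    private long transferCosts;           // transfer duration (s, at least 1)

    DataxResult getResultMsg(long startTimeStamp, long endTimeStamp, long totalCosts,
                             long byteSpeedPerSecond, long recordSpeedPerSecond,
                             long totalReadRecords, long totalErrorRecords,
                             long transformerSucceedRecords, long transformerFailedRecords,
                             long transformerFilterRecords, long readSucceedBytes,
                             long endTransferTimeStamp, long startTransferTimeStamp,
                             long transferCosts) {
        this.startTimeStamp = startTimeStamp;
        this.endTimeStamp = endTimeStamp;
        this.totalCosts = totalCosts;
        this.byteSpeedPerSecond = byteSpeedPerSecond;
        this.recordSpeedPerSecond = recordSpeedPerSecond;
        this.totalReadRecords = totalReadRecords;
        this.totalErrorRecords = totalErrorRecords;
        this.transformerSucceedRecords = transformerSucceedRecords;
        this.transformerFailedRecords = transformerFailedRecords;
        this.transformerFilterRecords = transformerFilterRecords;
        this.readSucceedBytes = readSucceedBytes;
        this.endTransferTimeStamp = endTransferTimeStamp;
        this.startTransferTimeStamp = startTransferTimeStamp;
        this.transferCosts = transferCosts;
        return this;
    }

    long getTotalReadRecords() { return totalReadRecords; }
    long getTotalErrorRecords() { return totalErrorRecords; }
    long getReadSucceedBytes() { return readSucceedBytes; }
    long getStartTransferTimeStamp() { return startTransferTimeStamp; }
    long getEndTransferTimeStamp() { return endTransferTimeStamp; }
    long getTransferCosts() { return transferCosts; }

    @Override
    public String toString() {
        return "DataxResult{totalCosts=" + totalCosts + "s, totalReadRecords=" + totalReadRecords
                + ", totalErrorRecords=" + totalErrorRecords + ", byteSpeedPerSecond="
                + byteSpeedPerSecond + ", recordSpeedPerSecond=" + recordSpeedPerSecond + "}";
    }
}
```

With fourteen positional long parameters, the compiler cannot catch two arguments passed in the wrong order. Individual setters or a builder would make each assignment explicit.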

Rewrite logStatistics so that it returns this entity:


private DataxResult logStatistics(DataxResult resultMsg) {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }
    if (super.getContainerCommunicator() == null) {
        return resultMsg;
    }
    Communication communication = super.getContainerCommunicator().collect();
    long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
            / transferCosts;
    long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
            / transferCosts;

    // Assumed: getResultMsg takes its arguments in DataxResult's field order, so the
    // 6th and 7th values are the read and error totals, not transformer counters
    return resultMsg.getResultMsg(startTimeStamp,
            endTimeStamp,
            totalCosts,
            byteSpeedPerSecond,
            recordSpeedPerSecond,
            CommunicationTool.getTotalReadRecords(communication),
            CommunicationTool.getTotalErrorRecords(communication),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS),
            communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES),
            this.endTransferTimeStamp,
            this.startTransferTimeStamp,
            transferCosts
    );
}
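The speed fields stored in DataxResult come from simple integer arithmetic. The helper below reproduces it outside DataX for a quick sanity check. SpeedSketch and its method names are hypothetical, but the formulas are copied from logStatistics.

```java
// Hypothetical helper reproducing the arithmetic in logStatistics: durations are
// truncated to whole seconds, and a transfer shorter than one second counts as 1 s,
// so the per-second speeds never divide by zero.
class SpeedSketch {

    static long totalCostsSeconds(long startMs, long endMs) {
        return (endMs - startMs) / 1000;
    }

    static long transferCostsSeconds(long startTransferMs, long endTransferMs) {
        long seconds = (endTransferMs - startTransferMs) / 1000;
        return seconds == 0L ? 1L : seconds;
    }

    static long perSecond(long counter, long transferCostsSeconds) {
        return counter / transferCostsSeconds;
    }

    public static void main(String[] args) {
        long transfer = transferCostsSeconds(1_000L, 6_500L);  // 5.5 s truncated to 5
        System.out.println(perSecond(10_000_000L, transfer));  // 2000000 (bytes/s)
        System.out.println(perSecond(250_000L, transfer));     // 50000 (records/s)
    }
}
```

Both durations are truncated to whole seconds, so a transfer of 1.9 s is treated as 1 s. DataxResult keeps the millisecond timestamps and the raw byte counter (startTransferTimeStamp, endTransferTimeStamp, readSucceedBytes), so a caller that needs a finer rate can recompute it.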

We also need to change JobContainer's start() method so that it returns the result:


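// JobContainer.start() overrides the no-argument start() declared in AbstractContainer;
// this @Override only compiles if start(DataxResult) is declared there as well.
@Override
public DataxResult start(DataxResult dataxResult) {
    ...
    // logStatistics fills in dataxResult and returns it, so no separate new DataxResult() is needed
    DataxResult result = logStatistics(dataxResult);
    ...
    return result;
}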
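The post elides where logStatistics is called inside start(DataxResult). In the original start(), it runs in the finally block, so it is tempting to put the return there as well. In Java, however, a return inside finally discards any exception thrown from try or catch, which would make a failed job look successful. The standalone demonstration below contrasts the two placements. FinallyReturnDemo and its methods are hypothetical names.

```java
// Hypothetical demonstration: a return inside finally discards the pending exception,
// while returning after the try statement lets it propagate.
class FinallyReturnDemo {

    static class Result {
        long records;
        boolean statsCollected;
    }

    // Anti-pattern: a failed "job" still returns a Result and the exception disappears.
    @SuppressWarnings("finally")
    static Result returnInFinally(boolean fail) {
        Result r = new Result();
        try {
            if (fail) {
                throw new IllegalStateException("job failed");
            }
            r.records = 100;
        } finally {
            return r; // discards the IllegalStateException thrown above
        }
    }

    // Closer to JobContainer.start(): collect statistics in finally only on success,
    // and return after the try statement so failures still propagate.
    static Result returnAfterFinally(boolean fail) {
        Result r = new Result();
        boolean hasException = false;
        try {
            if (fail) {
                throw new IllegalStateException("job failed");
            }
            r.records = 100;
        } catch (RuntimeException e) {
            hasException = true;
            throw e;
        } finally {
            if (!hasException) {
                r.statsCollected = true; // stand-in for: result = logStatistics(dataxResult)
            }
        }
        return r;
    }
}
```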

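Then add test methods to the Engine class. mockstart, shown here, is a copy of start() that returns the result of container.start(dataxResult). The static mockentry called in the test presumably mirrors entry() in the same way: it calls mockstart instead of start and returns its result.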


public DataxResult mockstart(Configuration allConf) {

    ...
    DataxResult dataxResult = new DataxResult();
    return container.start(dataxResult);
}

Testing

In com.alibaba.datax.core.util.container.CoreConstant, set DATAX_HOME to your local DataX directory.
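Editing the constant ties the build to one machine. The value could instead be set at startup, assuming your checkout initializes CoreConstant.DATAX_HOME from the datax.home system property. That is how datax.py passes it, via -Ddatax.home, but verify this in your copy. The property would then have to be set before CoreConstant is first loaded. The sketch also builds the job path with java.nio.file.Paths, so the same code works on Windows and Linux. DataxHomeSketch, ensureDataxHome and the /opt/datax fallback are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper. Assumption: CoreConstant.DATAX_HOME is read from the
// "datax.home" system property when CoreConstant is loaded, so this must run
// before any DataX class touches CoreConstant.
class DataxHomeSketch {

    static final String DATAX_HOME_PROPERTY = "datax.home";

    // Sets datax.home unless it was already provided, e.g. with -Ddatax.home=...
    static String ensureDataxHome(String fallback) {
        String home = System.getProperty(DATAX_HOME_PROPERTY);
        if (home == null || home.isEmpty()) {
            System.setProperty(DATAX_HOME_PROPERTY, fallback);
            home = fallback;
        }
        return home;
    }

    // Builds <home>/job/<fileName> using the platform's own path separator
    static Path jobFile(String home, String fileName) {
        return Paths.get(home, "job", fileName);
    }

    public static void main(String[] args) {
        String home = ensureDataxHome("/opt/datax"); // hypothetical install location
        System.out.println(jobFile(home, "job2.json"));
    }
}
```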


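The DATAX_HOME directory contains DataX's runtime subdirectories; the test below reads its job definition from the job subdirectory.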



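import java.nio.file.Paths;

import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.util.container.CoreConstant;
// plus an import for DataxResult from the package where you created it

public class DataxTest {

    public static void main(String[] args) {
        // Local job file, standalone mode and the default jobid -1, as datax.py would use
        String jobPath = Paths.get(CoreConstant.DATAX_HOME, "job", "job2.json").toString();
        String[] dataxArgs = {"-job", jobPath, "-mode", "standalone", "-jobid", "-1"};
        try {
            DataxResult dataxResult = Engine.mockentry(dataxArgs);
            System.out.println("total read records: " + dataxResult.getTotalReadRecords());
            System.out.println("total error records: " + dataxResult.getTotalErrorRecords());
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}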

Execution result:

[Screenshot: output of the test run]
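Once the test gets a DataxResult back, the caller can decide for itself whether the run is acceptable. That is the kind of control the rewrite was meant to enable. A minimal sketch follows. ResultCheckSketch and its two limits are hypothetical. They are similar in spirit to DataX's job-level errorLimit setting, but here the caller applies them after the job has finished.

```java
// Hypothetical post-run check on the counters a DataxResult carries.
class ResultCheckSketch {

    // Accept the run if errors stay within an absolute limit and a ratio of records read.
    static boolean acceptable(long totalReadRecords, long totalErrorRecords,
                              long maxErrorRecords, double maxErrorRatio) {
        if (totalErrorRecords > maxErrorRecords) {
            return false;
        }
        if (totalReadRecords == 0) {
            return totalErrorRecords == 0; // nothing read: only an error-free run passes
        }
        return (double) totalErrorRecords / totalReadRecords <= maxErrorRatio;
    }

    public static void main(String[] args) {
        // e.g. values from dataxResult.getTotalReadRecords() / getTotalErrorRecords()
        System.out.println(acceptable(200_000L, 15L, 100L, 0.001)); // true
        System.out.println(acceptable(200_000L, 500L, 100L, 0.01));  // false
    }
}
```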

Done!

Source: https://blog.csdn.net/ffyangzhch/article/details/88784007
