java开源调度如何给xxljob加k8s执行器

作者:KL 时间:2021-09-17 16:41:50 

前言

xxljob 是采用 java 开发的开源的任务调度系统,架构上分为调度管理器、执行器,目前除了官方提供的 java 执行器外,也有 go 开发者提供了 go 语言的执行器(看了 go 执行器的代码,除了任务日志没有实现,其他功能实现都比较完整)。 xxljob 在设计上,抽象出了执行器的接口,所以实现一个语言的执行器并不复杂,这里主要探索下,如何利用 k8s 的 pod 的能力,使用 xxljob 调度 pod 运行,实现一个通用的和语言无关的执行器

  • xxljob :https://github.com/xuxueli/xxl-job

  • k8s-client-java: https://github.com/fabric8io/kubernetes-client

执行器接口

实现一个 xxljob 的执行器,如果不考虑执行器节点自动注册,只需要实现如下五个接口即可:

  • /beat :执行器心跳

  • /idleBeat :执行器的某个 job 是否空闲

  • /run :触发 job 执行

  • /kill :终止正在执行的 job

  • /log :查看本节点执行器的 job 执行日志

不过一些调度策略则需要每个执行器自行实现了,比如【阻塞处理策略】,当同一个job 的任务还在执行,突然又收到了一个新的,是串行执行,还是停止之前的任务,或者丢弃当前的任务,这些实现都需要执行器考虑。

K8S 执行器设计

上面已经了解了实现一个执行器的要素。但是让 k8s 实现这些接口,难度有点高。然后又希望不破坏现有的 xxljob 的设计,怎么办?代理解决。可以直接采用现有的 java 执行器,创建一个 job 任务,这个 job 任务专门发起 k8s 的调度,具体的调度 pod 信息通过调度参数传递,下面来实现下,以及看下需要注意的问题。

1、在 XXL-JOB-ADMIN 模块新增执行器

为了尽量减少系统维护的复杂度,我们可以将代理调度 k8s 的执行器,直接集成到 admin 模块,启动 admin 的时候,自动注册 k8s 执行器。

java开源调度如何给xxljob加k8s执行器

2、引入 K8S-CLIENT-JAVA ,使用 SERVICE ACCOUNT 机制与 K8S 交互

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>5.4.0</version>
</dependency>

这个客户端提供了完整的和 k8s-api-server 交互能力,使用这个客户端,基于 k8s 的 service account 认证,可以轻松在 xxljob 所在 namespace 内完成 pod 的生命周期管理。引入依赖后,首先创建 client 实例:

/**
* @author kl (http://kailing.pub)
* @since 2021/6/4
*/
@Configuration
public class KubernetesClientConfig {
   @Bean
   public KubernetesClient kubernetesClient(){
       return new DefaultKubernetesClient(Config.autoConfigure(null));
   }

}

这里初始化客户端时,采用了自动发现配置的模式,如果是本机开发时,就会自动寻找你本机的 kubectl 配置,当 xxljob 部署到 k8s 内时,如果找不到本地的就会尝试寻找 service account 创建出来的配置,然后从环境变量中自发现 k8s 集群的链接地址。所以无论是开发环境还是线上环境,都不用配置k8s 的链接认证信息。但是,部署到 k8s 时,因为需要借助 k8s 的 service account 机制与 k8s 交互,需要多定义一个 service account 的权限声明,可参考如下:

# In GKE need to get RBAC permissions first with
# kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin [--user=|--group=]
---
apiVersion: v1
kind: ServiceAccount
metadata:
 name: xxljob
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
 name: xxljob
rules:
- apiGroups: [""]
 resources: ["pods"]
 verbs: ["create","delete","get","list","patch","update","watch"]
- apiGroups: [""]
 resources: ["pods/exec"]
 verbs: ["create","delete","get","list","patch","update","watch"]
- apiGroups: [""]
 resources: ["pods/log"]
 verbs: ["get","list","watch"]
- apiGroups: [""]
 resources: ["events"]
 verbs: ["watch"]
- apiGroups: [""]
 resources: ["secrets"]
 verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
 name: xxljob
roleRef:
 apiGroup: rbac.authorization.k8s.io
 kind: Role
 name: xxljob
subjects:
- kind: ServiceAccount
 name: xxljob

3、编写代理执行器调度代码

/**
* @author kl (http://kailing.pub)
* @since 2021/5/28
*/
@Component
public class KubernetesExecutorHandler {
   private static final Logger logger = LoggerFactory.getLogger(KubernetesExecutorHandler.class);
   private static final String NAMESPACE = "xxl-job";
   private final KubernetesClient client;
   public KubernetesExecutorHandler(KubernetesClient client) {
       this.client = client;
   }
   @XxlJob(value = "callK8s")
   public void callK8s() throws InterruptedException {
       String podResource = XxlJobHelper.getJobParam();
       Pod pod = Serialization.unmarshal(podResource, Pod.class);
       pod.getSpec().setRestartPolicy("Never");//这里强制设置重启策略为不重启
       pod = client.pods().inNamespace(NAMESPACE).create(pod);
       client.resource(pod).waitUntilCondition(pod1 -> pod1.getStatus().getPhase().equals("Succeeded") || pod1.getStatus().getPhase().equals("Failed"), 2, TimeUnit.MINUTES);
       String log = client.pods().inNamespace(NAMESPACE).withName(pod.getMetadata().getName()).getLog();
       XxlJobHelper.log(log); //记录 pod 日志到 xxl-job
       logger.info(log);
       client.resource(pod).delete();
   }

}

如上,一个简版的 k8s 执行器便完成了,使用时,通过定义bean模式的 job ,然后选择 k8s 执行器,jobHandler 名称和填上 callk8s,通过job 参数传递 pod 调度信息,如:

java开源调度如何给xxljob加k8s执行器

这里定义了一个打印 当前时间和当前环境变量的 pod 任务,执行完成后,就可以从 job 的日志里看到执行结果了,如:

java开源调度如何给xxljob加k8s执行器

来源:http://www.kailing.pub/article/index/arcid/333.html

标签:开源调度,jxxljob,k8s,执行器
0
投稿

猜你喜欢

  • Spring注解驱动之BeanDefinitionRegistryPostProcessor原理解析

    2023-11-24 23:24:21
  • Java删除二叉搜索树最大元素和最小元素的方法详解

    2023-09-30 07:27:09
  • java实现文件下载的两种方式

    2023-11-11 06:37:14
  • 解决Mybatis中foreach嵌套使用if标签对象取值的问题

    2023-11-23 06:02:02
  • 从java中调用matlab详细介绍

    2023-08-01 14:04:17
  • 浅析Java多线程同步synchronized

    2023-05-20 15:52:29
  • SpringCloud如何创建一个服务提供者provider

    2023-08-01 01:56:33
  • Java用 Rhino/Nashorn 代替第三方 JSON 转换库

    2023-11-04 02:20:26
  • Java Timer使用讲解

    2023-11-28 20:30:33
  • Java实现FTP文件与文件夹的上传和下载

    2023-09-17 09:45:50
  • Java使用二分法进行查找和排序的示例

    2023-03-16 10:16:37
  • Java8时间api之LocalDate/LocalDateTime的用法详解

    2023-11-10 16:14:43
  • SpringMVC解析JSON请求数据问题解析

    2023-06-02 21:41:32
  • Java单例模式下的MongoDB数据库操作工具类

    2023-11-20 12:55:01
  • SpringCloud分布式链路跟踪的方法

    2023-11-24 23:42:19
  • java实现微信扫码支付功能

    2023-11-09 18:38:08
  • 解决mybatis-plus自动配置的mapper.xml与java接口映射问题

    2023-08-25 04:16:02
  • Mybatis配置之typeAlias标签的用法

    2023-11-27 20:18:20
  • java实现日期拆分的方法

    2023-06-19 00:28:59
  • java的Arrays工具类实战

    2023-08-21 15:46:36
  • asp之家 软件编程 m.aspxhome.com