DUCC配置平台实现一个动态化线程池示例代码

作者:京东云开发者 时间:2023-11-28 12:07:39 

作者:京东零售 张宾

1.背景

在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。

本文以公司DUCC配置平台作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。

2.代码实现

当前项目中使用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又使用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize可以在运行时设置核心线程数和最大线程数。

setCorePoolSize方法执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:

DUCC配置平台实现一个动态化线程池示例代码

setMaximumPoolSize方法: 首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。

Spring 框架提供的线程池类ThreadPoolTaskExecutor,此类封装了对ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize的调用。

DUCC配置平台实现一个动态化线程池示例代码

基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:

(1)定义一个动态线程池类,继承ThreadPoolTaskExecutor,目的跟非动态配置的线程池类ThreadPoolTaskExecutor区分开;

(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;

(4)定义和实现一个应用启动后根据动态线程池Bean和从ducc配置平台拉取配置刷新应用中的线程数配置;

接下来代码一一实现:

(1)动态线程池类

/**
* 动态线程池
*
*/
public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}

(2)动态线程池配置定时刷新类

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {
   /**
    * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
    */
   private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();
   /**
    * @param threadPoolBeanName
    * @param threadPoolTaskExecutor
    */
   public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
       log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
       DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
   }
   @Override
   public void afterPropertiesSet() throws Exception {
       this.refresh();
       //创建定时任务线程池
       ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
       //延迟1秒执行,每个1分钟check一次
       executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
   }
   private void refresh() {
       String dynamicThreadPool = "";
       try {
           if (DTP_REGISTRY.isEmpty()) {
               log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
               return;
           }
           dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
           if (StringUtils.isBlank(dynamicThreadPool)) {
               log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
               return;
           }
           log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
           List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {
           });
           if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
               log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
               return;
           }
           for (ThreadPoolProperties properties : threadPoolPropertiesList) {
               doRefresh(properties);
           }
       } catch (Exception e) {
           log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
       }
   }
   /**
    * @param properties
    */
   private void doRefresh(ThreadPoolProperties properties) {
       if (StringUtils.isBlank(properties.getThreadPoolBeanName())
               || properties.getCorePoolSize() < 1
               || properties.getMaxPoolSize() < 1
               || properties.getMaxPoolSize() < properties.getCorePoolSize()) {
           log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
           return;
       }
       DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
       if (Objects.isNull(threadPoolTaskExecutor)) {
           log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
           return;
       }
       ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
       if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
               && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
           log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
           return;
       }
       if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
           threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
           log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
       }
       if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
           threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
           log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
       }
       ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
       log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
   }
   private class RefreshThreadPoolConfig extends TimerTask {
       private RefreshThreadPoolConfig() {
       }
       @Override
       public void run() {
           DynamicThreadPoolRefresh.this.refresh();
       }
   }
}

线程池配置类

@Data
public class ThreadPoolProperties {
   /**
    * 线程池名称
    */
   private String threadPoolBeanName;
   /**
    * 线程池核心线程数量
    */
   private int corePoolSize;
   /**
    * 线程池最大线程池数量
    */
   private int maxPoolSize;
}

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key

ducc配置平台使用见:cf.jd.com/pages/viewp&hellip;

动态线程池配置key:dynamic.thread.pool

配置value:

[  {    "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",    "corePoolSize": 32,    "maxPoolSize": 128  }]

(4) 应用启动刷新应用本地动态线程池配置

@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
   @Override
   public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
       if (bean instanceof DynamicThreadPoolTaskExecutor) {
           DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
       }
       return bean;
   }
}

3.动态线程池应用

动态线程池Bean声明

<!-- 普通线程池 -->
   <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper">
       <!-- 核心线程数,默认为 -->
       <property name="corePoolSize" value="128"/>
       <!-- 最大线程数,默认为Integer.MAX_VALUE -->
       <property name="maxPoolSize" value="512"/>
       <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
       <property name="queueCapacity" value="500"/>
       <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
       <property name="keepAliveSeconds" value="60"/>
       <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
       <property name="rejectedExecutionHandler">
           <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
           <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
           <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
           <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
           <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
       </property>
   </bean>
   <!-- 动态线程池 -->
   <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">
       <!-- 核心线程数,默认为 -->
       <property name="corePoolSize" value="32"/>
       <!-- 最大线程数,默认为Integer.MAX_VALUE -->
       <property name="maxPoolSize" value="128"/>
       <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
       <property name="queueCapacity" value="500"/>
       <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
       <property name="keepAliveSeconds" value="60"/>
       <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
       <property name="rejectedExecutionHandler">
           <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
           <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
           <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
           <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
           <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
       </property>
   </bean>
   <!-- 动态线程池刷新配置 -->
   <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>
   <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>

业务类注入Spring Bean后,直接使用即可

@Resource
private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;
Runnable asyncTask = ()-&gt;{...};
CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);

4.小结

本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。

来源:https://juejin.cn/post/7200417924640489532

标签:DUCC,配置平台,动态化,线程池
0
投稿

猜你喜欢

  • 如何清空文件夹里面的所有文件和文件夹

    2022-09-17 04:11:06
  • android TextView加下划线的方法

    2023-09-11 01:12:36
  • C#实现文件压缩与解压的方法示例【ZIP格式】

    2021-12-31 23:25:20
  • Java ConcurrentHashMap的源码分析详解

    2023-05-02 02:16:21
  • SpringBoot扫描不到Controller的解决方案

    2022-07-19 02:05:07
  • Java并发编程之CountDownLatch源码解析

    2023-11-05 02:06:41
  • Java 实现微信和支付宝支付功能

    2023-03-08 23:18:04
  • java实现1M图片压缩优化到100kb实现示例

    2022-08-08 03:59:43
  • C# 对PDF文档加密、解密(基于Spire.Cloud.SDK for .NET)

    2021-11-23 05:37:26
  • 将c#编写的程序打包成应用程序的实现步骤分享(安装,卸载) 图文

    2023-01-30 03:08:50
  • Kotlin协程启动createCoroutine及创建startCoroutine原理

    2023-01-04 03:05:31
  • 使用maven的profile构建不同环境配置的方法

    2023-08-30 23:43:45
  • spring中bean的生命周期详解

    2021-11-29 23:31:02
  • android手机获取gps和基站的经纬度地址实现代码

    2022-04-05 03:03:00
  • 面试必时必问的JVM 类加载机制详解

    2022-06-22 20:37:52
  • Java设计模式之状态模式

    2022-05-08 07:24:25
  • 浅谈Java中的n种随机数产生办法

    2023-12-22 10:36:29
  • 剖析SpringCloud Feign中所隐藏的坑

    2023-11-19 05:32:03
  • C++选择排序算法实例

    2021-10-27 21:43:33
  • Java二维数组实战案例

    2022-08-13 08:59:25
  • asp之家 软件编程 m.aspxhome.com