Java动态线程池插件dynamic-tp集成zookeeper

作者:Redick01 时间:2023-11-25 03:41:38 

前言

dynamic-tp是一个轻量级的动态线程池插件,它是一个基于配置中心的动态线程池,线程池的参数可以通过配置中心配置进行动态的修改,在配置中心的支持上最开始的时候支持NacosApollo,由于笔者公司用的配置中心是Zookeeper,所以就想着扩展支持Zookeeper,在了解源码支持发现dynamic-tp的扩展能力做的很好,提供了扩展接口,只要我开发对应的配置中心模块即可,最终笔者实现了Zookeeper的支持并贡献到社区。接下来我通过源码解析方式介绍下Zookeeper配置中心的接入。

配置刷新

dynamic-tp提供了一个刷新配置的接口Refresher,抽象类AbstractRefresher实现刷新配置接口的刷新配置方法refresh,该方法能根据配置类型内容和配置解析配置并刷新动态线程池的相关配置,由DtpRegistry负责刷新线程池配置,事件发布订阅模式操作Web容器参数,代码如下:

public interface Refresher {
   /**
    * Refresh with specify content.
    * @param content content
    * @param fileType file type
    */
   void refresh(String content, ConfigFileTypeEnum fileType);
}
@Slf4j
public abstract class AbstractRefresher implements Refresher {
   @Resource
   private DtpProperties dtpProperties;
   @Resource
   private ApplicationEventMulticaster applicationEventMulticaster;
   @Override
   public void refresh(String content, ConfigFileTypeEnum fileTypeEnum) {
       if (StringUtils.isBlank(content) || Objects.isNull(fileTypeEnum)) {
           return;
       }
       try {
           // 根据配置内容和配置类型将配置内容转成Map
           val prop = ConfigHandler.getInstance().parseConfig(content, fileTypeEnum);
           doRefresh(prop);
       } catch (IOException e) {
           log.error("DynamicTp refresh error, content: {}, fileType: {}",
                   content, fileTypeEnum, e);
       }
   }
   private void doRefresh(Map<Object, Object> properties) {
       // 将Map中的配置转换成DtpProperties
       ConfigurationPropertySource sources = new MapConfigurationPropertySource(properties);
       Binder binder = new Binder(sources);
       ResolvableType type = ResolvableType.forClass(DtpProperties.class);
       Bindable<?> target = Bindable.of(type).withExistingValue(dtpProperties);
       binder.bind(MAIN_PROPERTIES_PREFIX, target);
       // 刷新动态线程池配置
       DtpRegistry.refresh(dtpProperties);
       // 发布刷新实现,该事件用于控制Web容器线程池参数控制
       publishEvent();
   }
   private void publishEvent() {
       RefreshEvent event = new RefreshEvent(this, dtpProperties);
       applicationEventMulticaster.multicastEvent(event);
   }
}

Zookeeper配置中心接入扩展实现

基于AbstractRefresher就可以实现Zookeeper配置中心的扩展了,Zookeeper的扩展实现继承AbstractRefresherZookeeper的扩展实现只需要监听配置中心的配置变更即可拿到配置内容,然后通过refresh刷新配置即可。代码如下:

ZookeeperRefresher继承AbstractRefresher,实现InitializingBeanafterPropertiesSet方法逻辑从配置DtpProperties获取Zookeeper的配置信息,CuratorFrameworkFactory创建客户端,设置 * ,这里有两种 * ,一个是连接监听ConnectionStateListener,一个是节点变动监听CuratorListener,出发监听后loadNode负责从Zookeeper获取配置文件配置并组装配置内容,然后通过refresh刷新配置,注意,Zookeeper配置目前配置类型仅支持properties

@Slf4j
public class ZookeeperRefresher extends AbstractRefresher implements InitializingBean {
   @Resource
   private DtpProperties dtpProperties;
   private CuratorFramework curatorFramework;
   @Override
   public void afterPropertiesSet() throws Exception {
       DtpProperties.Zookeeper zookeeper = dtpProperties.getZookeeper();
       curatorFramework = CuratorFrameworkFactory.newClient(zookeeper.getZkConnectStr(),
               new ExponentialBackoffRetry(1000, 3));
       String nodePath = ZKPaths.makePath(ZKPaths.makePath(zookeeper.getRootNode(),
               zookeeper.getConfigVersion()), zookeeper.getNode());
       final ConnectionStateListener connectionStateListener = (client, newState) -> {
           if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED) {
               loadNode(nodePath);
           }};
       final CuratorListener curatorListener = (client, curatorEvent) -> {
           final WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();
           if (null != watchedEvent) {
               switch (watchedEvent.getType()) {
                   case NodeChildrenChanged:
                   case NodeDataChanged:
                       loadNode(nodePath);
                       break;
                   default:
                       break;
               }
           }};
       curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);
       curatorFramework.getCuratorListenable().addListener(curatorListener);
       curatorFramework.start();
       log.info("DynamicTp refresher, add listener success, nodePath: {}", nodePath);
   }
   /**
    * load config and refresh
    * @param nodePath config path
    */
   public void loadNode(String nodePath) {
       try {
           final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren();
           final List<String> children = childrenBuilder.watched().forPath(nodePath);
           StringBuilder content = new StringBuilder();
           children.forEach(c -> {
               String n = ZKPaths.makePath(nodePath, c);
               final String nodeName = ZKPaths.getNodeFromPath(n);
               final GetDataBuilder data = curatorFramework.getData();
               String value = "";
               try {
                   value = new String(data.watched().forPath(n), StandardCharsets.UTF_8);
               } catch (Exception e) {
                   log.error("zk config value watched exception.", e);
               }
               content.append(nodeName).append("=").append(value).append("\n");
           });
           refresh(content.toString(), ConfigFileTypeEnum.PROPERTIES);
       } catch (Exception e) {
           log.error("load zk node error, nodePath is {}", nodePath, e);
       }
   }
}

来源:https://blog.csdn.net/qq_31279701/article/details/123551397

标签:Java,dynamic,tp,zookeeper
0
投稿

猜你喜欢

  • 修改maven本地仓库路径的方法

    2022-08-09 13:44:16
  • JAVA IDEA 打开assert 设置方式

    2022-08-19 13:48:49
  • 简单了解JAVA public class与class区别

    2023-11-15 23:59:26
  • 浅析Java中的异常处理机制

    2021-08-19 05:42:48
  • Android实现控制摄像头拍照

    2022-06-03 02:14:06
  • Java中的Struts2框架拦截 器之实例代码

    2023-06-21 19:04:03
  • mybatis-generator自动生成dao、mapping、bean配置操作

    2023-08-17 14:05:30
  • SpringBoot集成整合JWT与Shiro流程详解

    2022-09-06 06:33:23
  • C#算法之散列表

    2022-07-30 19:05:52
  • 浅谈Spring Boot 开发REST接口最佳实践

    2021-10-08 12:24:35
  • c#解压文件的实例方法

    2022-04-25 09:49:40
  • Java实现双色球抽奖随机算法示例

    2023-04-25 12:15:38
  • java面试try-with-resources问题解答

    2023-09-03 15:08:01
  • Spring Boot 整合 Apache Dubbo的示例代码

    2021-10-09 03:52:07
  • Android下拉刷新控件SwipeRefreshLayout源码解析

    2023-04-03 20:42:16
  • VS2022调试通过海康摄像头烟火识别SDK的实现

    2022-09-04 05:34:26
  • SpringBoot搭建多数据源的实现方法

    2022-07-02 18:57:04
  • mybatis一直加载xml,找到错误的解决方案

    2022-08-12 14:55:26
  • Spring IOC:CreateBean环节中的流程转换

    2022-06-10 12:28:31
  • Android RecyclerView实现滑动删除

    2022-11-20 19:50:42
  • asp之家 软件编程 m.aspxhome.com