TC 集群Seata1.6高可用架构源码解析

作者:Applehope 时间:2022-04-18 05:02:34 

一、背景

TC 集群具有高可用架构,应用到集群是这样一个间接的关系:应用 -》事务分组 -》TC 集群,应用启动后所指定的事务分组不能变,可通过配置中心变更事务分组所属的 TC 集群,Seata 客户端监听到这个变更后,会切换到新的 TC 集群。

TC 集群Seata1.6高可用架构源码解析

本篇从源码梳理这个高可用能力是如何实现的。

二、环境配置

客户端配置使用nacos配置中心和nacos注册中心,

seata:
 enabled: true
 # Seata 应用编号
 application-id: seataclistock
 # Seata 事务组编号,用于 TC 集群名。该配置需要与服务端提到的group相对应,也需要与下面的相对应
 tx-service-group: tx_group_stock
 # 关闭自动代理
 enable-auto-data-source-proxy: false
 config:
   # support: nacos, consul, apollo, zk, etcd3
   type: nacos
   nacos:
     serverAddr:
     namespace: seata # 需要与服务端添加的配置文件相同
     group: SEATA_GROUP_ROCKTEST
     username: seata
     password: seata
     data-id: seataClient.tx_group_busin.properties
 registry:
   # support: nacos, eureka, redis, zk, consul, etcd3, sofa
   type: nacos
   nacos:
     application: seata-server
     serverAddr:
     namespace: seata  # 需要与服务端添加的配置文件相同
     group: SEATA_GROUP_ROCKTEST   # 需要与服务端添加的配置文件相同
     username: seata
     password: seata

三、从配置中心获取TC集群

服务注册的能力要依赖配置中心,从nacos的配置中心获取配置NacosConfiguration#initSeataConfig

Data Id:seataClient.tx_group_stock.properties

Group:SEATA_GROUP_LWKTEST

其中的service.vgroupMapping.tx_group_stock的值是dev_cluster_1,接下来注册能力就要使用这个集群来工作。

private static void initSeataConfig() {
   try {
       String nacosDataId = getNacosDataId();
       String config = configService.getConfig(nacosDataId, getNacosGroup(), DEFAULT_CONFIG_TIMEOUT);
       if (StringUtils.isNotBlank(config)) {
           seataConfig = ConfigProcessor.processConfig(config, getNacosDataType());
           NacosListener nacosListener = new NacosListener(nacosDataId, null);
           configService.addListener(nacosDataId, getNacosGroup(), nacosListener);
       }
   } catch (NacosException | IOException e) {
       LOGGER.error("init config properties error", e);
   }
}

RegistryFactory#getInstance()这是个单例机制,所以源码梳理起来很简单,下边获取TC服务的时候会调用此单例方法做初始化。

  • 读取配置文件中registry.type,

  • 配置的值是nacos,所以读出的值是nacos

  • 通过SPI加载并实例化 NacosRegistryProvider

public class RegistryFactory {
   /**
    * Gets instance.
    *
    * @return the instance
    */
   public static RegistryService getInstance() {
       return RegistryFactoryHolder.INSTANCE;
   }
   private static RegistryService buildRegistryService() {
       RegistryType registryType;
       //registryTypeName = "registry.type"
       String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
           ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
               + ConfigurationKeys.FILE_ROOT_TYPE);
       try {
           // nacos
           registryType = RegistryType.getType(registryTypeName);
       } catch (Exception exx) {
           throw new NotSupportYetException("not support registry type: " + registryTypeName);
       }
       // 通过SPI 加载并实例化 NacosRegistryProvider
       return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
   }
   private static class RegistryFactoryHolder {
       private static final RegistryService INSTANCE = buildRegistryService();
   }
}

TM、RM 客户端需要与TC通信,所以在其初始化时必然会有获取TC集群的逻辑,对应在源码TmNettyRemotingClient#init 中的reconnect方法。

@Override
public void init() {
   // registry processor
   registerProcessor();
   if (initialized.compareAndSet(false, true)) {
       //父类中会开启定时任务来执行 getClientChannelManager().reconnect(transactionServiceGroup)
       super.init();
       if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
           getClientChannelManager().reconnect(transactionServiceGroup);
       }
   }
}

reconnect中的.NettyClientChannelManager#getAvailServerList 是根据seata.tx-service-group的值来检索TC集群信息。直接提供出来调用堆栈,方便大家快速熟悉调用链路:

getServiceGroup:111, RegistryService (io.seata.discovery.registry)
lookup:145, NacosRegistryServiceImpl (io.seata.discovery.registry.nacos)
getAvailServerList:257, NettyClientChannelManager (io.seata.core.rpc.netty)
reconnect:171, NettyClientChannelManager (io.seata.core.rpc.netty)
init:198, TmNettyRemotingClient (io.seata.core.rpc.netty)
init:47, TMClient (io.seata.tm)
initClient:220, GlobalTransactionScanner (io.seata.spring.annotation)
afterPropertiesSet:512, GlobalTransactionScanner (io.seata.spring.annotation)

这里便是通过Seata客户端 seata.tx-service-group的值,找到最终TC集群的关键之处。在getServiceGroup中从nacos中获取service.vgroupMapping.tx_group_stock的值,即dev_cluster_1

default String getServiceGroup(String key) {
   //key = service.vgroupMapping.tx_group_stock
   key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
   if (!SERVICE_GROUP_NAME.contains(key)) {
       ConfigurationCache.addConfigListener(key);
       SERVICE_GROUP_NAME.add(key);
   }
   return ConfigurationFactory.getInstance().getConfig(key);
}

然后NettyClientChannelManager#reconnect中获取 TC 集群中的所有 TC 服务节点,对每个TC 服务节点建连。

for (String serverAddress : availList) {
   try {
       acquireChannel(serverAddress);
       channelAddress.add(serverAddress);
   } catch (Exception e) {
       LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
           serverAddress, e.getMessage(), e);
   }
}

再梳理一下,梳理这么多的关键就是通过tx_group_stock 找到 TC 集群 dev_cluster_1

客户端:

seata:
 # 默认关闭,如需启用spring.datasource.dynami.seata需要同时开启
 enabled: true
 # Seata 事务组编号,用于 TC 集群名。该配置需要与服务端提到的group相对应,也需要与下面的相对应
 tx-service-group: tx_group_stock

nacos:

Data ID: seataClient.tx_group_stock.properties
Group: SEATA_GROUP_ROCKTEST
配置内容:
   ...
   service.vgroupMapping.tx_group_stock=dev_cluster_1
   ...

TC服务端:

registry:
 # support: nacos 、 eureka 、 redis 、 zk  、 consul 、 etcd3 、 sofa
 type: nacos
 preferred-networks: 30.240.*
 nacos:
   application: seata-server
   cluster: dev_cluster_1

RegistryService#getServiceGroup中从nacos获取值的时候,有个细节需要注意:通过namespace + Group + Key(Data Id) 三维来唯一标示一个Key。

四、刷新TC集群

AbstractNettyRemotingClient#init中默认会每隔10s进行一次 TC 服务清单刷新与重连

timerExecutor.scheduleAtFixedRate(new Runnable() {
   @Override
   public void run() {
       clientChannelManager.reconnect(getTransactionServiceGroup());
   }
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

来源:https://juejin.cn/post/7180243089527668797

标签:TC,集群,Seata,架构
0
投稿

猜你喜欢

  • java数据结构-堆实现优先队列

    2023-11-25 08:30:20
  • 教你用Java GUI实现文本文件的读写

    2023-05-25 06:47:13
  • java实现大文件分割与合并的实例代码

    2023-11-11 04:31:21
  • Java 判断字符串中是否包含中文的实例详解

    2023-11-06 13:17:18
  • java根据网络地址保存图片的方法

    2021-09-01 18:37:02
  • springmvc 防止表单重复提交的两种方法

    2023-03-27 17:57:18
  • java.lang.NoClassDefFoundError错误解决办法

    2021-12-29 03:52:27
  • java类加载相关知识总结

    2023-10-19 13:31:57
  • TransmittableThreadLocal解决线程间上下文传递烦恼

    2023-11-09 17:09:35
  • java多线程加锁以及Condition类的使用实例解析

    2023-08-07 07:25:30
  • java.text.DecimalFormat用法详解

    2022-09-30 03:02:27
  • Java设计模式初识之备忘录模式详解

    2023-08-29 23:27:09
  • Java容器HashMap与HashTable详解

    2022-03-05 19:25:00
  • c# 在windows中操作IIS设置FTP服务器的示例

    2023-07-18 06:13:01
  • Unity实现场景漫游相机

    2023-06-15 19:08:57
  • Flutter 分页功能表格控件详细解析

    2023-09-22 20:02:45
  • IOS与网页JS交互详解及实例

    2023-07-08 11:58:20
  • Java 数据结构与算法系列精讲之时间复杂度与空间复杂度

    2022-03-19 20:19:50
  • 浅谈Java中复制数组的方式

    2022-04-14 23:30:27
  • Java实现图片倒影的源码实例内容

    2022-08-30 02:39:24
  • asp之家 软件编程 m.aspxhome.com