Seata AT模式TransactionHook被删除探究

作者:梦想实现家_Z 时间:2022-01-12 14:56:49 

前言

兄弟们,刚刚又给seata社区修了一个BUG,有用户提了issue反应TransactionHook在某些情况下不会被调用:

Seata AT模式TransactionHook被删除探究

相关issue链接:github.com/seata/seata…,该用户在issue中已经指出了相关问题所在:

Seata AT模式TransactionHook被删除探究

下面我们来看一下到底是什么原因导致了上述BUG的产生。

问题定位

根据用户的反馈,我们找到目标源码io.seata.tm.api.TransactionalTemplate#execute()

try {
   // 开启分布式事务,获取XID        
   beginTransaction(txInfo, tx);
   Object rs;
   try {
       // 执行业务代码
       rs = business.execute();
   } catch (Throwable ex) {
       // 3. 处理异常,准备回滚.
       completeTransactionAfterThrowing(txInfo, tx, ex);
       throw ex;
   }
   // 4. 提交事务.
   commitTransaction(tx, txInfo);
   return rs;
} finally {
   //5. 回收现场
   resumeGlobalLockConfig(previousConfig);
   triggerAfterCompletion();
   cleanUp();
}

问题代码就出在cleanUp()中,我们来看一下里面做了什么操作,最终我们定位到:

public final class TransactionHookManager {
 private static final ThreadLocal<List<TransactionHook>> LOCAL_HOOKS = new ThreadLocal<>();
 // 注册TransactionHook
 public static void registerHook(TransactionHook transactionHook) {
     if (transactionHook == null) {
           throw new NullPointerException("transactionHook must not be null");
       }
       List<TransactionHook> transactionHooks = LOCAL_HOOKS.get();
       if (transactionHooks == null) {
           LOCAL_HOOKS.set(new ArrayList<>());
       }
       LOCAL_HOOKS.get().add(transactionHook);
   }
 // 移除当前线程上所有TransactionHook
 public static void clear() {
     LOCAL_HOOKS.remove();
 }
}

由上面的源码可知,cleanUp()操作时把当前线程中的所有TransactionHook都清除掉了。也就是说,假如事务A和事务B共用同一个线程,当事务B处理完毕后,调用了cleanUp()回收现场时,把该线程当中存储的所有TransactionHook全部清除掉了,导致事务A的生命周期中找不到该事务对应的TransactionHook,从而产生了BUG

如何解决

通过与seata社区的大佬不断地沟通,最终敲定以下方案:

1.改造TransactionHookManager.LOCAL_HOOKS,把数据类型改成ThreadLocal<Map<String, List<TransactionHook>>>Map中的key对应分布式事务XID

2.针对当前上下文中没有XID,那么key就为null,因为HashMap允许keynull

3.当用户查询指定XID下的hook时,连同keynull对应的hook也一起返回;

  • 第一步比较好理解,因为事务A和事务B对应的TransactionHook没有被区分出来,所以造成了清理事务B的TransactionHook时连同事务A的TransactionHook一起被清除,那么我们修改数据结构来区分事务A和事务B的TransactionHook,以便清理的时候不会造成误删;

第二步为什么要针对没有XID的时候也要能设置TransactionHook,因为有这么一段代码:

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
       try {
           // 执行triggerBeforeBegin()
           triggerBeforeBegin();
           // 注册分布式事务,生成XID
           tx.begin(txInfo.getTimeOut(), txInfo.getName());
           // 执行triggerAfterBegin()
           triggerAfterBegin();
       } catch (TransactionException txe) {
           throw new TransactionalExecutor.ExecutionException(tx, txe,
                   TransactionalExecutor.Code.BeginFailure);
       }
   }

上面的代码会产生一个问题,因为我们的TransactionHook依赖于XID,但是triggerBeforeBegin()执行的时候还没有产生XID,所以为了能够在没有XID的时候也能够让TransactionHook生效,我们要有一个虚值key来临时设置TransactionHook

第三步的设计时为了在第二步的基础上,当事务开启后获取XID后,要保证XID获取前注册的TransactionHook也要生效,我们在通过XID查询TransactionHook时要把虚值key对应的TransactionHook也一起返回;

注意事项

在实际代码修改中,发现triggerAfterCommit()triggerAfterRollback()triggerAfterCompletion()在被调用时始终拿不到对应的TransactionHook,最终debug下来发现在调用这三个方法前,上下文中的XID被解绑了,导致拿到的XID为空。代码类似下面这样:

try {
           // 调用triggerBeforeCommit()
           triggerBeforeCommit();
           // 提交事务,清除XID
           tx.commit();
           if (Arrays.asList(GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbacked).contains(tx.getLocalStatus())) {
               throw new TransactionalExecutor.ExecutionException(tx,
                       new TimeoutException(String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid())),
                       TransactionalExecutor.Code.TimeoutRollback);
           }
           // 调用triggerAfterCommit()
           triggerAfterCommit();
       } catch (TransactionException txe) {
           // 4.1 Failed to commit
           throw new TransactionalExecutor.ExecutionException(tx, txe,
                   TransactionalExecutor.Code.CommitFailure);
       }

不过经过我的一番查找,发现GlobalTransaction中是包含XID属性的,所以果断从GlobalTransaction对象中取XID传进来。

修改后的代码如下:

try {
           // 调用triggerBeforeCommit()
           triggerBeforeCommit();
           // 提交事务,清除XID
           tx.commit();
           if (Arrays.asList(GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbacked).contains(tx.getLocalStatus())) {
               throw new TransactionalExecutor.ExecutionException(tx,
                       new TimeoutException(String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid())),
                       TransactionalExecutor.Code.TimeoutRollback);
           }
           // 调用triggerAfterCommit()
           triggerAfterCommit(tx.getXid());
       } catch (TransactionException txe) {
           // 4.1 Failed to commit
           throw new TransactionalExecutor.ExecutionException(tx, txe,
                   TransactionalExecutor.Code.CommitFailure);
       }

改造后的TransactionHookManager

public final class TransactionHookManager {
   private TransactionHookManager() {
   }
   private static final ThreadLocal<Map<String, List<TransactionHook>>> LOCAL_HOOKS = new ThreadLocal<>();
   /**
    * get the current hooks
    *
    * @return TransactionHook list
    */
   public static List<TransactionHook> getHooks() {
       String xid = RootContext.getXID();
       return getHooks(xid);
   }
   /**
    * get hooks by xid
    *
    * @param xid
    * @return TransactionHook list
    */
   public static List<TransactionHook> getHooks(String xid) {
       Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get();
       if (hooksMap == null || hooksMap.isEmpty()) {
           return Collections.emptyList();
       }
       List<TransactionHook> hooks = new ArrayList<>();
       List<TransactionHook> localHooks = hooksMap.get(xid);
       if (StringUtils.isNotBlank(xid)) {
           List<TransactionHook> virtualHooks = hooksMap.get(null);
           if (virtualHooks != null && !virtualHooks.isEmpty()) {
               hooks.addAll(virtualHooks);
           }
       }
       if (localHooks != null && !localHooks.isEmpty()) {
           hooks.addAll(localHooks);
       }
       if (hooks.isEmpty()) {
           return Collections.emptyList();
       }
       return Collections.unmodifiableList(hooks);
   }
   /**
    * add new hook
    *
    * @param transactionHook transactionHook
    */
   public static void registerHook(TransactionHook transactionHook) {
       if (transactionHook == null) {
           throw new NullPointerException("transactionHook must not be null");
       }
       Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get();
       if (hooksMap == null) {
           hooksMap = new HashMap<>();
           LOCAL_HOOKS.set(hooksMap);
       }
       String xid = RootContext.getXID();
       List<TransactionHook> hooks = hooksMap.get(xid);
       if (hooks == null) {
           hooks = new ArrayList<>();
           hooksMap.put(xid, hooks);
       }
       hooks.add(transactionHook);
   }
   /**
    * clear hooks by xid
    *
    * @param xid
    */
   public static void clear(String xid) {
       Map<String, List<TransactionHook>> hooksMap = LOCAL_HOOKS.get();
       if (hooksMap == null || hooksMap.isEmpty()) {
           return;
       }
       hooksMap.remove(xid);
       if (StringUtils.isNotBlank(xid)) {
           hooksMap.remove(null);
       }
   }
}

来源:https://juejin.cn/post/7165835702662496287

标签:Seata,AT,删除,TransactionHook
0
投稿

猜你喜欢

  • springboot+mybatis+redis 二级缓存问题实例详解

    2022-08-09 09:06:36
  • 分析Java设计模式之组合模式

    2023-11-25 13:13:30
  • Java语言读取配置文件config.properties的方法讲解

    2023-09-29 14:45:51
  • Spring Boot产生环形注入的解决方案

    2023-11-08 20:14:04
  • SpringBoot注解梳理(小结)

    2023-11-10 13:27:19
  • java自定义ClassLoader加载指定的class文件操作

    2022-03-16 16:19:07
  • 深入解析java中的locale

    2023-11-09 18:14:20
  • Flutter刷新组件RefreshIndicator自定义样式demo

    2023-07-06 15:56:45
  • 使用Jackson反序列化遇到的问题及解决

    2023-11-13 21:12:14
  • java中sleep方法和wait方法的五个区别

    2023-08-27 18:37:23
  • MyBatis框架迭代器模式实现原理解析

    2021-08-07 13:56:00
  • java实现手写一个简单版的线程池

    2022-09-29 04:59:32
  • Java去掉小数点后面无效0的方案与建议

    2023-11-29 11:46:57
  • Git工具 conflict冲突问题解决方案

    2023-07-27 10:27:30
  • java多线程通过CompletableFuture组装异步计算单元

    2023-07-19 10:15:42
  • SpringSecurity解决POST方式下CSRF问题

    2023-07-18 18:59:51
  • SpringBoot整合Mybatis实现多数据源配置与跨数据源事务实例

    2023-06-29 23:47:34
  • Java编程调用微信接口实现图文信息推送功能

    2023-11-25 07:20:47
  • 简单谈谈java的异常处理(Try Catch Finally)

    2021-08-01 12:40:02
  • Spring之spring-context-indexer依赖详解

    2023-11-23 12:21:41
  • asp之家 软件编程 m.aspxhome.com