Seata AT模式如何实现行锁详解

作者:梦想实现家_Z 时间:2022-11-18 23:43:34 

前言

我们在很多博客中都有发现,Seata AT模式里面的全局锁其实是行锁,这也是Seata AT模式和XA模式在锁粒度上的最大区别。我们可以在官网看到这样一个例子:

两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。

tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。

Seata AT模式如何实现行锁详解

tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。

Seata AT模式如何实现行锁详解

如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

那么你知道Seata AT模式是如何实现行锁的嘛?为了搞明白AT模式到底是怎么获取全局锁的,我们深入源码来看看。

如何加锁

为了证实全局锁就是我们所说的行锁,经过一番寻找,我在BaseTransactionalExecutor类中的prepareUndoLog()方法中找到了这样一段代码:

TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
if (null != lockKeys) {
   connectionProxy.appendLockKey(lockKeys);
   SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
   connectionProxy.appendUndoLog(sqlUndoLog);
}
  • 如果是删除的SQL,那么通过beforeImage生成行锁标记,否则通过afterImage生成行锁标记;

比如表名wallet_tbl,里面有一个主键id值为1,那么最终生成的lockKeyswallet_tbl:1,如果有多行记录id值分别为1、2、3,那么最终生成的lockKeyswallet_tbl:1,2,3;多个主键索引的话使用_连接。所以我们可以总结出lockKeys的生成规则为:tableName:1_A,2_B,3_C123ABC分别为主键索引的值。

此时还没有真正地拿到锁,只是生成一个锁的标记。真正地上锁需要查看ConnectionProxy.register()方法:

private void register() throws TransactionException {
   if (!context.hasUndoLog() || !context.hasLockKey()) {
       return;
   }
   Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
   context.setBranchId(branchId);
}

branchRegister()方法就是RMTC进行分支注册,同时会申请行锁。那么获取行锁的核心代码应该就是在TC端了,我们顺着branchRegister()逻辑一路找到BranchSession.lock()

public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {
       if (this.getBranchType().equals(BranchType.AT)) {
           // 只有AT模式需要获取行锁
           return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);
       }
       return true;
   }

下面就要真正地开始进入LockerManager来申请锁了:

@Override
   public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {
       if (branchSession == null) {
           throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
       }
       String lockKey = branchSession.getLockKey();
       if (StringUtils.isNullOrEmpty(lockKey)) {
           // no lock
           return true;
       }
       // get locks of branch
       // 将lockKey解析成多行RowLock
       List<RowLock> locks = collectRowLocks(branchSession);
       if (CollectionUtils.isEmpty(locks)) {
           // no lock
           return true;
       }
       return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);
   }

这里做了一步将lockKey解析成多行RowLock,根据上面的tableName:1_A,2_B,3_C规则,最终解析成3个RowLock对象:{tableName,1_A},{tableName,2_B},{tableName,3_C}

最终我们追踪到最后一个关键方法LockStoreDataBaseDAO.acquireLock()

@Override
   public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
       Connection conn = null;
       PreparedStatement ps = null;
       ResultSet rs = null;
       Set<String> dbExistedRowKeys = new HashSet<>();
       boolean originalAutoCommit = true;
       // 如果有多行锁,那么先去重
       if (lockDOs.size() > 1) {
           lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
       }
       try {
           conn = lockStoreDataSource.getConnection();
           if (originalAutoCommit = conn.getAutoCommit()) {
               conn.setAutoCommit(false);
           }
           List<LockDO> unrepeatedLockDOs = lockDOs;

           //check lock
           if (!skipCheckLock) {

               boolean canLock = true;
               // 查询是否已经存在行锁
               // "select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc"
               // in里面最多限制1000个
               String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
               ps = conn.prepareStatement(checkLockSQL);
               for (int i = 0; i < lockDOs.size(); i++) {
                   ps.setString(i + 1, lockDOs.get(i).getRowKey());
               }
               rs = ps.executeQuery();
               String currentXID = lockDOs.get(0).getXid();
               boolean failFast = false;
               while (rs.next()) {
                   String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
                   // 如果发现有其他分布式事务和当前申请行锁的数据一致,那么加锁失败
                   if (!StringUtils.equals(dbXID, currentXID)) {
                       if (LOGGER.isInfoEnabled()) {
                           String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
                           String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
                           long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
                           LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
                       }
                       if (!autoCommit) {
                           int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
                           if (status == LockStatus.Rollbacking.getCode()) {
                               failFast = true;
                           }
                       }
                       // 加锁失败
                       canLock = false;
                       break;
                   }

                   dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
               }
               // 加锁失败,回滚抛异常
               if (!canLock) {
                   conn.rollback();
                   if (failFast) {
                       throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
                   }
                   return false;
               }
               // 如果是同一个分布式事务中申请行锁,那么剔除重复的锁数据
               if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
                   unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
                           .collect(Collectors.toList());
               }
               // 如果剔除后不需要再补充行锁,那么直接返回申请成功
               if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
                   conn.rollback();
                   return true;
               }
           }

           // 申请行锁,分1行和多行两种情况
           if (unrepeatedLockDOs.size() == 1) {
               LockDO lockDO = unrepeatedLockDOs.get(0);
               if (!doAcquireLock(conn, lockDO)) {
                   if (LOGGER.isInfoEnabled()) {
                       LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
                   }
                   conn.rollback();
                   return false;
               }
           } else {
               if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                   if (LOGGER.isInfoEnabled()) {
                       LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
                           unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
                   }
                   conn.rollback();
                   return false;
               }
           }
           conn.commit();
           return true;
       } catch (SQLException e) {
           throw new StoreException(e);
       } finally {
           IOUtil.close(rs, ps);
           if (conn != null) {
               try {
                   if (originalAutoCommit) {
                       conn.setAutoCommit(true);
                   }
                   conn.close();
               } catch (SQLException e) {
               }
           }
       }
   }

1.先通过查询语句检查是否存在锁冲突,锁冲突的话,就直接失败抛异常;

2.不存在锁冲突,检查是否锁重入,重入的话,补充行锁;

3.添加行锁;

检查锁冲突的SQL语句如下:

select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc

添加行锁SQL语句如下:

insert into lock_table (row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, now(), now(), ?)

为什么是行锁

根据上面加锁的逻辑,我们发现一直比较的都是row_key这个主键,那么为什么row_key代表的是行锁呢?这个问题就要回到row_key是如何产生的:

protected LockDO convertToLockDO(RowLock rowLock) {
       LockDO lockDO = new LockDO();
       lockDO.setBranchId(rowLock.getBranchId());
       lockDO.setPk(rowLock.getPk());
       lockDO.setResourceId(rowLock.getResourceId());
       // row_key的生成
       lockDO.setRowKey(getRowKey(rowLock.getResourceId(), rowLock.getTableName(), rowLock.getPk()));
       lockDO.setXid(rowLock.getXid());
       lockDO.setTransactionId(rowLock.getTransactionId());
       lockDO.setTableName(rowLock.getTableName());
       return lockDO;
   }

根据上面代码,我们很清楚地了解到,row_key是由resource_idtableNamepk这三个字段连接生成的,也就意味着row_key是代表表里面的具体一行数据,也就是我们的行记录,所以我们确信AT模式的全局锁其实就是行锁。

来源:https://juejin.cn/post/7164254193362927624

标签:Seata,AT,模式,行锁
0
投稿

猜你喜欢

  • android教程之service使用方法示例详解

    2023-05-08 03:48:21
  • java基础的详细了解第三天

    2023-10-05 23:47:04
  • Java并发之线程池Executor框架的深入理解

    2022-03-13 10:20:55
  • Java实现读取文章中重复出现的中文字符串

    2022-04-27 04:29:05
  • Android动态布局小结

    2021-10-17 12:04:39
  • C#中timer定时器用法实例

    2023-01-23 02:23:17
  • java实现十六进制字符unicode与中英文转换示例

    2021-05-29 18:19:29
  • 如何在Spring Boot中使用MQTT

    2023-10-08 20:39:13
  • MyBatis中的resultMap简要概述

    2023-11-16 06:51:34
  • Quarkus中RESTEasy Reactive集成合并master分支

    2023-06-07 14:20:45
  • C#获取图片文件扩展名的方法

    2022-03-06 04:27:43
  • java实现日历(某年的日历,某月的日历)用户完全自定义

    2023-04-17 15:14:29
  • Java将Date日期类型字段转换成json字符串的方法

    2023-02-18 19:57:09
  • Java超详细讲解设计模式中的命令模式

    2023-07-26 15:23:11
  • Java中String类的常用方法总结

    2021-11-26 10:39:20
  • Android开发实现webview中img标签加载本地图片的方法

    2023-03-15 01:33:40
  • Android仿百度外卖自定义下拉刷新效果

    2022-05-24 06:32:45
  • IDEA Error:java:无效的源发行版:13的解决过程

    2023-11-25 10:07:19
  • java多线程Future和Callable类示例分享

    2021-09-02 09:49:37
  • Unity实现换装系统

    2021-08-11 15:27:15
  • asp之家 软件编程 m.aspxhome.com