返回顶部
首页 > 资讯 > 后端开发 > Python >SeataAT模式如何实现行锁详解
  • 518
分享到

SeataAT模式如何实现行锁详解

SeataAT模式实现行锁SeataAT行锁 2022-11-13 19:11:37 518人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

目录前言如何加锁为什么是行锁前言 我们在很多博客中都有发现,Seata AT模式里面的全局锁其实是行锁,这也是Seata AT模式和XA模式在锁粒度上的最大区别。我们可以在官网看到这

前言

我们在很多博客中都有发现,Seata AT模式里面的全局锁其实是行锁,这也是Seata AT模式和XA模式在锁粒度上的最大区别。我们可以在官网看到这样一个例子:

两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。

tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。

tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。

如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

那么你知道Seata AT模式是如何实现行锁的嘛?为了搞明白AT模式到底是怎么获取全局锁的,我们深入源码来看看。

如何加锁

为了证实全局锁就是我们所说的行锁,经过一番寻找,我在BaseTransactionalExecutor类中的prepareUndoLog()方法中找到了这样一段代码:

TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
if (null != lockKeys) {
    connectionProxy.appendLockKey(lockKeys);
    SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
    connectionProxy.appendUndoLog(sqlUndoLog);
}
  • 如果是删除的SQL,那么通过beforeImage生成行锁标记,否则通过afterImage生成行锁标记;

比如表名wallet_tbl,里面有一个主键id值为1,那么最终生成的lockKeyswallet_tbl:1,如果有多行记录id值分别为1、2、3,那么最终生成的lockKeyswallet_tbl:1,2,3;多个主键索引的话使用_连接。所以我们可以总结lockKeys的生成规则为:tableName:1_A,2_B,3_C123ABC分别为主键索引的值。

此时还没有真正地拿到锁,只是生成一个锁的标记。真正地上锁需要查看ConnectionProxy.reGISter()方法:

private void register() throws TransactionException {
    if (!context.hasUndoLog() || !context.hasLockKey()) {
        return;
    }
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
    context.setBranchId(branchId);
}

branchRegister()方法就是RMTC进行分支注册,同时会申请行锁。那么获取行锁的核心代码应该就是在TC端了,我们顺着branchRegister()逻辑一路找到BranchSession.lock()

   public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {
        if (this.getBranchType().equals(BranchType.AT)) {
            // 只有AT模式需要获取行锁
            return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);
        }
        return true;
    }

下面就要真正地开始进入LockerManager来申请锁了:

@Override
    public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {
        if (branchSession == null) {
            throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
        }
        String lockKey = branchSession.getLockKey();
        if (StringUtils.isNullOrEmpty(lockKey)) {
            // no lock
            return true;
        }
        // get locks of branch
        // 将lockKey解析成多行RowLock
        List<RowLock> locks = collectRowLocks(branchSession);
        if (CollectionUtils.isEmpty(locks)) {
            // no lock
            return true;
        }
        return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);
    }

这里做了一步将lockKey解析成多行RowLock,根据上面的tableName:1_A,2_B,3_C规则,最终解析成3个RowLock对象:{tableName,1_A},{tableName,2_B},{tableName,3_C}

最终我们追踪到最后一个关键方法LockStoreDataBaseDAO.acquireLock()

@Override
    public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        Set<String> dbExistedRowKeys = new HashSet<>();
        boolean originalAutoCommit = true;
        // 如果有多行锁,那么先去重
        if (lockDOs.size() > 1) {
            lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
        }
        try {
            conn = lockStoreDataSource.getConnection();
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }
            List<LockDO> unrepeatedLockDOs = lockDOs;
​
            //check lock
            if (!skipCheckLock) {
​
                boolean canLock = true;
                // 查询是否已经存在行锁
                // "select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc"
                // in里面最多限制1000个
                String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
                ps = conn.prepareStatement(checkLockSQL);
                for (int i = 0; i < lockDOs.size(); i++) {
                    ps.setString(i + 1, lockDOs.get(i).getRowKey());
                }
                rs = ps.executeQuery();
                String currentXID = lockDOs.get(0).getXid();
                boolean failFast = false;
                while (rs.next()) {
                    String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
                    // 如果发现有其他分布式事务和当前申请行锁的数据一致,那么加锁失败
                    if (!StringUtils.equals(dbXID, currentXID)) {
                        if (LOGGER.isInfoEnabled()) {
                            String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
                            String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
                            long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
                            LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
                        }
                        if (!autoCommit) {
                            int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
                            if (status == LockStatus.Rollbacking.getCode()) {
                                failFast = true;
                            }
                        }
                        // 加锁失败
                        canLock = false;
                        break;
                    }
​
                    dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
                }
                // 加锁失败,回滚抛异常
                if (!canLock) {
                    conn.rollback();
                    if (failFast) {
                        throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
                    }
                    return false;
                }
                // 如果是同一个分布式事务中申请行锁,那么剔除重复的锁数据
                if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
                    unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
                            .collect(Collectors.toList());
                }
                // 如果剔除后不需要再补充行锁,那么直接返回申请成功
                if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
                    conn.rollback();
                    return true;
                }
            }
​
            // 申请行锁,分1行和多行两种情况
            if (unrepeatedLockDOs.size() == 1) {
                LockDO lockDO = unrepeatedLockDOs.get(0);
                if (!doAcquireLock(conn, lockDO)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
                    }
                    conn.rollback();
                    return false;
                }
            } else {
                if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
                            unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
                    }
                    conn.rollback();
                    return false;
                }
            }
            conn.commit();
            return true;
        } catch (SQLException e) {
            throw new StoreException(e);
        } finally {
            IOUtil.close(rs, ps);
            if (conn != null) {
                try {
                    if (originalAutoCommit) {
                        conn.setAutoCommit(true);
                    }
                    conn.close();
                } catch (SQLException e) {
                }
            }
        }
    }

1.先通过查询语句检查是否存在锁冲突,锁冲突的话,就直接失败抛异常;

2.不存在锁冲突,检查是否锁重入,重入的话,补充行锁;

3.添加行锁;

检查锁冲突的SQL语句如下:

select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc

添加行锁SQL语句如下:

insert into lock_table (row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, now(), now(), ?)

为什么是行锁

根据上面加锁的逻辑,我们发现一直比较的都是row_key这个主键,那么为什么row_key代表的是行锁呢?这个问题就要回到row_key是如何产生的:

protected LockDO convertToLockDO(RowLock rowLock) {
        LockDO lockDO = new LockDO();
        lockDO.setBranchId(rowLock.getBranchId());
        lockDO.setPk(rowLock.getPk());
        lockDO.setResourceId(rowLock.getResourceId());
        // row_key的生成
        lockDO.setRowKey(getRowKey(rowLock.getResourceId(), rowLock.getTableName(), rowLock.getPk()));
        lockDO.setXid(rowLock.getXid());
        lockDO.setTransactionId(rowLock.getTransactionId());
        lockDO.setTableName(rowLock.getTableName());
        return lockDO;
    }

根据上面代码,我们很清楚地了解到,row_key是由resource_idtableNamepk这三个字段连接生成的,也就意味着row_key是代表表里面的具体一行数据,也就是我们的行记录,所以我们确信AT模式的全局锁其实就是行锁。

以上就是Seata AT模式如何实现行锁详解的详细内容,更多关于Seata AT模式实现行锁的资料请关注编程网其它相关文章!

--结束END--

本文标题: SeataAT模式如何实现行锁详解

本文链接: https://lsjlt.com/news/171224.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • SeataAT模式如何实现行锁详解
    目录前言如何加锁为什么是行锁前言 我们在很多博客中都有发现,Seata AT模式里面的全局锁其实是行锁,这也是Seata AT模式和XA模式在锁粒度上的最大区别。我们可以在官网看到这...
    99+
    2022-11-13
    Seata AT模式实现行锁 Seata AT 行锁
  • 详解redis如何实现分布式锁
    小编这次要给大家分享的是详解redis如何实现分布式锁,文章内容丰富,感兴趣的小伙伴可以来了解一下,希望大家阅读完这篇文章之后能够有所收获。前言系统的不断扩大,分布式锁是最基本的保障。与单机的多线程不一样的...
    99+
    2024-04-02
  • Redis如何实现分布式锁详解
    目录一、前言二、实现原理2.1 加锁2.2 解锁三、通过RedisTemplate实现分布式锁四、通过Redisson实现一、前言 在Java的并发编程中,我们通过锁,来避免由于竞争...
    99+
    2024-04-02
  • Redis实现分布式锁详解
    目录一、前言为什么需要分布式锁?二、基于Redis实现分布式锁为什么redis可以实现分布式锁?如何实现?锁的获取锁的释放三、如何避免死锁?锁的过期时间如何设置?避免死锁锁过期处理释放其他服务的锁如何处理呢?那么redi...
    99+
    2023-04-09
    Redis实现分布式锁 Redis分布式锁
  • 一文详解如何使用Redis实现分布式锁
    目录1. 什么是分布式锁2. 使用Redis实现分布式锁2.1 加锁2.2 释放锁2.3 给锁设置有效期2.4 给锁设置唯一值2.5 通过Lua脚本实现释放锁的原子性3. 小结1. 什么是分布式锁 当我们在编写多线程代码...
    99+
    2024-04-02
  • redis实现分布式锁实例详解
    目录1、业务场景引入2、基础环境准备2.1.准备库存数据库2.2.创建SpringBoot工程,pom.xml中导入依赖,请注意版本。2.3.application.properti...
    99+
    2024-04-02
  • 详解如何在springcloud分布式系统中实现分布式锁
    目录一、简介 二、redis命令介绍 三、实现思路 四、编码实现 五、注意点 六、参考资料 最近在看分布式锁的资料,看了 Josial L的《Redis in Action》的分布式...
    99+
    2024-04-02
  • 如何实现分布式锁
    这篇文章主要讲解了如何实现分布式锁,内容清晰明了,对此有兴趣的小伙伴可以学习一下,相信大家阅读完之后会有帮助。分布式锁三种实现方式:1. 基于数据库实现分布式锁;2. 基于缓存(Redis等)实现分布式锁;...
    99+
    2024-04-02
  • 详解基于redis实现分布式锁
    目录前言原理剖析实现编写注解拦截器拦截上述提及工具问题分析前言 为了保证一个在高并发存场景下只能被同一个线程操作,java并发处理提供ReentrantLock或Synchroniz...
    99+
    2024-04-02
  • 基于Zookeeper实现分布式锁详解
    目录1、什么是Zookeeper?2、Zookeeper节点类型3、Zookeeper环境搭建4、Zookeeper基本使用5、Zookeeper应用场景6、Zookeeper分布式...
    99+
    2024-04-02
  • Mysql锁机制中行锁、表锁、死锁如何实现
    这篇文章主要介绍了Mysql锁机制中行锁、表锁、死锁如何实现,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。一、Mysql锁是什么?锁有哪些类别?锁定义:  ...
    99+
    2023-06-29
  • 如何进行Redlock中Redis分布式锁的实现
    这篇文章将为大家详细讲解有关如何进行Redlock中Redis分布式锁的实现,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。普通实现说道Redis分布式锁大部分人都会想到:setnx+lua,...
    99+
    2023-06-02
  • 如何进行Java的Actor模式的实现
    如何进行Java的Actor模式的实现,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。JActor 2.2.0 RC1 发布,该版本改进了 JLPCActors 的功能,包括...
    99+
    2023-06-17
  • go 分布式锁简单实现实例详解
    目录正文案例资源加锁使用redis来实现分布式锁redis lua保证原子性正文 其实锁这种东西,都能能不加就不加,锁会导致程序一定程度上退回到串行化,进而降低效率。 案例 首先,看...
    99+
    2024-04-02
  • Redisson如何实现分布式锁、锁续约
    这篇文章主要介绍了Redisson如何实现分布式锁、锁续约的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Redisson如何实现分布式锁、锁续约文章都会有所收获,下面我们一起来看看吧。一、基础0)Redisso...
    99+
    2023-07-05
  • redis分布式锁的实现原理详解
    首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: 1.互斥性。在任意时刻,只有一个客户端能持有锁。 2.不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有...
    99+
    2024-04-02
  • Redis分布式锁如何实现
    这篇文章将为大家详细讲解有关Redis分布式锁如何实现,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。什么是分布式锁?要介绍分布式锁,首先要提到与分布式锁相对应的是线程锁、...
    99+
    2024-04-02
  • Redis如何实现分布式锁
    目录一、前言二、正文今天我们来聊一聊分布式锁的那些事。 相信大家对锁已经不陌生了,我们在多线程环境中,如果需要对同一个资源进行操作,为了避免数据不一致,我们需要在操作共享资源之前进行...
    99+
    2024-04-02
  • zookeeper分布式锁如何实现
    这篇文章主要介绍“zookeeper分布式锁如何实现”,在日常操作中,相信很多人在zookeeper分布式锁如何实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”zookeeper分布式锁如何实现”的疑惑有所...
    99+
    2023-06-27
  • Zookeeper如何实现分布式锁
    这篇“Zookeeper如何实现分布式锁”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Zookeeper如何实现分布式锁”文...
    99+
    2023-06-27
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作