讲解如何利用 Python完成 Saga 分布式事务
作者:叶东富 发布时间:2021-08-19 00:03:48
目录
1、分布式事务
2、SAGA
3、SAGA 实践
4、处理网络异常
5、处理回滚
6、小结
银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID,只能够通过分布式事务来解决。
1、分布式事务
分布式事务在分布式环境下,为了满足可用性、性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 BASE 理论:
基本业务可用性( Basic Availability )
柔性状态( Soft state )
最终一致性( Eventual consistency )
另一方面,分布式事务也部分遵循 ACID 规范:
原子性:严格遵循
一致性:事务完成后的一致性严格遵循;事务中的一致性可适当放宽
隔离性:并行事务间不可影响;事务中间结果可见性允许安全放宽
持久性:严格遵循
2、SAGA
Saga 是这一篇数据库论文SAGAS提到的一个分布式事务方案。其核心思想是将长事务拆分为多个本地短事务,由 Saga 事务协调器协调,如果各个本地事务成功完成那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。
目前可用于 SAGA 的开源框架,主要为 Java 语言,其中以 seata 为代表。我们的例子采用 go 语言,使用的分布式事务框架为https://github.com/yedf/dtm,它对分布式事务的支持非常优雅。下面来详细讲解 SAGA 的组成:
DTM 事务框架里,有 3 个角色,与经典的 XA 分布式事务一样:
AP/应用程序,发起全局事务,定义全局事务包含哪些事务分支
RM/资源管理器,负责分支事务各项资源的管理
TM/事务管理器,负责协调全局事务的正确执行,包括 SAGA 正向 /逆向操作的执行
下面看一个成功完成的 SAGA 时序图,就很容易理解 SAGA 分布式事务:
3、SAGA 实践
对于我们要进行的银行转账的例子,我们将在正向操作中,进行转入转出,在补偿操作中,做相反的调整。
首先我们创建账户余额表:
CREATE TABLE dtm_busi.`user_account` (
`id` int(11) AUTO_INCREMENT PRIMARY KEY,
`user_id` int(11) not NULL UNIQUE ,
`balance` decimal(10,2) NOT NULL DEFAULT '0.00',
`create_time` datetime DEFAULT now(),
`update_time` datetime DEFAULT now()
);
我们先编写核心业务代码,调整用户的账户余额
def saga_adjust_balance(cursor, uid, amount):
affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d and balance >= -%d" %(amount, uid, amount))
if affected == 0:
raise Exception("update error, balance not enough")
下面我们来编写具体的正向操作 /补偿操作的处理函数
@app.post("/api/TransOutSaga")
def trans_out_saga():
saga_adjust_balance(c, out_uid, -30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransOutCompensate")
def trans_out_compensate():
saga_adjust_balance(c, out_uid, 30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransInSaga")
def trans_in_saga():
saga_adjust_balance(c, in_uid, 30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransInCompensate")
def trans_in_compensate():
saga_adjust_balance(c, in_uid, -30)
return {"dtm_result": "SUCCESS"}
到此各个子事务的处理函数已经 OK 了,然后是开启 SAGA 事务,进行分支调用
# 这是 dtm 服务地址
dtm = "http://localhost:8080/api/dtmsvr"
# 这是业务微服务地址
svc = "http://localhost:5000/api"
req = {"amount": 30}
s = saga.Saga(dtm, utils.gen_gid(dtm))
s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate")
s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate")
s.submit()
至此,一个完整的 SAGA 分布式事务编写完成。
如果您想要完整运行一个成功的示例,那么参考这个例子yedf/dtmcli-py-sample,将它运行起来非常简单
# 部署启动 dtm
# 需要 docker 版本 18 以上
git clone https://github.com/yedf/dtm
cd dtm
docker-compose up
# 另起一个命令行
git clone https://github.com/yedf/dtmcli-py-sample
cd dtmcli-py-sample
pip3 install flask dtmcli requests
flask run
# 另起一个命令行
curl localhost:5000/api/fireSaga
4、处理网络异常
假设提交给 dtm 的事务中,调用转入操作时,出现短暂的故障怎么办?按照 SAGA 事务的协议,dtm 会重试未完成的操作,这时我们要如何处理?故障有可能是转入操作完成后出网络故障,也有可能是转入操作完成中出现机器宕机。如何处理才能够保障账户余额的调整是正确无问题的?
这类网络异常的妥当处理,是分布式事务中的大难题,异常情况包括三类:重复请求、空补偿、悬挂,都需要正确处理
DTM 提供了子事务屏障功能,保证上述异常情况下的业务逻辑,只会有一次正确顺序下的成功提交。(子事务屏障详情参考分布式事务最经典的七种解决方案的子事务屏障环节)
我们把处理函数调整为:
@app.post("/api/TransOutSaga")
def trans_out_saga():
with barrier.AutoCursor(conn_new()) as cursor:
def busi_callback(c):
saga_adjust_balance(c, out_uid, -30)
barrier_from_req(request).call(cursor, busi_callback)
return {"dtm_result": "SUCCESS"}
这里的 barrier_from_req(request).call(cursor, busi_callback)调用会使用子事务屏障技术,保证 busi_callback 回调函数仅被提交一次
您可以尝试多次调用这个 TransIn 服务,仅有一次余额调整。
5、处理回滚
假如银行将金额准备转入用户 2 时,发现用户 2 的账户异常,返回失败,会怎么样?我们调整处理函数,让转入操作返回失败
@app.post("/api/TransInSaga")
def trans_in_saga():
return {"dtm_result": "FAILURE"}
我们给出事务失败交互的时序图:
这里有一点,TransIn 的正向操作什么都没有做,就返回了失败,此时调用 TransIn 的补偿操作,会不会导致反向调整出错了呢?
不用担心,前面的子事务屏障技术,能够保证 TransIn 的错误如果发生在提交之前,则补偿为空操作;TransIn 的错误如果发生在提交之后,则补偿操作会将数据提交一次。
我们可以将返回错误的 TransIn 改成:
@app.post("/api/TransInSaga")
def trans_in_saga():
with barrier.AutoCursor(conn_new()) as cursor:
def busi_callback(c):
saga_adjust_balance(c, in_uid, 30)
barrier_from_req(request).call(cursor, busi_callback)
return {"dtm_result": "FAILURE"}
最后的结果余额依旧会是对的,原理可以参考:分布式事务最经典的七种解决方案的子事务屏障环节
6、小结
在这篇文章里,我们介绍了 SAGA 的理论知识,也通过一个例子,完整给出了编写一个 SAGA 事务的过程,涵盖了正常成功完成,异常情况,以及成功回滚的情况。相信读者通过这边文章,对 SAGA 已经有了深入的理解。
文中使用的 dtm 是新开源的 Golang 分布式事务管理框架,功能强大,支持 TCC 、SAGA 、XA 、事务消息等事务模式,支持 Go 、python 、PHP 、node 、csharp 等语言的。同时提供了非常简单易用的接口。
来源:http://developer.51cto.com/art/202109/681073.htm


猜你喜欢
- 一、随机数种子为什么要提出随机数种子呢?咱们前面提到过了,随机数均是模拟出来的, 想要模拟的比较真实,就需要变换种子函数内的数值,一般以时间
- 网络上关于各种语言和应用软件的速查手册和快速参考指南有很多很多,不幸的是当我们需要的时候,总是很难找到,所以我决定花点时间尽可能的收集更多的
- DesktopNexus 是我最喜爱的一个壁纸下载网站,上面有许多高质量的壁纸,几乎每天必上, 每月也必会坚持分享我这个月来收集的壁纸但是
- 第一点:找Python安装目录方法一:方法二:输入import sysprint(sys.path)化黑线处第二点:找到安装目录后就可以开始
- 前言:在Python中,如果我们想要在遍历一组数据的过程中,对这组数据进行修改,通常会出现许多问题,例如对列表进行上述操作时, 会忽略部分数
- 通过启用php.ini配置文件中的相关选项,就可以将大部分想利用SQL注入漏洞的骇客拒绝于门外。 开启magic_quote_gpc=on之
- python开启debug模式的代码如下所示:import requests session = requests.session()imp
- MySQL是一个非常流行的小型关系型数据库管理系统,2008年1月16号被Sun公司收购。目前MySQL被广泛地应用在Internet上的中
- 数据标准化(归一化)处理是数据挖掘的一项基础工作,不同评价指标往往具有不同的量纲和量纲单位,这样的情况会影响到数据分析的结果,为了消除指标之
- 前言:之前,我写笔记的工具一直都是 notion,而且没有写博客的习惯。但是一是由于 notion 的服务器在
- <%Function BytesToBstr(body,Cset)dim objstreamset&n
- 什么是序列化与反序列化这里引入微软对序列化的解释:序列化是指将对象转换成字节流,从而存储对象或将对象传输到内存、数据库或文件的过程。 它的主
- mysql>prompt \u@\h(\d) \r:\m:\s> \u:连接用户 \h:连接主机 \d:连接数据库 \r:\m:
- Pytorch one_hot编码函数解读one_hot编码定义在一个给定的向量中,按照设定的最值–可以是向量中包含的最
- 以下函数代码中“123456” 是个加密的key,自己可以随便改。php加密,js解密,貌似没什么意义,主要是key在js中会被看到。不过在
- 一、什么是xml?有何特征?xml即可扩展标记语言,它可以用来标记数据、定义数据类型,是一种允许用户对自己的标记语言进行定义的源语言。例子:
- 目录range函数的使用第一种创建方式第二种创建方式第三种创建方式判断指定的数有没有在当前序列中循环结构总结range函数的使用作为循环遍历
- 本文主要是利用Python的第三方库Pillow,实现单通道灰度图像的颜色翻转功能。# -*- encoding:utf-8 -*-impo
- 本文实例讲述了Python Flask框架模板操作。分享给大家供大家参考,具体如下:模板在前面的示例中,视图函数的主要作用是生成请求的响应,
- 1. 数据筛选 a b c0 0 2 41 6 8 102 12 14 163 18 20 224 24 26 285 30 32 346