SpringCloud+RocketMQ实现分布式事务的实践

作者:黄青石 时间:2022-04-06 16:33:04 

随着互联网公司的微服务越来越多,分布式事务已经成为了我们的经常使用的。所以我们来一步一步的实现基于RocketMQ的分布式事务。接下来,我们将要做的主题写出来。

  • RocketMQ的分布式事务结构和说明

  • 搭建RocketMQ步骤

  • 事务场景,然后准备工程,运行代码

一、RocketMQ的分布式事务结构和说明

我们通过下图来了解一下RocketMQ实现分布式事务的结构。采用半消息机制实现分布式事务,半消息顾名思义,就是发送方将消息发送到MQ中的Broker端,这个消息被标记为“暂不投递”状态,这个时间订阅方是不能收到这个消息的,当发送方将提交了Commit后,这个消息才可以被订阅方收到。如果发送方提交了Rollback,那么订阅方就不会收到以该消息。

RocketMQ采用的是最终一致性事务,因为它使用了半消息机制,订阅方可能收不到消息,但是最终状态是能保持一致的。

我们在讲这个流程前,我们需要先看下图的红色部分,当发送方发给MQ的Commit/Rollback没有收到的时候,这个时候需要启用一个线程,从本地事务库里边查找到当前的状态,根据当前的状态来决定是否重新发送Commit/Rollback,相当于重试。

SpringCloud+RocketMQ实现分布式事务的实践

接下来,我们说一下整个执行的过程。

发送方将半消息发送到MQ中,成功后MQ将消息保持好后将结果同步给发送方,即半消息发送成功。当MQ告诉发送方成功后,我们需要记录下发送到MQ的相关的事情,比如发送的消息内容,发送时间,发送的相关ID,比如事务ID,都要做好相应记录。当本地事务库,也可以理解为日志库,我们可以针对发生的问题进行追溯。本地库记录好事务后,发送方就可以将Commit/Rollback提交到MQ了。当MQ收到了Commit命令后,这个时候就会将该条消息发送给订阅方。收到了Rollback后,那该条消息不会投递给订阅方,消息保存3天后删除。

整个过程就是这些,大家如果觉得有遗漏的,可以告诉我。

二、搭建RocketMQ

我使用的是windos10,所以需要下载windows版本。

进入到RocketMQ的官方地址:http://rocketmq.apache.org/release_notes/release-notes-4.3.0/

下载二进制文件压缩包。如下图:

SpringCloud+RocketMQ实现分布式事务的实践

先解压,然后进行环境变量配置,打开菜单,直接输入环境变量。

变量名:ROCKETMQ_HOME

值:解压的release的路径,如D:\rocketmq-all-4.3.0-bin-release

然后打开CMD命令,进入到解压路径中的bin目标,进行nameserver启动。


start mqnameserv.cmd

启动后的效果为:

SpringCloud+RocketMQ实现分布式事务的实践

再进行broker的启动,启动需要连到的nameserver为127.0.0.1:9876,打开自动创建Topic,这个时候当用到某个Topic没有的情况下会自动创建一个。


start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

SpringCloud+RocketMQ实现分布式事务的实践

这个时候Windows的rocketmq就下载好并且启动成功了。

三、事务场景,然后准备工程,运行代码

因为我们使用的RocketMq是使用的分布式事务是最终一致性的,柔性的,所以我们使用的场景也要考虑到。我们用的场景就是下订单充话费,用户支付了订单,那么直接给用户的进行充值。购买的订单不会立马充值到手机的,需要等一会才到账,这就是最终一致性。

我们使用的如下代码版本号,SpringCloud的版本号要和SpringBoot保持一致,否则会出现类找不到的情况。

SpringCloud(Finchley.RELEASE) + SpringBoot2.0.4.RELEASE + RocketMQ4.3 +MySQL + lombok(插件)

我们使用SpringCloud的几个组件:

Euerka Server :用于提供服务注册的能力和发现

Euerka Client : 用于进行服务注册

Feign:用于服务间的调用

Config :用于进行配置

接下来,我们创建需要进行配置的表。


DROP TABLE IF EXISTS `spring_cloud_config`;

CREATE TABLE `spring_cloud_config` (
 `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
 `akey` varchar(30) DEFAULT NULL,
 `avalue` varchar(128) DEFAULT NULL,
 `application` varchar(30) DEFAULT NULL,
 `aprofile` varchar(30) DEFAULT NULL,
 `label` varchar(30) DEFAULT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

INSERT INTO `spring_cloud_config` (`id`, `akey`, `avalue`, `application`, `aprofile`, `label`)
VALUES
   (2,'name_server','127.0.0.1:9876','product-service','dev','dev'),
   (3,'name_server','127.0.0.1:9876','order-service','dev','dev'),
   (4,'order_topic','order_topic','order-service','dev','dev'),
   (5,'order_topic','order_topic','product-service','dev','dev');

我们构建了4个模块,eureka模块用于服务注册和发现,springcloud-config将数据库创建的配置加载供其他工程使用。charge-order-service用于下充值订单,phone-charge-service用于给手机进行充值业务。

SpringCloud+RocketMQ实现分布式事务的实践

主要的流程为,通过charge-order-service产生了订单,然后将订单和数量到phone-charge-service进行二次确认看是否还有足够的数量可以使用,如果数量不够的话直接将事务进行rollback,这样消息就不会到消费端。如果下过去的订单号已经充值过了,那么该消息将会被直接丢掉,这也是消息端的幂等设计。

接下来,我们看一下生产者以及事务的核心代码。主要用于连接RocketMQ, 执行本地事务,在本地事务中进行二次确认,根据结果进行Commit、Rollback。当连接RocketMQ出现问题的时候可能会收到UNKNOWN,这个时候会调用checkLocalTransaction()方法,用于检查是否将消息发送给RocketMQ了。


package com.hqs.chargeorderservice.mqservice;

import com.alibaba.fastjson.JSONObject;
import com.hqs.chargeorderservice.config.Jms;
import com.hqs.chargeorderservice.service.ProduceOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;

/**
*
* @Description: 分布式事务RocketMQ 生产者
*/
@Slf4j
@Component
public class TransactionProducer {

/**
    * 需要自定义事务 * 用于 事务的二次确认 和 事务回查
    */
   private TransactionListener transactionListener ;

/**
    * 这里的生产者和之前的不一样
    */
   private TransactionMQProducer producer = null;

/**
    * 官方建议自定义线程 给线程取自定义名称 发现问题更好排查
    */
   private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
           new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
       @Override
       public Thread newThread(Runnable r) {
           Thread thread = new Thread(r);
           thread.setName("client-transaction-msg-check-thread");
           return thread;
       }

});

public TransactionProducer(@Autowired Jms jms, @Autowired ProduceOrderService produceOrderService) {
       transactionListener = new TransactionListenerImpl(produceOrderService);
       // 初始化 事务生产者
       producer = new TransactionMQProducer(jms.getOrderTopic());
       // 添加服务器地址
       producer.setNamesrvAddr(jms.getNameServer());
       // 添加事务 *
       producer.setTransactionListener(transactionListener);
       // 添加自定义线程池
       producer.setExecutorService(executorService);

start();
   }

public TransactionMQProducer getProducer() {
       return this.producer;
   }

/**
    * 对象在使用之前必须要调用一次,只能初始化一次
    */
   public void start() {
       try {
           this.producer.start();
       } catch (MQClientException e) {
           e.printStackTrace();
       }
   }

/**
    * 一般在应用上下文,使用上下文 * ,进行关闭
    */
   public void shutdown() {
       this.producer.shutdown();
   }
}

/**
* @Description: 自定义事务 *
*/
@Slf4j
class TransactionListenerImpl implements TransactionListener {

@Autowired
   private ProduceOrderService produceOrderService ;

public TransactionListenerImpl( ProduceOrderService produceOrderService) {
       this.produceOrderService = produceOrderService;
   }

@Override
   public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
       log.info("=========本地事务开始执行=============");
       String message = new String(msg.getBody());
       JSONObject jsonObject = JSONObject.parseObject(message);
       Integer productId = jsonObject.getInteger("productId");
       Integer total = jsonObject.getInteger("total");
       Integer tradeNo = jsonObject.getInteger("tradeNo");
       int userId = Integer.parseInt(arg.toString());
       //模拟执行本地事务begin=======
       /**
        * 本地事务执行会有三种可能
        * 1、commit 成功
        * 2、Rollback 失败
        * 3、网络等原因服务宕机收不到返回结果
        */
       log.info("本地事务执行参数,用户id={},商品ID={},销售库存={}",userId,productId,total);
       int result = produceOrderService.save(userId, productId, total, tradeNo);
       //模拟执行本地事务end========
       //TODO 实际开发下面不需要我们手动返回,而是根据本地事务执行结果自动返回
       //1、二次确认消息,然后消费者可以消费
       if (result == 0) {
           return LocalTransactionState.COMMIT_MESSAGE;
       }
       //2、回滚消息,Broker端会删除半消息
       if (result == 1) {
           return LocalTransactionState.ROLLBACK_MESSAGE;
       }
       //3、Broker端会进行回查消息
       if (result == 2) {
           return LocalTransactionState.UNKNOW;
       }
       return LocalTransactionState.COMMIT_MESSAGE;
   }

/**
    * 只有上面接口返回 LocalTransactionState.UNKNOW 才会调用查接口被调用
    *
    * @param msg 消息
    * @return
    */
   @Override
   public LocalTransactionState checkLocalTransaction(MessageExt msg) {
       log.info("==========回查接口=========");
       String key = msg.getKeys();
       //TODO 1、必须根据key先去检查本地事务消息是否完成。
       /**
        * 因为有种情况就是:上面本地事务执行成功了,但是return LocalTransactionState.COMMIT_MESSAG的时候
        * 服务挂了,那么最终 Brock还未收到消息的二次确定,还是个半消息 ,所以当重新启动的时候还是回调这个回调接口。
        * 如果不先查询上面本地事务的执行情况 直接在执行本地事务,那么就相当于成功执行了两次本地事务了。
        */
       // TODO 2、这里返回要么commit 要么rollback。没有必要在返回 UNKNOW
       return LocalTransactionState.COMMIT_MESSAGE;
   }
}

我们启动一下程序。我们登录localhost:7001注册中心,看到3个服务都成功注册了。

SpringCloud+RocketMQ实现分布式事务的实践

然后我们进行下订单调用接口,userId用户编码,productId产品号,total购买量,tradeNo交易订单号

http://localhost:9001/api/v1/order/chargeOrder?userId=1&productId=1&total=1&tradeNo=4

charge-order-service显示出来本地执行事务的参数。

SpringCloud+RocketMQ实现分布式事务的实践

phone-charge-service消费事务执行的日志。

SpringCloud+RocketMQ实现分布式事务的实践

当我们在创建charge-order-service和phone-charge-service的时候一定要注意,将工程里边的application.properties变为bootstrap.yml,要不然程序启动的时候会从8888找config的配置内容。因为SpringCloud里面有个“启动上下文”,主要是用于加载远端的配置,也就是加载ConfigServer里面的配置,默认加载顺序为:加载bootstrap.*里面的配置 --> 链接configserver,加载远程配置 --> 加载application.*里面的配置。

好了,代码也放到git地址了,https://github.com/stonehqs/springcloud-rocketmq-transaction,

来源:https://www.cnblogs.com/huangqingshi/p/15456085.html

标签:SpringCloud,RocketMQ,分布式事务
0
投稿

猜你喜欢

  • Android中Canvas的常用方法总结

    2021-11-25 03:00:28
  • java IO流 之 输入流 InputString()的使用

    2023-08-22 07:44:31
  • 深入探究Spring底层核心原理

    2023-03-05 08:32:16
  • Struts2 Result 参数详解

    2022-04-28 07:54:35
  • java身份证合法性校验并提取身份证有效信息

    2023-04-18 17:26:18
  • spring mvc实现登录账号单浏览器登录

    2022-06-28 22:29:47
  • 在WPF中实现全局快捷键功能

    2023-12-02 07:45:39
  • C#基于UDP进行异步通信的方法

    2022-03-20 18:23:55
  • Android开发之线程通信详解

    2022-08-21 21:37:34
  • Spring boot集成Mybatis的方法教程

    2023-11-25 06:20:41
  • C# 对Outlook2010进行二次开发的图文教程

    2022-02-03 00:34:34
  • Android自定义图片集合

    2022-06-24 11:34:52
  • java按指定编码写入和读取文件内容的类分享

    2023-06-18 10:13:01
  • 解决myBatis返回integer值的问题

    2022-07-23 18:17:38
  • java阶乘计算获得结果末尾0的个数代码实现

    2022-11-30 00:01:02
  • android计算器代码示例分享

    2023-10-14 14:06:58
  • Java绘制迷宫动画并显示的示例代码

    2022-04-06 22:37:45
  • Java List转换成String数组几种实现方式详解

    2023-11-10 07:19:41
  • iOS新浪微博、腾讯微博分享功能实例

    2023-06-16 09:15:53
  • Java日常练习题,每天进步一点点(2)

    2023-08-17 22:46:19
  • asp之家 软件编程 m.aspxhome.com