使用@TransactionalEventListener监听事务教程

作者:shiliang_feng 时间:2023-10-05 02:50:44 

@TransactionalEventListener监听事务

项目背景

最近在项目遇到一个问题

A方法体内有 INSERT、UPDATE或者DELETE操作,最后会发送一段MQ给外部,外部接收到MQ后会再发送一段请求过来,系统收到请求后会执行B方法,B方法会依赖A方法修改后的结果,这就有一个问题,如果A方法事务没有提交;且B方法的请求过来了会查询到事务未提交前的状态,这就会有问题

解决办法:@TransactionalEventListener

在Spring4.2+,有一种叫做TransactionEventListener的方式,能够控制在事务的时候Event事件的处理方式。 我们知道,Spring的发布订阅模型实际上并不是异步的,而是同步的来将代码进行解耦。而TransactionEventListener仍是通过这种方式,只不过加入了回调的方式来解决,这样就能够在事务进行Commited,Rollback…等的时候才会去进行Event的处理。

具体实现


//创建一个事件类
package com.qk.cas.config;
import org.springframework.context.ApplicationEvent;
public class MyTransactionEvent extends ApplicationEvent {
   private static final long serialVersionUID = 1L;
   private IProcesser processer;
   public MyTransactionEvent(IProcesser processer) {
       super(processer);
       this.processer = processer;
   }
   public IProcesser getProcesser() {
       return this.processer;
   }
   @FunctionalInterface
   public interface IProcesser {
       void handle();
   }
}
//创建一个监听类
package com.qk.cas.config;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
@Component
public class MyTransactionListener {
   @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
   public void hanldeOrderCreatedEvent(MyTransactionEvent event) {
       event.getProcesser().handle();
   }
}
//MQ方法的变动
   @Autowired
   private ApplicationEventPublisher eventPublisher;
   @Autowired
   private AmqpTemplate rabbitTemplate;
   public void sendCreditResult(String applyNo, String jsonString) {
       eventPublisher.publishEvent(new MyTransactionEvent(() -> {
           LOGGER.info("MQ。APPLY_NO:[{}]。KEY:[{}]。通知报文:[{}]", applyNo, Queues.CREDIT_RESULT, jsonString);
           rabbitTemplate.convertAndSend(Queues.CREDIT_RESULT, jsonString);
       }));
   }

拓展

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 只有当前事务提交之后,才会执行事件监听的方法,其中参数phase默认为AFTER_COMMIT,共有四个枚举:


public enum TransactionPhase {
   /**
    * Fire the event before transaction commit.
    * @see TransactionSynchronization#beforeCommit(boolean)
    */
   BEFORE_COMMIT,
   /**
    * Fire the event after the commit has completed successfully.
    * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and
    * therefore executes in the same after-completion sequence of events,
    * (and not in {@link TransactionSynchronization#afterCommit()}).
    * @see TransactionSynchronization#afterCompletion(int)
    * @see TransactionSynchronization#STATUS_COMMITTED
    */
   AFTER_COMMIT,
   /**
    * Fire the event if the transaction has rolled back.
    * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and
    * therefore executes in the same after-completion sequence of events.
    * @see TransactionSynchronization#afterCompletion(int)
    * @see TransactionSynchronization#STATUS_ROLLED_BACK
    */
   AFTER_ROLLBACK,
   /**
    * Fire the event after the transaction has completed.
    * <p>For more fine-grained events, use {@link #AFTER_COMMIT} or
    * {@link #AFTER_ROLLBACK} to intercept transaction commit
    * or rollback, respectively.
    * @see TransactionSynchronization#afterCompletion(int)
    */
   AFTER_COMPLETION
}

注解@TransactionalEventListener

例如 用户注册之后需要计算用户的邀请关系,递归操作。如果注册的时候包含多步验证,生成基本初始化数据,这时候我们通过mq发送消息来处理这个邀请关系,会出现一个问题,就是用户还没注册数据还没入库,邀请关系就开始执行,但是查不到数据,导致出错。

@TransactionalEventListener 可以实现事务的监听,可以在提交之后再进行操作。

监听的对象


package com.jinglitong.springshop.interceptor;
import com.jinglitong.springshop.entity.Customer;
import org.springframework.context.ApplicationEvent;

public class RegCustomerEvent extends ApplicationEvent{
   public RegCustomerEvent(Customer customer){
       super(customer);
   }
}

监听到之后的操作


package com.jinglitong.springshop.interceptor;
import com.alibaba.fastjson.JSON;
import com.jinglitong.springshop.entity.Customer;
import com.jinglitong.springshop.entity.MqMessageRecord;
import com.jinglitong.springshop.servcie.MqMessageRecordService;
import com.jinglitong.springshop.util.AliMQServiceUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;  
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Component
@Slf4j
public class RegCustomerListener {

@Value("${aliyun.mq.order.topic}")
   private String topic;

@Value("${aliyun.mq.regist.product}")
   private String registGroup;

@Value("${aliyun.mq.regist.tag}")
   private String registTag;

@Autowired
   MqMessageRecordService mqMessageRecordService;

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
   public void hanldeRegCustomerEvent(RegCustomerEvent regCustomerEvent) {
       Customer cust = (Customer) regCustomerEvent.getSource();
       Map<String, String> map = new HashMap<String, String>();
       map.put("custId", cust.getZid());
       map.put("account", cust.getAccount());
       log.info("put regist notice to Mq start");
       String hdResult = AliMQServiceUtil.createNewOrder(cust.getZid(), JSON.toJSONString(map),topic,registTag,registGroup);
       MqMessageRecord insert = buidBean(cust.getZid(),hdResult,registTag,JSON.toJSONString(map),registGroup);
       if(StringUtils.isEmpty(hdResult)) {
           insert.setStatus(false);
       }else {
           insert.setStatus(true);
       }
       mqMessageRecordService.insertRecord(insert);
       log.info("put regist notice to Mq end");
       log.info("regist notice userId : " + cust.getAccount());
   }

private MqMessageRecord buidBean (String custId,String result ,String tag,String jsonStr,String groupId) {
       MqMessageRecord msg = new MqMessageRecord();
       msg.setFlowId(custId);
       msg.setGroupName(groupId);
       msg.setTopic(topic);
       msg.setTag(tag);
       msg.setMsgId(result);
       msg.setDataBody(jsonStr);
       msg.setSendType(3);
       msg.setGroupType(1);
       msg.setCreateTime(new Date());
       return msg;
   }
}
@Autowired
   private ApplicationEventPublisher applicationEventPublisher;

applicationEventPublisher.publishEvent(new RegCustomerEvent (XXX));

这样可以确保数据入库之后再进行异步计算

来源:https://my.oschina.net/u/4021946/blog/4915478

标签:@TransactionalEventListener,监听,事务
0
投稿

猜你喜欢

  • 详解Java中使用泛型实现快速排序算法的方法

    2022-04-28 09:47:00
  • Android webview与js交换JSON对象数据示例

    2022-10-19 18:36:18
  • Android实现遮罩层(蒙板)效果

    2023-04-26 18:43:03
  • c++难以发现的bug(有趣)

    2022-01-27 17:01:41
  • C#中string.format用法详解

    2023-07-12 21:25:48
  • springboot乱码问题解决方案

    2022-03-22 21:32:38
  • Java常用HASH算法总结【经典实例】

    2023-04-26 00:34:55
  • java 二叉查找树实例代码

    2022-07-23 22:54:28
  • 使用Java程序模拟实现新冠病毒传染效果

    2022-09-12 20:26:14
  • Android检测Activity或者Service是否运行的方法

    2021-09-03 00:52:00
  • 深入理解C#窗体关闭事件

    2023-06-01 14:38:56
  • Java Swing组件编程之JTable表格用法实例详解

    2022-12-23 01:49:26
  • Android apk 插件启动内存释放问题

    2022-05-16 07:26:39
  • WCF和Remoting之间的消息传输

    2023-04-15 01:01:20
  • Android异常处理最佳实践

    2021-06-15 20:17:12
  • Android控件之Spinner用法实例分析

    2022-08-06 08:36:33
  • 深入理解Java并发编程之ThreadLocal

    2023-11-21 02:43:42
  • Android平台预置GMS包后关机闹钟失效问题及解决方法

    2022-12-31 05:52:33
  • SpringBoot+JWT实现注册、登录、状态续签流程分析

    2022-09-29 09:07:11
  • windows 部署JAVA环境安装iDea的详细步骤

    2022-10-09 01:09:26
  • asp之家 软件编程 m.aspxhome.com