springcloud中RabbitMQ死信队列与延迟交换机实现方法

作者:wu55555 时间:2023-05-04 04:25:54 

0.引言

死信队列是消息队列中非常重要的概念,同时我们需要业务场景中都需要延迟发送的概念,比如12306中的30分钟后未支付订单取消。那么本期,我们就来讲解死信队列,以及如何通过延迟交换机来实现延迟发送的需求。

1. 死信队列

1.2 什么是死信?

理解死信队列前,我们先讲解什么是死信,所谓死信就是没有被成功消费的消息,但并不是所有未成功消费的消息都是死信消息,死信消息的产生来源于以下三种途径: (1)消息被消费者拒绝,参数requeue设置为false的消息 (2)过期的消息,过期消息分为两种: a. 发送消息时,设置了某一条消息的生存时间(message TTL),如果生存时间到了,消息还没有被消费,就会被标注为死信消息 b. 设置了队列的消息生存时间,针对队列中所有的消息,如果生存时间到了,消息还没有被消费,就会被标注为死信消息 (3)当队列达到了最大长度后,再发送过来的消息就会直接变成死信消息

1.3 什么是死信队列?

直接来讲,用来盛装死信的队列就是死信队列,好像是一句废话,所以其重点在于理解死信的概念。

死信队列的作用: (1)队列在已满的情况下,会将消息发送到死信队列中,这样消息就不会丢失了,回头再从死信队列里将消息取出来进行消费即可 (2)可以基于死信队列实现延迟消费的效果。具体的实现我们后续讲解

1.4 创建死信交换机、死信队列

死信交换机、死信队列其实都是普通的交换机、队列,只是专门声明出来用于存储死信消息的。我们只需要通过deadLetterExchange方法来声明死信交换机,然后用deadLetterRoutingKey方法来声明死信队列

如下代码所示,我们创建了test.queuetest.exchangedead.queuedead.exchange,并且在test.queue中将死信交换机和死信路由指定到了测试队列中

注意:涉及到修改队列、交换机属性的,如果该队列、交换机已经存在需要将其删除后才能生效,否则可能还会报错。

@Configuration
public class RabbitMqConfig {

private static final String TEST_EXCHANGE = "test.exchange";
   private static final String TEST_QUEUE = "test.queue";
   private static final String TEST_ROUTING_KEY = "test.routing.key";
   private static final String DEAD_EXCHANGE = "dead.exchange";
   private static final String DEAD_QUEUE = "dead.queue";
   private static final String DEAD_ROUTING_KEY = "dead.routing.key";

@Bean
   public Queue deadQueue(){
       return new Queue(DEAD_QUEUE);
   }
   public DirectExchange deadExchange(){
   // 设置演示,使用了直接交换机Direct,大家可以根据自己的业务情况声明为其他类型的交换机
       return new DirectExchange(DEAD_EXCHANGE);
   public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
       return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
   public Queue testQueue(){
       return QueueBuilder.durable(TEST_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build();
   public DirectExchange testExchange(){
       return new DirectExchange(TEST_EXCHANGE);
   public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
       return BindingBuilder.bind(testQueue).to(testExchange).with(TEST_ROUTING_KEY);
}

1.5 实现死信消息

1.5.1 基于消费者进行reject或nack实现死信消息

@Component
public class QueueListener {
   @RabbitListener(queues = RabbitMqConfig.TEST_QUEUE)
   public void handler(MyMessage messageInfo, Message message, Channel channel) {
       try{
           System.out.println("接收的消息:"+messageInfo.toString());
           // requeue参数设置为false 设置死信消息
           channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
           // multiple和requeue设置为false 设置死信消息
           channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
           // 返回ack 确认接收到消息
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
       }catch (IOException e){
           try {
               channel.basicRecover();
           } catch (IOException ex) {
               ex.printStackTrace();
               log.error("消息处理失败:{}",e.getMessage());
           }
       }
   }
}

1.5.2 基于生存时间实现

(1)发送消息时设置生存时间

@GetMapping("sendTestQueueWithExpiration")
   public String sendTestQueueWithExpiration(){
       MyMessage message = new MyMessage(1L,"物流提醒","到达装货区域,注意上传凭证",new Date());
       rabbitTemplate.convertAndSend(RabbitMqConfig.TEST_EXCHANGE,RabbitMqConfig.TEST_ROUTING_KEY, message,msg -> {
           msg.getMessageProperties().setExpiration("5000");
           return msg;
       });
       return "发送成功";
   }

(2)队列设置生存时间

@Bean
   public Queue testQueue(){
       return QueueBuilder.durable(TEST_QUEUE)
               .deadLetterExchange(DEAD_EXCHANGE)
               .deadLetterRoutingKey(DEAD_ROUTING_KEY)
               // 10s 过期
               .ttl(10000)
               .build();
   }

1.5.3 基于队列max_length实现

@Bean
   public Queue testQueue(){
       return QueueBuilder.durable(TEST_QUEUE)
               .deadLetterExchange(DEAD_EXCHANGE)
               .deadLetterRoutingKey(DEAD_ROUTING_KEY)
               // 容量最大100条
               .maxLength(100)
               .build();
   }

1.6 基于死信队列实现消息延迟发送

上述我们说过死信队列还可以消息延迟发送,其思路就是: (1)消息发送时设置消息的生存时间,其生存时间就是我们想要延迟的时间 (2)消息者监控死信队列进行消费

正常队列的消息因为没有消费者消费,同时又指定了生存时间,到达时间后消息转发到死信队列中,消费者监听了死信队列从而将其消费掉。

基于死信队列实现消息延迟发送的问题

如果有两个消息,一个是5s生存时间,一个是10s生存时间,当我们先发送了10s生存时间的消息到queue中时,因为rabbitmq只会监控队列最外侧的消息的生存时间,也就是监控10s生存时间的消息,而5s生存时间的消息只会在最外侧的10s消息到期后才会监控,也就导致我实际需要5s生存的消息,实际需要10s才监听到了。

所以呢,基于死信队列实现的延迟消息,只使用于延迟时间一致的消息。

为了适配更多的延迟场景,已经更加简单的实现延迟消息,我们引入了延迟交换机

2. 延迟交换机

延迟交换机并不是rabbitmq自带的功能,而是要通过安装延迟交换机插件delayed_message_exchange来实现

其插件的安装我们之间已经讲解过,不再累叙,可以参考如下博文 springcloud:安装rabbitmq并配置延迟队列插件

通过延迟交换机实现的延迟消息,其重点主要在交换机上,队列就是普通队列,消息发送到交换机上后,会记录消息的延迟时间,到达时间后才会发送到队列中,这样消费者通过监控队列,就能在指定时间获取到消息

因此延迟交换机与普通交换机的实现,只在创建交换机时,其他的操作与普通交换机无异,因此使用起来也很方便

创建延迟交换机,通过x-delayed-type属性声明交换机类型,可以是direct也可以是topic,具体支持4中交换机类型,如果不清楚的可以参考之前的博文

@Configuration
public class RabbitMqDelayConfig {
   public static final String DELAY_EXCHANGE = "delay.exchange";
   public static final String DELAY_QUEUE = "delay.queue";
   public static final String DELAY_ROUTING_KEY = "delay.routing.key";
   @Bean
   public Exchange delayExchange(){
       Map<String, Object> arguments = new HashMap<>(1);
       arguments.put("x-delayed-type","direct");
       return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
   }
   @Bean
   public Queue delayQueue(){
       return new Queue(DELAY_QUEUE);
   }
   @Bean
   public Binding delayBinding(Queue delayQueue, Exchange delayExchange){
       return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
   }

}

发送消息时指定延迟时间,单位毫秒

rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
           @Override
           public Message postProcessMessage(Message message) throws AmqpException {
               message.getMessageProperties().setDelay(30000);
               return message;
           }
       });

我们还可以将该方法封装为工具类方法,方便之后调用

/**
* 发送 延迟队列
* @param exchange 交换机
* @param routeKey 路由
* @param message 消息
* @param delaySecond 延迟秒数
*/
public void send(String exchange, String routeKey, Object message, int delaySecond){  
rabbitTemplate.convertAndSend(exchange,routeKey,message,msg -> {
// 消息持久化
msg.getMessageProperties().setDelay(delaySecond * 1000);
return msg;
});
}

3. 应用场景

延迟消息的应用场景丰富,除了我们开篇所说的30分钟未支付自动取消订单,还比如到货后72小时未签收自动签收

基本上所有需要延迟触发的业务场景都可以用rabbitmq延迟队列来实现。

4. 练习题

对于刚接触rabbitmq的同学,这里我提供一个练习题给大家,也让大家在实操中加强对于rabbitmq的理解:

需求:订单到货后72小时未签收,自动签收 讲解:我们这里要实现订单到货后的自动签收功能,订单到货后会触发发送自动签收消息的方法,订单已签收的状态status为2,到货状态为1,如果72小时前已经签收了即status被更新为2了,那么需要取消自动签收(不执行自动签收,即忽略自动签收消息)

来源:https://juejin.cn/post/7102722834375376904

标签:springcloud,RabbitMQ,死信队列,延迟交换机
0
投稿

猜你喜欢

  • Java中tomcat memecached session 共享同步问题的解决办法

    2021-12-26 14:22:54
  • Android自定义View多种效果解析

    2022-05-26 06:59:46
  • 云IDE:Eclipse Che:Eclipse下一代IDE(推荐)

    2023-04-01 05:58:27
  • 详解StackExchange.Redis通用封装类分享

    2022-07-06 00:41:32
  • C#使用Selenium的实现代码

    2021-11-16 05:58:25
  • Vue结合Springboot实现用户列表单页面(前后端分离)

    2023-05-28 12:51:22
  • java多线程之铁路售票系统

    2022-01-31 12:25:14
  • spring配置文件解析失败报”cvc-elt.1: 找不到元素 ''beans'' 的声明”异常解决

    2021-07-01 12:45:52
  • 详解Spring-boot中读取config配置文件的两种方式

    2021-07-04 15:52:55
  • JAVA多线程并发下的单例模式应用

    2022-09-15 01:27:31
  • Java内置GUI Frame类的使用

    2021-10-25 18:06:49
  • Android使用Handler实现定时器与倒计时器功能

    2022-03-30 09:06:57
  • Scala中的mkString的具体使用方法

    2023-11-16 00:18:18
  • java设计模式-单例模式实现方法详解

    2022-02-15 13:56:01
  • java报错:找不到或无法加载主类的解决方法简单粗暴

    2023-03-29 17:20:58
  • C#中事件处理的个人体会

    2023-06-22 16:50:55
  • Android使用Sensor感应器实现线程中刷新UI创建android测力计的功能

    2023-03-05 07:51:48
  • Java获取彩色图像中的主色彩的实例代码

    2021-10-16 01:09:39
  • C#判断当前程序是否通过管理员运行的方法

    2023-09-27 15:48:24
  • java制作复制文件工具代码分享

    2022-08-05 05:30:22
  • asp之家 软件编程 m.aspxhome.com