关于SpringBoot整合RabbitMQ实现死信队列

作者:叫我二蛋 时间:2022-03-29 23:27:05 

概念介绍

什么是死信

死信可以理解成没有被正常消费的消息,在RabbitMQ中以下几种情况会被认定为死信:

  1. 消费者使用basic.reject或basic.nack(重新排队参数设置为false)对消息进行否定确认。

  2. 消息到达生存时间还未被消费。

  3. 队列超过长度限制,消息被丢弃。

这些消息会被发送到死信交换机并路由到死信队列中(在RabbitMQ中死信交换机和死信队列就是普通的交换机和队列)。其流转过程如下图

关于SpringBoot整合RabbitMQ实现死信队列

死信队列应用

  • 作为消息可靠性的一个扩展。比如,在队列已满的情况下也不会丢失消息。

  • 可以实现延迟消费功能。比如,订单15分钟内未支付。

注意事项:基于死信队列实现的延迟消费不适合时间过于复杂的场景。比如,一个队列中第一条消息TTL为10s,第二条消息TTL为5s,由于RabbitMQ只会监听第一条消息,所以本应第二条消息先达到TTL会在第一条消息的TTL之后。对于该现象有两种解决方案:

  • 维护多个队列,每个队列维护一个TTL时间。

  • 使用延迟交换机。这种方式需要下载插件支持

工程搭建

环境说明

  • RabbitMQ环境

  • Java版本:JDK1.8

  • Maven版本:apache-maven-3.6.3

  • 开发工具:IntelliJ IDEA

搭建步骤

1.创建SpringBoot项目。

2.pom.xml文件导入RabbitMQ依赖。

<dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>

3.application.yml文件添加RabbitMQ配置。

spring:
 # rabbitmq配置信息 RabbitProperties类
 rabbitmq:
   host: 127.0.0.1
   port: 5672
   username: guest
   password: guest
   virtual-host: /
   # 开启confirm机制
   publisher-confirm-type: correlated
   # 开启return机制
   publisher-returns: true
   #全局配置,局部配置存在就以局部为准
   listener:
     simple:
       acknowledge-mode: manual # 手动ACK

实现死信

准备Exchange&Queue

@Configuration
public class RabbitMQConfig {
   /**
    * 正常队列
    */
   public static final String EXCHANGE = "boot-exchange";
   public static final String QUEUE = "boot-queue";
   public static final String ROUTING_KEY = "boot-rout";
   /**
    * 死信队列
    */
   public static final String DEAD_EXCHANGE = "dead-exchange";
   public static final String DEAD_QUEUE = "dead-queue";
   public static final String DEAD_ROUTING_KEY = "dead-rout";
   /**
    * 声明死信交换机
    *
    * @return
    */
   @Bean
   public Exchange deadExchange() {
       return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();
   }
   /**
    * 声明死信队列
    *
    * @return
    */
   @Bean
   public Queue deadQueue() {
       return QueueBuilder.durable(DEAD_QUEUE).build();
   }
   /**
    * 绑定死信的队列和交换机
    *
    * @param deadExchange
    * @param deadQueue
    * @return
    */
   @Bean
   public Binding deadBind(Exchange deadExchange, Queue deadQueue) {
       return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
   }
   /**
    * 声明交换机,同channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
    *
    * @return
    */
   @Bean
   public Exchange bootExchange() {
       return ExchangeBuilder.directExchange(EXCHANGE).build();
   }
   /**
    * 声明队列,同channel.queueDeclare(QUEUE, true, false, false, null);
    * 绑定死信交换机及路由key
    *
    * @return
    */
   @Bean
   public Queue bootQueue() {
       return QueueBuilder.durable(QUEUE)
               .deadLetterExchange(DEAD_EXCHANGE)
               .deadLetterRoutingKey(DEAD_ROUTING_KEY)
               //声明队列属性有更改时需要删除队列
               //给队列设置消息时长
               //.ttl(10000)
               //队列最大长度
               .maxLength(1)
               .build();
   }
   /**
    * 绑定队列和交换机,同 channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
    *
    * @param bootExchange
    * @param bootQueue
    * @return
    */
   @Bean
   public Binding bootBind(Exchange bootExchange, Queue bootQueue) {
       return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
   }
}

监听死信队列

@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
   public void listener_dead(String msg, Channel channel, Message message) throws IOException {
       System.out.println("死信接收到消息" + msg);
       System.out.println("唯一标识:" + message.getMessageProperties().getCorrelationId());
       System.out.println("messageID:" + message.getMessageProperties().getMessageId());
       channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
   }

方式一、消费者拒绝&否认

  • 拒绝消息

@RabbitListener(queues = RabbitMQConfig.QUEUE)
   public void listener(String msg, Channel channel, Message message) throws IOException {
       System.out.println("接收到消息" + msg);
       channel.basicReject(message.getMessageProperties().getDeliveryTag(), false)
   }
  • 否认消息

@RabbitListener(queues = RabbitMQConfig.QUEUE)
   public void listener(String msg, Channel channel, Message message) throws IOException {
       System.out.println("接收到消息" + msg);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
   }

方式二、超过消息TTL 发送消息时设置TTL

@SpringBootTest
public class Publisher {
   @Autowired
   private RabbitTemplate template;
       /**
    * 5秒未被消费会路由到死信队列
    */
   @Test
   public void publish_expir() {
       template.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTING_KEY, "hello expir dead", message -> {
           message.getMessageProperties().setExpiration("5000");
           return message;
       });
   }
}
  • 设置队列所有消息的TTL

更新RabbitMQConfig类中bootQueue() ,更新后需要删除队列,因为队列属性有更改

@Bean
   public Queue bootQueue() {
       return QueueBuilder.durable(QUEUE)
               .deadLetterExchange(DEAD_EXCHANGE)
               .deadLetterRoutingKey(DEAD_ROUTING_KEY)
               //声明队列属性有更改时需要删除队列
               //给队列设置消息时长
               .ttl(10000)
               .build();
   }

方式三、超过队列长度限制

设置队列长度限制,当队列长度超过设置的阈值,消息便会路由到死信队列。

@Bean
   public Queue bootQueue() {
       return QueueBuilder.durable(QUEUE)
               .deadLetterExchange(DEAD_EXCHANGE)
               .deadLetterRoutingKey(DEAD_ROUTING_KEY)
               //声明队列属性有更改时需要删除队列
               .maxLength(1)
               .build();
   }

代码仓库 点我

来源:https://wangbinguang.blog.csdn.net/article/details/128304529

标签:SpringBoot,RabbitMQ,死信队列
0
投稿

猜你喜欢

  • Java递归寻路实现,你真的理解了吗

    2022-09-17 02:24:34
  • java多线程中执行多个程序的实例分析

    2023-03-11 18:21:58
  • Java IO之File 类详解

    2023-08-07 20:02:12
  • Java的接口调用时的权限验证功能的实现

    2023-08-09 11:15:06
  • Java新API的时间格式化

    2023-02-14 19:24:37
  • 解析Java设计模式编程中命令模式的使用

    2023-11-12 04:49:45
  • Spring Boot接口幂等插件用法示例解析

    2022-04-29 16:47:11
  • Spring Boot 集成Dubbo框架实例

    2022-02-03 21:23:27
  • intellij idea14打包apk文件和查看sha1值

    2022-05-25 13:18:37
  • Java如果通过jdbc操作连接oracle数据库

    2023-04-07 10:55:51
  • 解决RedisTemplate调用increment报错问题

    2023-11-20 06:35:05
  • Java去掉小数点后面无效0的方案与建议

    2023-11-29 11:46:57
  • Java Lambda表达式与匿名内部类的联系和区别实例分析

    2022-01-05 21:10:22
  • SpringBoot集成整合JWT与Shiro流程详解

    2022-09-06 06:33:23
  • 深入解析JVM对dll文件和对类的装载过程

    2023-06-14 23:23:33
  • Spring中的aware接口详情

    2023-11-29 10:48:29
  • C#中动态数组用法实例

    2021-11-30 16:42:23
  • 一篇文章带你入门Springboot沙箱环境支付宝支付(附源码)

    2021-06-26 23:21:16
  • Java编程swing组件JLabel详解以及使用示例

    2023-12-15 22:28:50
  • Gradle修改本地仓库的位置方法实现

    2022-01-17 21:25:52
  • asp之家 软件编程 m.aspxhome.com