SpringBoot集成RabbitMQ的方法(死信队列)

作者:小揪揪 时间:2023-06-10 15:12:06 

介绍

死信队列:用于存放没有被及时消费的消息(即"死信")的队列。消息没有被及时消费、成为死信,有以下几种原因:
1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2.队列达到最大长度
3.消息TTL过期

场景

SpringBoot集成RabbitMQ的方法(死信队列)

1.消息进入初始队列,等待30分钟后进入5分钟队列
2.消息等待5分钟后进入执行队列
3.执行失败后重新回到5分钟队列
4.失败5次后,消息进入2小时队列
5.消息等待2小时进入执行队列
6.失败5次后,将消息丢弃或做其他处理

使用

安装MQ

使用docker方式安装,选择带management的版本


docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

访问 localhost:15672,默认账号密码guest/guest

项目配置

(1)创建springboot项目
(2)在application.properties配置文件中配置mq连接信息


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)队列配置


package com.df.ps.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqConfig {

//time
 @Value("${spring.df.buffered.min:120}")
 private int springdfBufferedTime;

@Value("${spring.df.high-buffered.min:5}")
 private int springdfHighBufferedTime;

@Value("${spring.df.low-buffered.min:120}")
 private int springdfLowBufferedTime;

// 30min Buffered Queue
 @Value("${spring.df.queue:spring-df-buffered-queue}")
 private String springdfBufferedQueue;

@Value("${spring.df.topic:spring-df-buffered-topic}")
 private String springdfBufferedTopic;

@Value("${spring.df.route:spring-df-buffered-route}")
 private String springdfBufferedRouteKey;

// 5M Buffered Queue
 @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
 private String springdfHighBufferedQueue;

@Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
 private String springdfHighBufferedTopic;

@Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
 private String springdfHighBufferedRouteKey;

// High Queue
 @Value("${spring.df.high.queue:spring-df-high-queue}")
 private String springdfHighQueue;

@Value("${spring.df.high.topic:spring-df-high-topic}")
 private String springdfHighTopic;

@Value("${spring.df.high.route:spring-df-high-route}")
 private String springdfHighRouteKey;

// 2H Low Buffered Queue
 @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
 private String springdfLowBufferedQueue;

@Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
 private String springdfLowBufferedTopic;

@Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
 private String springdfLowBufferedRouteKey;

// Low Queue
 @Value("${spring.df.low.queue:spring-df-low-queue}")
 private String springdfLowQueue;

@Value("${spring.df.low.topic:spring-df-low-topic}")
 private String springdfLowTopic;

@Value("${spring.df.low.route:spring-df-low-route}")
 private String springdfLowRouteKey;

@Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")
 Queue springdfBufferedQueue() {
   int bufferedTime = 1000 * 60 * springdfBufferedTime;
   return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime);
 }

@Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")
 Queue springdfHighBufferedQueue() {
   int highBufferedTime = 1000 * 60 * springdfHighBufferedTime;
   return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime);
 }

@Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")
 Queue springdfHighQueue() {
   return new Queue(springdfHighQueue, true);
 }

@Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")
 Queue springdfLowBufferedQueue() {
   int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime;
   return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime);
 }

@Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")
 Queue springdfLowQueue() {
   return new Queue(springdfLowQueue, true);
 }

@Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")
 TopicExchange springdfBufferedTopic() {
   return new TopicExchange(springdfBufferedTopic);
 }

@Bean
 Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {
   return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey);
 }

@Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")
 TopicExchange springdfHighBufferedTopic() {
   return new TopicExchange(springdfHighBufferedTopic);
 }

@Bean
 Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) {
   return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey);
 }

@Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")
 TopicExchange springdfHighTopic() {
   return new TopicExchange(springdfHighTopic);
 }

@Bean
 Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {
   return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey);
 }

@Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")
 TopicExchange springdfLowBufferedTopic() {
   return new TopicExchange(springdfLowBufferedTopic);
 }

@Bean
 Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) {
   return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey);
 }

@Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")
 TopicExchange springdfLowTopic() {
   return new TopicExchange(springdfLowTopic);
 }

@Bean
 Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {
   return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey);
 }

@Bean
 SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                      MessageListenerAdapter listenerAdapter) {
   SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
   container.setConnectionFactory(connectionFactory);
   container.setQueueNames(springdfHighQueue, springdfLowQueue);
   container.setMessageListener(listenerAdapter);
   return container;
 }

@Bean
 MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) {

MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
   adapter.setDefaultListenerMethod("receive");
   Map<String, String> queueOrTagToMethodName = new HashMap<>();
   queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive");
   queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive");
   adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
   return adapter;

}

private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {
   Map<String, Object> args = new HashMap<>();
   args.put("x-dead-letter-exchange", topic);
   args.put("x-dead-letter-routing-key", routeKey);
   args.put("x-message-ttl", bufferedTime);
   // 是否持久化
   boolean durable = true;
   // 仅创建者可以使用的私有队列,断开后自动删除
   boolean exclusive = false;
   // 当所有消费客户端连接断开后,是否自动删除队列
   boolean autoDelete = false;

return new Queue(queueName, durable, exclusive, autoDelete, args);
 }
}

消费者配置


package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.util.Map;

public class MqReceiver {

private static Logger logger = LoggerFactory.getLogger(MqReceiver.class);

@Value("${high-retry:5}")
 private int highRetry;

@Value("${low-retry:5}")
 private int lowRetry;

@Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
 private String springdfHighBufferedTopic;

@Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
 private String springdfHighBufferedRouteKey;

@Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
 private String springdfLowBufferedTopic;

@Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
 private String springdfLowBufferedRouteKey;

private final RabbitTemplate rabbitTemplate;
 @Autowired
 public MqReceiver(RabbitTemplate rabbitTemplate) {
   this.rabbitTemplate = rabbitTemplate;
 }

public void receive(Object message) {
   if (logger.isInfoEnabled()) {
     logger.info("default receiver: " + message);
   }
 }

/**
  * 消息从初始队列进入5分钟的高速缓冲队列
  * @param message
  */
 public void highReceiver(Object message){
   ObjectMapper mapper = new ObjectMapper();
   Map msg = mapper.convertValue(message, Map.class);

try{
     logger.info("这里做消息处理...");
   }catch (Exception e){
     int times = msg.get("times") == null ? 0 : (int) msg.get("times");
     if (times < highRetry) {
       msg.put("times", times + 1);
       rabbitTemplate.convertAndSend(springdfHighBufferedTopic,springdfHighBufferedRouteKey,message);
     } else {
       msg.put("times", 0);
       rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
     }
   }
 }

/**
  * 消息从5分钟缓冲队列进入2小时缓冲队列
  * @param message
  */
 public void lowReceiver(Object message){
   ObjectMapper mapper = new ObjectMapper();
   Map msg = mapper.convertValue(message, Map.class);

try {
     logger.info("这里做消息处理...");
   }catch (Exception e){
     int times = msg.get("times") == null ? 0 : (int) msg.get("times");
     if (times < lowRetry) {
       rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
     }else{
       logger.info("消息无法被消费...");
     }
   }
 }
}

来源:https://segmentfault.com/a/1190000019042102

标签:SpringBoot,集成,RabbitMQ
0
投稿

猜你喜欢

  • 简述Java编程之关系操作符

    2021-08-07 09:44:23
  • 在当前Activity之上创建悬浮view之WindowManager悬浮窗效果

    2023-02-09 21:02:12
  • springboot @ConditionalOnMissingBean注解的作用详解

    2021-11-27 09:33:59
  • 10个C#程序员经常用到的实用代码片段

    2022-12-01 13:02:58
  • Java设计模式之java备忘录模式详解

    2023-08-22 19:31:07
  • android之listview悬浮topBar效果

    2022-12-24 23:29:58
  • 使用C# 的webBrowser写模拟器时的javascript脚本调用问题

    2022-03-14 23:56:31
  • android控件实现多张图片渐变切换

    2022-06-18 20:11:57
  • springcloud之自定义简易消费服务组件

    2022-01-29 00:18:24
  • 彻底搞懂Java多线程(一)

    2023-08-02 10:42:30
  • SpringBoot 配置文件总结

    2021-09-06 13:12:57
  • SpringMVC源码解读之HandlerMapping - AbstractUrlHandlerMapping系列request分发

    2022-07-26 20:39:48
  • SpringBoot响应处理之以Json数据返回的实现方法

    2023-12-21 12:21:47
  • kotlin gson反序列化默认值失效深入讲解

    2022-04-07 15:28:59
  • java利用多线程和Socket实现猜拳游戏

    2022-10-03 08:03:30
  • flutter升级3.7.3报错Unable to find bundled Java version解决

    2023-07-29 16:57:52
  • java基础-数组扩容详解

    2022-05-24 00:34:58
  • Java Vector实现班级信息管理系统

    2023-11-25 03:09:09
  • java 判断两个对象是否为同一个对象实例代码

    2022-09-19 22:31:35
  • Spring Security配置保姆级教程

    2023-11-07 11:46:56
  • asp之家 软件编程 m.aspxhome.com