RabbitMQ消息有效期与死信的处理过程

作者:lss0555 时间:2023-11-10 20:29:42 

一.前言

RabbitMQ的TTL全称为Time-To-Live,表示的是消息的有效期。消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。如果不设置TTL,则表示此消息永久有效(默认消息是不会失效的)。如果将TTL设为0,则表示如果消息不能被立马消费则会被立即丢掉,这个特性可以部分替代RabbitMQ3.0以前支持的immediate参数,之所以所部分代替,是应为immediate参数在投递失败会有basic.return方法将消息体返回(这个功能可以利用死信队列来实现)。

设置TTL有两种方式:

  • 队列有效期:是声明队列的时候,在队列的属性中设置,这样该队列中的消息都会有相同的有效期

  • 消息有效期:发送消息时给消息设置属性,可以为每条消息都设置不同的TTL

如果两种方式都设置了,则以设置的较小的为准。

  • 区别:如果声明队列时设置了有效期,则消息过期了就会被删掉;如果是发消息时设置的有效期,消息过期了也不会被立马删掉,因为这时消息是否过期是在要投递给消费者时判断的。

二.设置消息有效期

1.设置队列的有效期TTL

定义队列的方法如下:

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                Map<String, Object> arguments) throws IOException;

该方法的arguments参数可以设置队列的属性,属性名为x-message-ttl,单位为毫秒。后台添加的话如下:

RabbitMQ消息有效期与死信的处理过程

代码中设置如下:

Map<String, Object> arguments= new HashMap<String , Object>();
arguments.put("x-message-ttl " , 10000);//10秒钟  单位为毫秒
channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;

命令行模式来设置:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":100000}' --apply-to queues

通过HTTP接口调用:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 100000}}'
http://ip:15672/api/queues/{vhost}/{queuename}

2.设置队列的有效期Expire

有效期Expire可以让队列在指定时间内 &ldquo;未被使用&rdquo; 的话会自动过期删除,未使用的意思是 queue 上没有任何 consumer,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。该方式可用于,例如,RPC-style 的回复 queue, 其中许多queue 会被创建出来,但是却从未被使用。

服务器会确保在过期时间到达后 queue 被删除,但是不保证删除的动作有多么的及时。在服务器重启后,持久化的queue 的超时时间将重新计算。 x-expires 参数值以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,且不能设置为 0 。所以,如果该参数设置为 10000 ,则表示该 queue 如果在 10s之内未被使用则会被删除。

代码如下:

Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-expires", 10000);
channel.queueDeclare("queue", false, false, false, args);

3.通过发送消息时设置有效期

发送消息的方法如下:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

在该方法的props参数可以设置其有效期:

Map<String, Object> headers = new HashMap<String, Object>();
                       AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                               .deliveryMode(2) // 消息持久
                               .contentEncoding("UTF-8") // 编码方式
                               .contentType("text/plain")
                               .expiration("100000")
                               .headers(headers)
                               .build();
     channel.basicPublish("", queueName, properties, message.getBytes());

通过HTTPAPI 接口设置:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST -d
'{"properties":{"expiration":"100000"},"routing_key":"routingkey","payload":"bodys","payload_encoding":"string"}'  
http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

三.死信交换机DLX

介绍

  • 死信队列:DLX,dead-letter-exchange

  • 利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

消息变成死信几种情况

  • 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false

  • 消息过期

  • 队列达到最大长度

死信处理过程

  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

  • 可以监听这个队列中的消息做相应的处理。

用途

通过监控消费死信队列中消息,来观察和分析数据。
结合TTL实现延迟队列(比如下单超过多长时间自动关闭)

使用

代码如下:

channel.exchangeDeclare("dlx_exchange" , "direct"); //创建DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange" , "dlx_exchange ");//设置死信交换机
args.put("x-dead-letter-routing-key" , "dlx-routing-key");//设置DLX的路由键(可以不设置)
channel.queueDeclare("myqueue" , false , false , false , args);

实例

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明一个交换机,做死信交换机用
channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
//声明一个队列,做死信队列用
channel.queueDeclare("dlx_queue", true, false, false, null);
//队列绑定到交换机上
channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");

channel.exchangeDeclare("normal_exchange", "fanout", true, false, null);
Map<String, Object> arguments=new HashMap<String, Object>();
arguments.put("x-message-ttl" , 1000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLX
arguments.put("x-dead-letter-exchange" , "dlx_exchange");//设置DLX
arguments.put("x-dead-letter-routing-key" , "dlx.test");//设置DLX的路由键
//为队列normal_queue 添加DLX
channel.queueDeclare("normal_queue", true, false, false, arguments);
channel.queueBind("normal_queue", "normal_exchange", "");
channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("测试死信消息").getBytes());
System.out.println("发送消息时间:"+ConnectionUtil.formatDate(new Date()));
channel.close();
connection.close();
}

说明:

申明死信队列dlx_queue的绑定如下,与死信交换机dlx_exchange(topic类型)进行绑定,routing key为"dlx.*"
申明队列normal_queue,与交换机normal_exchange(fanout类型)进行绑定

执行流程:

  • 消息发送到交换机normal_exchange,然后路由到队列normal_queue上

  • 因为队列normal_queue没有消费者,消息过期后成为死信消息

  • 死信消息携带设置的x-dead-letter-routing-key=dlx.test进入到死信交换机dlx_exechage

  • dlx_exechage与dlx_queue绑定的routing key为"dlx.*",死信消息的路由键dlx.test符合该规则被路由到dlx.queue上面。

参考:

https://www.jianshu.com/p/986ee5eb78bc

https://blog.csdn.net/u012988901/article/details/88958654

来源:https://blog.csdn.net/u010520146/article/details/103286837

标签:RabbitMQ,消息有效期,死信
0
投稿

猜你喜欢

  • Java synchronized关键_动力节点Java学院整理

    2023-11-10 11:08:53
  • SpringCloud学习笔记之SpringCloud搭建父工程的过程图解

    2022-10-28 02:38:30
  • Java实现注册登录与邮箱发送账号验证激活功能

    2023-11-05 09:21:57
  • SpringBoot Admin用法实例讲解

    2021-07-26 17:57:58
  • Flutter的键值存储数据库使用示例详解

    2023-10-15 02:13:15
  • c#哈希算法的实现方法及思路

    2023-10-22 01:48:08
  • Spring注解驱动之BeanDefinitionRegistryPostProcessor原理解析

    2023-11-24 23:24:21
  • C#学习笔记之状态模式详解

    2021-09-15 21:56:08
  • C# 文字代码页 文字编码的代码页名称速查表

    2023-12-13 04:03:54
  • java输入字符串并将每个字符输出的方法

    2022-10-04 01:25:37
  • java实现两个对象之间传值及简单的封装

    2022-03-11 13:53:18
  • C#中类与接口的区别个人总结

    2023-05-08 17:29:40
  • 关于idea更新到2020.2.3无法创建web项目原因 library is not specified

    2022-11-24 10:13:28
  • Java并发编程之同步容器

    2023-03-10 16:34:29
  • Java算法比赛常用方法实例总结

    2023-11-28 07:15:26
  • kettle中使用js调用java类的方法

    2022-05-09 00:06:31
  • Android利用CountDownTimer实现验证码倒计时效果实例

    2022-07-13 22:35:39
  • unity实现手游虚拟摇杆

    2021-11-23 07:16:44
  • C# 如何规范的写 DEBUG 输出

    2023-04-27 03:12:37
  • Java线程安全的计数器简单实现代码示例

    2023-11-09 15:41:57
  • asp之家 软件编程 m.aspxhome.com