解决RabbitMq消息队列Qos Prefetch消息堵塞问题

作者:james_searcher 时间:2021-11-17 17:36:13 

mq是实现代码扩展的有利手段,个人喜欢用概念来学习新知识,介绍堵塞问题的之前,先来段概念的学习。

ConnectionFactory:创建connection的工厂类

Connection: 简单理解为socket

Channel:和mq交互的接口,定义queue、exchange和绑定queue、exhange等接口都是它。

接下来就是和mq的交互类

exchange:简单地看成路由,类型不是重点,看看官网即可

queue:客户端监听的是queue,而不是exchange,但是使用queue的前提要先将exchange和queue绑定。用过java queue工具类应该很容易上手,queue分为写和读,各自可以有自己频率,写得快读得慢,容易堵塞;写得慢读得快又容易造成消费者的空闲。

Prefetc:一个重要却容易被忽略的指标,也是这次遇到的问题。

prefetch与消息投递

prefetch是指单一消费者最多能消费的unacked messages数目。

如何理解呢?

mq为每一个 consumer设置一个缓冲区,大小就是prefetch。每次收到一条消息,MQ会把消息推送到缓存区中,然后再推送给客户端。当收到一个ack消息时(consumer 发出baseack指令),mq会从缓冲区中空出一个位置,然后加入新的消息。但是这时候如果缓冲区是满的,MQ将进入堵塞状态。

更具体点描述,假设prefetch值设为10,共有两个consumer。也就是说每个consumer每次会从queue中预抓取 10 条消息到本地缓存着等待消费。同时该channel的unacked数变为20。而Rabbit投递的顺序是,先为consumer1投递满10个message,再往consumer2投递10个message。如果这时有新message需要投递,先判断channel的unacked数是否等于20,如果是则不会将消息投递到consumer中,message继续呆在queue中。之后其中consumer对一条消息进行ack,unacked此时等于19,Rabbit就判断哪个consumer的unacked少于10,就投递到哪个consumer中。

我遇到的问题是一个粗心的程序员,在编写代码的时候,他对某些消息处理方式是这样的

if (success) {
                       channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                   } else {
                       logger.error("######### The message is not delete from queue : {}", body);
                   }

首先他讲ack机制设置为手动的,然后他的理解是如果处理成功的消息,就ack给MQ,期望MQ就可以删除完成的数据。不然,保留数据再次被处理。

这里的误区就是就是对ack的理解,失败的时候,如果需要让程序继续处理,应该使用basicNack,并告诉mq将消息再次放入队列

if (success) {
                           channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                       } else {
                           channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                       }

对于客户端意外宕机的情况,没有ack服务器确实不会删除掉数据,但是consumer重启以后,对于服务器就是一个新的消费者了,也就是它的缓冲区又被重置为原来的n-prefetch,所以这个问题被粗心的小哥想当然地测试通过了。

prefetch的大小应该为多少

这篇文章给了很好的建议,我简单地说一下我的理解。

理想状况下,计算MQ SERVER 从缓冲区中拿到消息并推送到消费端,加上消费端处理完ack消息到MQ server,的时间,假设为100ms,其中消费端处理业务话费了10ms。

这里可以得出我们 prefetch = 100ms / 10ms = 10,也就是消息来回的总时间/业务处理的时间,这里要求我们 prefetch >= 10。一般计算这个时间不会太准确只能毛姑姑的,所以prefetch一般要大一点。但是这个值也不能太大,不然消费端就一只处于空闲状态了。

所以如果你保证所有的消息都ack了,但是还是出现比较长时间的堵塞,你就或者加大一点prefetch,或者多加一些机器,或者减少业务处理的时间了。一开始建议采用或者,使用一个线程池来处理这些业务逻辑。

来源:https://blog.csdn.net/james_searcher/article/details/70308565

标签:RabbitMq,Qos,Prefetch,消息堵塞,消息队列
0
投稿

猜你喜欢

  • Android中SeekBar和RatingBar用法实例分析

    2023-07-28 00:13:59
  • flutter实现倒计时加载页面

    2023-08-18 23:30:09
  • C++ Boost MPI接口详细讲解

    2023-11-02 13:35:36
  • c#中分割字符串的几种方法

    2023-04-11 16:04:30
  • 浅谈Unity中的Shader

    2022-03-25 05:15:07
  • C#中TextBox的横线样式及占位提示详解

    2023-05-17 10:33:27
  • Java 替换字符串中的回车换行符的方法

    2022-10-05 10:55:22
  • Android使用 Coroutine + Retrofit打造简单的HTTP请求库

    2023-11-17 04:56:11
  • Java基础之容器Vector详解

    2023-11-25 13:10:07
  • Android Studio利用AChartEngine制作饼图的方法

    2021-12-28 10:53:30
  • C#(WinForm) ComboBox和ListBox添加项及设置默认选择项

    2022-05-13 10:09:49
  • 详解Android获取所有依赖库的几种方式

    2023-12-13 05:41:51
  • DevExpress实现禁用TreeListNode CheckBox的方法

    2022-07-20 22:17:43
  • java使用stream判断两个list元素的属性并输出方式

    2023-02-06 14:15:18
  • 浅谈三分钟学习Java泛型中T、E、K、V、?的含义

    2022-09-01 20:12:38
  • SpringMVC文件上传原理及实现过程解析

    2021-09-03 00:24:25
  • Eclipse自定义启动画面和图标的方法介绍

    2022-05-14 09:27:13
  • 详解Android消息机制完整的执行流程

    2021-10-14 18:11:00
  • Android超详细讲解组件ScrollView的使用

    2022-08-05 09:55:18
  • Java语法基础之函数的使用说明

    2022-07-20 15:55:09
  • asp之家 软件编程 m.aspxhome.com