基于rocketmq的有序消费模式和并发消费模式的区别说明

作者:从心归零 时间:2021-10-29 08:41:02 

rocketmq消费者注册监听有两种模式

有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同。


MessageListenerOrderly

正确消费返回


ConsumeOrderlyStatus.SUCCESS

稍后消费返回


ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

MessageListenerConcurrently

正确消费返回


ConsumeConcurrentlyStatus.CONSUME_SUCCESS

稍后消费返回


ConsumeConcurrentlyStatus.RECONSUME_LATER

顾名思义,有序消费模式是按照消息的顺序进行消费,但是除此之外,在实践过程中我发现和并发消费模式还有很大的区别的。

第一,速度,下面我打算用实验来探究一下。

使用mq发送消息,消费者使用有序消费模式消费,具体的业务是阻塞100ms


Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
       logger.info("==========CONSUME_START===========");  
logger.info(Thread.currentThread().getName()  
                           + " Receive New Messages: " + msgs.size());  
       try {
       if(date1 == null)
       date1 = new Date();//在第一次消费时初始化
       Thread.sleep(100);
      logger.info("total:"+(++total));
       date2 = new Date();
      totalTime = (date2.getTime() - date1.getTime());
      logger.info("totalTime:"+totalTime);
           logger.info("==========CONSUME_SUCCESS===========");  
           return ConsumeOrderlyStatus.SUCCESS;  
       }catch (Exception e) {
           logger.info("==========RECONSUME_LATER===========");  
           logger.error(e.getMessage(),e);
           return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
       }
}
}

消费100条消息

基于rocketmq的有序消费模式和并发消费模式的区别说明

速度挺快的,为了让结果更准确,将消息加到1000条

消费1000条消息

基于rocketmq的有序消费模式和并发消费模式的区别说明

可以看到每一条消息平均耗时25ms,然而业务是阻塞100ms,这说明有序消费模式和同步消费可能并不是一回事,那如果不阻塞代码我们再来看一下结果

基于rocketmq的有序消费模式和并发消费模式的区别说明

不阻塞过后速度明显提高了,那么我阻塞300ms会怎么样呢?

基于rocketmq的有序消费模式和并发消费模式的区别说明

时间相比阻塞100ms多了2倍

接下来我们测试并发消费模式


Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerConcurrently() {
   public ConsumeConcurrentlyStatus consumeMessage(  
                      List< MessageExt > msgs, ConsumeConcurrentlyContext context) {  

logger.info(Thread.currentThread().getName()  
                                + " Receive New Messages: " + msgs.size());
   try {
   if(date1 == null)
   date1 = new Date();
           Thread.sleep(100);
          logger.info("total:"+(++total));
          date2 = new Date();
          totalTime = (date2.getTime() - date1.getTime());
          logger.info("totalTime:"+totalTime);
               logger.info("==========CONSUME_SUCCESS===========");  
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
           } catch (Exception e) {
               logger.info("==========RECONSUME_LATER===========");  
               logger.error(e.getMessage(),e);
               return ConsumeConcurrentlyStatus.RECONSUME_LATER;
           }
   }  
}

基于上次的经验,同样测试三种情况,消费1000条不阻塞,消费1000条阻塞100ms,消费1000条阻塞300ms

消费1000条不阻塞的情况

基于rocketmq的有序消费模式和并发消费模式的区别说明

和有序消费模式差不多,快个一两秒。

消费1000条阻塞100ms

基于rocketmq的有序消费模式和并发消费模式的区别说明

竟然比不阻塞的情况更快,可能是误差把

消费1000条阻塞300ms

基于rocketmq的有序消费模式和并发消费模式的区别说明

速度稍慢,但是还是比有序消费快得多。

结论是并发消费的消费速度要比有序消费更快。

另一个区别是消费失败时的处理不同,有序消费模式返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT后,消费者会立马消费这条消息,而使用并发消费模式,返回ConsumeConcurrentlyStatus.RECONSUME_LATER后,要过好几秒甚至十几秒才会再次消费。

我是在只有一条消息的情况下测试的。更重要的区别是,

返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT并不会增加消息的消费次数,mq消息有个默认最大消费次数16,消费次数到了以后,这条消息会进入死信队列,这个最大消费次数是可以在mqadmin中设置的。


mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g MonitorCumsumerGroupName -r 3

我测试后发现,并发模式下返回ConsumeConcurrentlyStatus.RECONSUME_LATER,同一个消息到达最大消费次数之后就不会再出现了。这说明有序消费模式可能并没有这个机制,这意味着你再有序消费模式下抛出固定异常,那么这条异常信息将会被永远消费,并且很可能会影响之后正常的消息。下面依然做个试验


Map<String, Integer> map = new HashMap<>();//保存消息错误消费次数
new MessageListenerOrderly() {

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
       try {
       if(1 == 1)
       throw new Exception();
           return ConsumeOrderlyStatus.SUCCESS;  
       }catch (Exception e) {
       MessageExt msg = msgs.get(0);
if(map.containsKey(msg.getKeys())) {//消息每消费一次,加1
   map.put(msg.getKeys(), map.get(msg.getKeys()) + 1);
}else {
   map.put(msg.getKeys(), 1);
}
logger.info(msg.getKeys()+":"+map.get(msg.getKeys()));
           return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
       }
}
}

发送了十条消息

基于rocketmq的有序消费模式和并发消费模式的区别说明

可以看到虽然我发了十条消息,但是一直在消费同样四条消息,这可能跟消息broker有默认四条队列有关系。同时从时间可以看到,消费失败后,会马上拉这条信息。

至于并发消费模式则不会无限消费,而且消费失败后不会马上再消费。具体的就不尝试了。

结论是有序消费模式MessageListenerOrderly要慎重地处理异常,我则是用全局变量记录消息的错误消费次数,只要消费次数达到一定次数,那么就直接返回ConsumeOrderlyStatus.SUCCESS。

突然想到之前测试有序消费模式MessageListenerOrderly的时候为什么1000条消息阻塞100ms耗时25000ms了,因为有序消费模式是同时拉取四条队列消息的,这就对上了。

来源:https://blog.csdn.net/qq_36804701/article/details/81481343

标签:rocketmq,有序,消费模式,并发
0
投稿

猜你喜欢

  • Java基础之List内元素的排序性能对比

    2023-04-05 15:13:58
  • 浅谈Java中Map和Set之间的关系(及Map.Entry)

    2023-08-25 02:23:48
  • Struts2 文件上传进度条的实现实例代码

    2023-04-20 11:13:59
  • SpringSecurity登录使用JSON格式数据的方法

    2021-09-10 21:40:40
  • Mybatis环境搭建及文件配置过程解析

    2021-07-04 22:37:03
  • 使用WebSocket实现即时通讯(一个群聊的聊天室)

    2023-11-29 03:00:46
  • Windows下安装ElasticSearch的方法(图文)

    2023-11-25 13:35:35
  • JAVA使用POI(XSSFWORKBOOK)读取EXCEL文件过程解析

    2023-03-01 16:36:39
  • SpringBoot整合MyCat实现读写分离的方法

    2022-03-05 23:37:45
  • 使用Spring Cloud Feign远程调用的方法示例

    2021-12-06 10:30:09
  • Java 添加、替换、删除PDF中的图片的示例代码

    2023-08-28 09:06:26
  • IntelliJ IDEAx导出安卓(Android)apk文件图文教程

    2022-06-22 18:26:16
  • Android实现腾讯新闻的新闻类别导航效果

    2023-07-29 04:17:46
  • java获取中文拼音首字母工具类定义与用法实例

    2023-07-14 08:23:55
  • spring boot实现自动输出word文档功能的实例代码

    2021-11-10 13:37:51
  • Java+opencv3.2.0实现人脸检测功能

    2022-11-27 10:36:42
  • Mybatis中的延迟加载案例解析

    2023-02-27 01:55:37
  • list集合去除重复对象的实现

    2022-10-16 23:02:42
  • 解读maven配置阿里云镜像问题

    2023-02-28 11:57:56
  • C#线程入门教程之单线程介绍

    2022-03-15 20:37:28
  • asp之家 软件编程 m.aspxhome.com