RocketMQ消息生产者是如何选择Broker示例详解

作者:梦想实现家_Z 时间:2023-11-10 21:45:49 

前言

RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢?

从NameServer查询Topic信息

通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sendDefaultImpl()这个方法,并且我们找到了最关键的Topic信息:

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

这个方法就是通过topicNameServer拉出对应的Broker信息:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
       TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
       if (null == topicPublishInfo || !topicPublishInfo.ok()) {
           this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
           this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
           topicPublishInfo = this.topicPublishInfoTable.get(topic);
       }
       if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
           return topicPublishInfo;
       } else {
           this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
           topicPublishInfo = this.topicPublishInfoTable.get(topic);
           return topicPublishInfo;
       }
   }

1.一开始的话,是从当前缓存中找Topic信息,第一次肯定是找不到的;

2.找不到Topic信息,那么就调用updateTopicRouteInfoFromNameServer(topic)NameServer拉对应的信息,如果拉到了就更新到缓存中;

3.如果依然找不到Topic信息,说明没有任何Broker上面是有这个Topic的;但是我们还要拉开启了自动创建Topic配置的Broker信息,通过updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)实现;

生产者客户端会从两个地方获取Broker信息,第一个就是从内存缓存中获取,第二个就是从NameServer中获取。从NameServer中分两次获取,一次是获取存在的Topic对应的Broker信息,第二次是获取还没有创建出来的Topic对应的Broker信息;

如何选择Broker

当客户端拿到了Topic对应的Broker信息后,它是如何选择目标Broker的呢?继续向下看,我们找到了关键代码:

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
           int times = 0;
           String[] brokersSent = new String[timesTotal];
           for (; times < timesTotal; times++) {
               String lastBrokerName = null == mq ? null : mq.getBrokerName();
               MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
               if (mqSelected != null) {
                   mq = mqSelected;
                   brokersSent[times] = mq.getBrokerName();
                 ......

1.如果是同步发送消息,那么【总的发送次数】=1+【重试次数】,如果是异步发送,默认是1;我们当前是同步模式,所以会存在重试;

2.选择Broker的关键代码就在selectOneMessageQueue()方法中,通过前面拿到的topicPublishInfo作为参数,lastBrokerName作为额外的考虑参数;

追踪代码,我们进入MQFaultStrategy.selectOneMessageQueue()中:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
       if (this.sendLatencyFaultEnable) {
           try {
               int index = tpInfo.getSendWhichQueue().incrementAndGet();
               for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                   int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                   if (pos < 0)
                       pos = 0;
                   MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                   if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                       return mq;
               }
               final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
               int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
               if (writeQueueNums > 0) {
                   final MessageQueue mq = tpInfo.selectOneMessageQueue();
                   if (notBestBroker != null) {
                       mq.setBrokerName(notBestBroker);
                       mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                   }
                   return mq;
               } else {
                   latencyFaultTolerance.remove(notBestBroker);
               }
           } catch (Exception e) {
               log.error("Error occurred when selecting message queue", e);
           }
           return tpInfo.selectOneMessageQueue();
       }
       return tpInfo.selectOneMessageQueue(lastBrokerName);
   }

1.如果开启了延迟故障规避,那么执行规避策略;

  • 1.1:轮询找一个Broker,该Broker要么不在规避名单内,要么已经度过了规避期(发送消息失败会将目标Broker放进规避名单,沉默一段时间);

  • 1.2:如果所有的Broker都没有度过规避期,那么从比较好的那一部分Broker里面找一个出来;

  • 1.3:如果依然没有找到合适的Broker,那么就随机选一个Broker

2.否则就随机选一个Broker

下面我们来看一下随机发送的策略是怎么实现的:

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
       if (lastBrokerName == null) {
           return selectOneMessageQueue();
       } else {
           for (int i = 0; i < this.messageQueueList.size(); i++) {
               int index = this.sendWhichQueue.incrementAndGet();
               int pos = Math.abs(index) % this.messageQueueList.size();
               if (pos < 0)
                   pos = 0;
               MessageQueue mq = this.messageQueueList.get(pos);
               if (!mq.getBrokerName().equals(lastBrokerName)) {
                   return mq;
               }
           }
           return selectOneMessageQueue();
       }
   }
   public MessageQueue selectOneMessageQueue() {
       int index = this.sendWhichQueue.incrementAndGet();
       int pos = Math.abs(index) % this.messageQueueList.size();
       if (pos < 0)
           pos = 0;
       return this.messageQueueList.get(pos);
   }

1.如果第一次发送消息,那么通过自增求余的方式从列表中找一个Broker,其实就是轮询方式;

2.如果不是第一次发送消息,那么会尽可能避开上一次的Broker服务,也是为了让Broker服务负载均衡;

3.如果没有避开上一次的Broker,那么再向后找另一个Broker;除非只有一个Broker服务,否则会尽可能避开上次发送的Broker

小结

通过源码分析,我们已经知道了生产者是如何选择目标Broker的了:

1.第一次发消息,通过轮询的方式选择Broker

2.后续发消息会规避上次的Broker,同样采用轮询的方式选择Broker

3.在消息发送过程中,存在一个Broker规避列表,用户可以通过setSendLatencyFaultEnable(true)开启故障规避策略,客户端会尽可能选择不在规避列表中的Broker,如果所有的Broker都在规避列表中,那么会选择一个相对比较好的Broker来用;

来源:https://juejin.cn/post/7169539055548858398

标签:RocketMQ,消息,生产者,Broker
0
投稿

猜你喜欢

  • java基础知识之FileInputStream流的使用

    2021-07-05 00:30:24
  • C# 计算DataTime的4种时间差的方法(相差天数、相差小时、相差分钟、相差秒)

    2022-12-08 10:37:07
  • Java多线程编程详细解释

    2022-12-22 10:19:41
  • 一篇超详细的SpringBoot整合MybatisPlus的文章

    2023-02-26 11:04:35
  • Android RecyclerView添加搜索过滤器的示例代码

    2022-03-08 21:44:49
  • Java面试题冲刺第十六天--消息队列

    2022-08-08 09:07:04
  • Java和C#输入输出流的方法(详解)

    2022-06-24 09:21:02
  • Android检测手机多点触摸点数的方法

    2023-03-10 11:08:35
  • Java Controller实现参数验证与统一异常处理流程详细讲解

    2022-01-25 18:49:47
  • Java实现顺序表和链表结构

    2023-11-13 09:35:43
  • java基础javeSE程序逻辑控制语法

    2022-09-21 23:41:47
  • Java中JFrame实现无边框无标题方法

    2021-11-25 20:35:54
  • 浅析Disruptor高性能线程消息传递并发框架

    2023-02-26 14:09:01
  • Java 的可变参数方法详述

    2022-06-03 19:12:34
  • WPF实现页面的切换的示例代码

    2023-09-26 21:35:27
  • SpringBoot Redis用注释实现接口限流详解

    2022-03-15 17:40:55
  • c#对象反序列化与对象序列化示例详解

    2022-08-19 16:32:54
  • Android自定义控件实现优雅的广告轮播图

    2023-01-14 22:15:51
  • java数据结构-堆实现优先队列

    2023-11-25 08:30:20
  • Java SpringCache+Redis缓存数据详解

    2023-11-29 01:01:05
  • asp之家 软件编程 m.aspxhome.com