RocketMQ producer发送者浅析

作者:Acqierement 时间:2023-04-03 06:35:32 

发送者其实比较简单,需要做的就是首先确定往哪里发送,其次怎么让消息发送顺畅。我们就看一下具体的代码吧。

首先调用start方法。完成各个类的初始化,启动多个定时任务,其中一个定时任务是updateTopicRouteInfoFromNameServer,这个方法里面和nameService建立长连接,同时维护了topicRouteTable和brokerAddrTable等缓存。topicRouteTable里面维护了这个topic包括有哪些queue和broker。这样producer才可以知道要发往哪里。

启动的流程主要在这个方法中:

MQClientInstance#start

public void start() throws MQClientException {
   synchronized (this) {
       switch (this.serviceState) {
           case CREATE_JUST:
               this.serviceState = ServiceState.START_FAILED;
               // If not specified,looking address from name server
               if (null == this.clientConfig.getNamesrvAddr()) {
                   this.mQClientAPIImpl.fetchNameServerAddr();
               }
               // Start request-response channel
               this.mQClientAPIImpl.start();
               // Start various schedule tasks
               this.startScheduledTask();
               // Start pull service
               this.pullMessageService.start();
               // Start rebalance service
               this.rebalanceService.start();
               // Start push service
               this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
               log.info("the client factory [{}] start OK", this.clientId);
               this.serviceState = ServiceState.RUNNING;
               break;
           case START_FAILED:
               throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
           default:
               break;
       }
   }
}

其中启动了一系列定时任务,包括org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer这个方法

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
       DefaultMQProducer defaultMQProducer) {
       try {
           if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
               try {
                   TopicRouteData topicRouteData;
                   if (isDefault && defaultMQProducer != null) {
                       // 从nameServer获取topciRouteData
                       topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                           clientConfig.getMqClientApiTimeout());
                       if (topicRouteData != null) {
                           for (QueueData data : topicRouteData.getQueueDatas()) {
                               int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                               data.setReadQueueNums(queueNums);
                               data.setWriteQueueNums(queueNums);
                           }
                       }
                   } else {
                       topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                   }
                   if (topicRouteData != null) {
                       TopicRouteData old = this.topicRouteTable.get(topic);
                       boolean changed = topicRouteData.topicRouteDataChanged(old);
                       if (!changed) {
                           changed = this.isNeedUpdateTopicRouteInfo(topic);
                       } else {
                           log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                       }
                       if (changed) {
                           for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                               this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                           }
                           // Update endpoint map
                           {
                               ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
                               if (!mqEndPoints.isEmpty()) {
                                   topicEndPointsTable.put(topic, mqEndPoints);
                               }
                           }
                           // Update Pub info
                           {
                               // 生成topicPublishInfo
                               TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                               publishInfo.setHaveTopicRouterInfo(true);
                               for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
                                   MQProducerInner impl = entry.getValue();
                                   if (impl != null) {
                                       // 更新 topicPublishInfo
                                       impl.updateTopicPublishInfo(topic, publishInfo);
                                   }
                               }
                           }
                           // Update sub info
                           if (!consumerTable.isEmpty()) {
                               Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                               for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                                   MQConsumerInner impl = entry.getValue();
                                   if (impl != null) {
                                       impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                   }
                               }
                           }
                           TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
                           log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                           this.topicRouteTable.put(topic, cloneTopicRouteData);
                           return true;
                       }
                   } else {
                       log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
                   }
               } catch (MQClientException e) {
                   if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                       log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                   }
               } catch (RemotingException e) {
                   log.error("updateTopicRouteInfoFromNameServer Exception", e);
                   throw new IllegalStateException(e);
               } finally {
                   this.lockNamesrv.unlock();
               }
           } else {
               log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
           }
       } catch (InterruptedException e) {
           log.warn("updateTopicRouteInfoFromNameServer Exception", e);
       }
       return false;
   }

通过方法名也知道是从nameServer获取这个topic相关的broke数据,拿到TopicRouteData数据。先更新brokerAddrTable,存储borker具体的地址。然后在org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicPublishInfo里面再进一步生成TopicPublishInfo数据。TopicPublishInfo是对TopicRouteData的一个封装,除了TopicRouteData,还有messageQueue数据,messageQueue是Queue和Borker的交集,会根据配置的queue数量,生成具体的messageQueue,queueId就是0,1,2,3,4他们自己的顺序。

所以有了TopicPublishInfo数据,就知道往哪里发了。

发送消息的过程。

  • 先找到TopicPublishInfo。TopicPublishInfo里面有一个MessageQueue的list。

  • 从MessageQueueList里面拿到一个messageQueue。 如果没有开启sendLatencyFaultEnable,默认就是采用轮询方法。具体的轮询方式就是,TopicPublishInfo里面维护了一个序号index,每次index自增1,然后通过index去MessageQueueList里面拿一个。

  • 拿到了MessageQueue之后,里面有broker的name,根据name去找broker的ip地址,发送数据。这个ip地址就是前面提到的brokerAddrTable变量,在updateTopicRouteInfoFromNameServer方法里面维护的。

来源:https://blog.csdn.net/weixin_43094917/article/details/129934869

标签:RocketMQ,producer,发送者
0
投稿

猜你喜欢

  • SpringBoot整合Groovy脚本实现动态编程详解

    2023-04-02 03:24:16
  • Android ViewPager实现轮播图效果

    2023-03-09 20:44:53
  • Android中mvp模式使用实例详解

    2023-12-11 19:48:04
  • 下载软件后使用c#获取文件的md5码示例

    2022-02-08 21:39:58
  • Android实现聊天界面

    2023-04-09 22:57:57
  • IntelliJ IDEA(2020.2)的下载、安装步骤详细教程

    2023-11-25 07:10:16
  • Java解析XML的四种方法详解

    2022-07-02 23:39:33
  • Java设计模式之抽象工厂模式(Abstract Factory)

    2021-08-31 02:39:48
  • 微信举报解除和微信解除限制的6个方法

    2022-01-07 22:30:46
  • Java利用Jackson序列化实现数据脱敏详解

    2023-12-22 17:47:38
  • JavaBean和SpringBean的区别及创建SpringBean方式

    2022-05-23 03:32:14
  • 关于mybatis resulttype 返回值异常的问题

    2021-08-09 20:26:19
  • C#实现rabbitmq 延迟队列功能实例代码

    2023-03-18 02:59:34
  • Unity2021发布WebGL与网页交互问题的解决

    2023-01-27 23:44:17
  • Android单点触控实现图片平移、缩放、旋转功能

    2022-08-18 04:16:52
  • Android ActivityManager使用案例详解

    2021-11-09 14:35:54
  • 使用java从乱码文本中解析出正确的文本

    2023-12-03 12:10:22
  • springmvc+shiro+maven 实现登录认证与权限授权管理

    2023-07-04 14:33:03
  • C#中一些你可能没用过的调试窗口的方法

    2022-08-07 09:01:16
  • Java Date时间类型的操作实现

    2023-11-25 06:44:31
  • asp之家 软件编程 m.aspxhome.com