RocketMQ源码解析topic创建机制详解

作者:蚂蚁背大象 时间:2023-03-16 01:06:23 

1. RocketMQ Topic创建机制

以下源码基于Rocket MQ 4.7.0

RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建。可以通过设置broker的配置文件来禁用或者允许自动创建。默认是开启的允许自动创建

autoCreateTopicEnable=true/false

下面会结合源码来深度分析一下自动创建和手动创建的过程。

2. 自动Topic

默认情况下,topic不用手动创建,当producer进行消息发送时,会从nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么会默认拉取broker启动时默认创建好名为“TBW102”的Topic,这定义在org.apache.rocketmq.common.MixAll类中

// Will be created at broker when isAutoCreateTopicEnable
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";

自动创建开关是下BrokerConfig类中有一个私有变量:

@ImportantField
private boolean autoCreateTopicEnable = true;

这变量可以通过配置文件配置来进行修改,代码中的默认值为true,所以在默认的情况下Rocket MQ是会自动创建Topic的。

在Broker启动,会调用TopicConfigManager的构造方法,在构造方法中定义了一系列RocketMQ系统内置的一些系统Topic(这里只关注一下TBW102):

{
   // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
   if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
       String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
       TopicConfig topicConfig = new TopicConfig(topic);
       this.systemTopicList.add(topic);
       topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
           .getDefaultTopicQueueNums()); //8
       topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
           .getDefaultTopicQueueNums()); //8
       int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
       topicConfig.setPerm(perm);
       this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
   }
}

这里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable() 这样一段代码,在开启允许自动创建的时候,会把当前Topic的信息存入topicConfigTable变量中。

然后通过发送定期发送心跳包把Topic和Broker的信息发送到NameServer的RouteInfoManager中进行保存。在BrokerController中定义了这样的一个定时任务来执行这个心跳包的发送:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
           @Override
           public void run() {
               try {
                   BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
               } catch (Throwable e) {
                   log.error("registerBrokerAll Exception", e);
               }
           }
       }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

这里就说明了如何把每个Broker的系统自定义的Topic注册到NameServer。

接下来看在发送过程中如何从NameServer获取Topic的路由信息: DefaultMQProducerImpl.sendDefaultImpl

private SendResult sendDefaultImpl(
       Message msg,
       final CommunicationMode communicationMode,
       final SendCallback sendCallback,
       final long timeout
   ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
       //省略代码
       //获取路由信息
       TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
   }

通过DefaultMQProducerImpl.tryToFindTopicPublishInfo方法获取Topic的路由信息。

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
       TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
       //第一次从缓存中获取--肯定没有因为还没创建
       if (null == topicPublishInfo || !topicPublishInfo.ok()) {
           this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
           //从NameServer获取--也是没有,因为没有创建
           this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
           topicPublishInfo = this.topicPublishInfoTable.get(topic);
       }
       if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
           return topicPublishInfo;
       } else {
           //第二次从这里获取
           this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
           topicPublishInfo = this.topicPublishInfoTable.get(topic);
           return topicPublishInfo;
       }
   }

下面来看一下 MQClientInstance.updateTopicRouteInfoFromNameServer 的方法:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
       DefaultMQProducer defaultMQProducer) {
   //省略代码
   if (isDefault && defaultMQProducer != null) {
           //使用默认的TBW102 Topic获取数据
           topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                           1000 * 3);
               if (topicRouteData != null) {
                   for (QueueData data : topicRouteData.getQueueDatas()) {
                               int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                               data.setReadQueueNums(queueNums);
                               data.setWriteQueueNums(queueNums);
                           }
                       }
                   } else {
                       //这是正常的
                       topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                   }
     //省略代码      
   }

如果isDefault=true并且defaultMQProducer不为空,从nameserver中获取默认路由信息,此时会获取所有已开启自动创建开关的broker的默认“TBW102”topic路由信息,并保存默认的topic消息队列数量。

这里会比较一下配在在 DefaultMQProducer.defaultTopicQueueNums中的默认值和TBW102中的值哪个更小。

if (topicRouteData != null) {
       TopicRouteData old = this.topicRouteTable.get(topic);
       boolean changed = topicRouteDataIsChange(old, topicRouteData);
       if (!changed) {
           changed = this.isNeedUpdateTopicRouteInfo(topic);
       } else {
           log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
       }
}

判断获取默认的是否存在,如果存在把当前的Topic的信息更新。

也就是把TBW102 Topic的数据更新为自动创建的数据。

if (changed) {
   TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
   for (BrokerData bd : topicRouteData.getBrokerDatas()) {
       this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
   }
   // Update Pub info
   {
       TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
       publishInfo.setHaveTopicRouterInfo(true);
       Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
       while (it.hasNext()) {
           Entry<String, MQProducerInner> entry = it.next();
           MQProducerInner impl = entry.getValue();
           if (impl != null) {
               impl.updateTopicPublishInfo(topic, publishInfo);
           }
       }
   }
   // Update sub info
   {
       Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
       Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
       while (it.hasNext()) {
           Entry<String, MQConsumerInner> entry = it.next();
           MQConsumerInner impl = entry.getValue();
           if (impl != null) {
               impl.updateTopicSubscribeInfo(topic, subscribeInfo);
           }
       }
   }
   log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
   this.topicRouteTable.put(topic, cloneTopicRouteData);
   return true;
}

更新本地的缓存。这样TBW102 Topic的负载和一些默认的路由信息就会被自己创建的Topic使用。这里就是整个自动创建的过程.

总结一下就是:通过使用系统内部的一个TBW102的Topic的配置来自动创建当前用户的要创建的自定义Topic。

3. 手动创建--预先创建

手动创建也叫预先创建,就是在使用Topic之前就创建,可以通过命令行或者通过RocketMQ的管理界面创建Topic。

通过界面控制台创建

项目地址: github.com/apache/rock&hellip;

TopicController主要负责Topic的管理

@RequestMapping(value = "/createOrUpdate.do", method = { RequestMethod.POST})
@ResponseBody
public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) {
   Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()),
           "clusterName or brokerName can not be all blank");
   logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest));
   topicService.createOrUpdate(topicCreateOrUpdateRequest);
   return true;
}

然后通过MQAdminExtImpl.createAndUpdateTopicConfig方法来创建:

@Override
   public void createAndUpdateTopicConfig(String addr, TopicConfig config)
       throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
       MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
   }

通过调用DefaultMQAdminExtImpl.createAndUpdateTopicConfig创建Topic

@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
       InterruptedException, MQClientException {
   this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}

最后通过MQClientAPIImpl.createTopic创建Topic

public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
       final long timeoutMillis)
       throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
       CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
       requestHeader.setTopic(topicConfig.getTopicName());
       requestHeader.setDefaultTopic(defaultTopic);
       requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
       requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
       requestHeader.setPerm(topicConfig.getPerm());
       requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
       requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
       requestHeader.setOrder(topicConfig.isOrder());
       RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
       RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
           request, timeoutMillis);
       assert response != null;
       switch (response.getCode()) {
           case ResponseCode.SUCCESS: {
               return;
           }
           default:
               break;
       }
       throw new MQClientException(response.getCode(), response.getRemark());
   }

来源:https://juejin.cn/post/7133413162048569374

标签:RocketMQ,topic,创建
0
投稿

猜你喜欢

  • Java基础之面向对象机制(多态、继承)底层实现

    2023-11-12 02:59:59
  • spring缓存自定义resolver的方法

    2021-05-30 17:07:56
  • ScrollView与ListView合用(正确计算Listview的高度)的问题解决

    2021-12-30 07:14:58
  • Java压缩文件工具类ZipUtil使用方法代码示例

    2022-11-26 02:21:32
  • Java中Json字符串直接转换为对象的方法(包括多层List集合)

    2021-06-26 18:13:42
  • Java中静态类型检查是如何进行的实例思路详解

    2022-01-01 16:08:30
  • Spring如何在一个事务中开启另一个事务

    2021-08-30 17:29:15
  • Android App后台服务报告工作状态实例

    2023-04-26 19:10:34
  • c#图片处理之图片裁剪成不规则图形

    2023-02-23 02:43:58
  • SpringBoot实现MapperScan添加动态配置(占位符)

    2023-11-26 05:08:06
  • 实战SpringBoot集成JWT实现token验证

    2022-10-07 15:57:49
  • 举例讲解JDK注解的使用和自定义注解的方法

    2022-06-29 17:34:52
  • Android开发中MJRefresh自定义刷新动画效果

    2023-11-27 06:04:20
  • 在VSCode里使用Jupyter Notebook调试Java代码的详细过程

    2022-03-25 07:14:12
  • Android ScrollView 下嵌套 ListView 或 GridView出现问题解决办法

    2023-03-31 07:17:04
  • Flutter 分页功能表格控件详细解析

    2023-09-22 20:02:45
  • 提升java开发效率工具lombok使用争议

    2022-06-22 03:08:18
  • C#连接ODBC数据源的方法

    2023-04-20 07:30:33
  • 浅谈Java代码的 微信长链转短链接口使用 post 请求封装Json(实例)

    2023-07-27 19:36:09
  • UnityUI中绘制线状统计图

    2022-12-03 14:30:43
  • asp之家 软件编程 m.aspxhome.com