RocketMQ生产者一个应用不能发送多个NameServer消息解决

作者:梦想实现家_Z 时间:2022-05-18 15:56:11 

前言

目前有两套RocketMQ集群,集群A包含topic名称为cluster_A_topic,集群B包含topic名称为cluster_B_topic,在应用服务OrderApp上通过RocketMQ Client创建两个DefaultMQProducer实例发送消息给集群A和集群B

架构图如下:

RocketMQ生产者一个应用不能发送多个NameServer消息解决

根据上述架构图,我们给出的示例代码如下:

// 创建第一个DefaultMQProducer
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
   // 设置nameServer地址
   producer1.setNamesrvAddr("192.168.2.230:9876");
   try {
     producer1.start();
     // 发送消息
     SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8)));
     switch (result1.getSendStatus()) {
       case SEND_OK:
         System.out.println("cluster_A_topic 发送成功!");
         break;
       case FLUSH_DISK_TIMEOUT:
         System.out.println("cluster_A_topic 持久化失败!");
         break;
       case FLUSH_SLAVE_TIMEOUT:
         System.out.println("cluster_A_topic 同步slave失败!");
         break;
       case SLAVE_NOT_AVAILABLE:
         System.out.println("cluster_A_topic 副本不可用!");
     }
   } catch (Exception e) {
     e.printStackTrace();
   }
   // 创建第二个DefaultMQProducer
   DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2");
   // 设置nameServer地址
   producer2.setNamesrvAddr("192.168.2.231:9876");
   try {
     producer2.start();
     // 发送消息
     SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8)));
     switch (result2.getSendStatus()) {
       case SEND_OK:
         System.out.println("cluster_B_topic 发送成功!");
         break;
       case FLUSH_DISK_TIMEOUT:
         System.out.println("cluster_B_topic 持久化失败!");
         break;
       case FLUSH_SLAVE_TIMEOUT:
         System.out.println("cluster_B_topic 同步slave失败!");
         break;
       case SLAVE_NOT_AVAILABLE:
         System.out.println("cluster_B_topic 副本不可用!");
     }
     return "ok";
   } catch (Exception e) {
     e.printStackTrace();
   } finally {
     producer1.shutdown();
     producer2.shutdown();
   }

结果竟然报错了,报错内容时cluster_B_topic不存在:

RocketMQ生产者一个应用不能发送多个NameServer消息解决

经过不断的测试,发现只有放在最前面启动的DefaultMQProducer会生效,后面启动的DefaultMQProducer发送消息就报错说对应的topic不存在,而且报错的broker竟然是前面启动的DefaultMQProducer对应的broker。这就不科学了,难道RocketMQ不允许在一个应用上创建多个生产者?

问题定位

首先说明一下,当前使用的RocketMQ Client版本是4.8.0。为了确定是哪儿出了问题,不得不对源码来一波探索[哭泣脸😢]。

我们都知道生产者是发送消息给Broker的,获取Broker信息是通过连接NameServer获取的。既然报错的Broker和目标Broker竟然不对应,肯定是后面启动的生产者获取的Broker不对。有了最基本的判断,我们先从DefaultMQProducer#start()入手,最终我们定位到这样一段代码DefaultMQProducerImpl#start(final boolean startFactory)

public void start(final boolean startFactory) throws MQClientException {
       switch (this.serviceState) {
           case CREATE_JUST:
               this.serviceState = ServiceState.START_FAILED;
               this.checkConfig();
// 如果生产者group名称不是`CLIENT_INNER_PRODUCER`,那么修改InstanceName值
               if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                   this.defaultMQProducer.changeInstanceNameToPID();
               }
           // 创建MQClientInstance实例
               this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
           // 注册生产者实例到MQClientInstance中
               boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
               if (!registerOK) {
                   this.serviceState = ServiceState.CREATE_JUST;
                   throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                       + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                       null);
               }
           // 添加TBW102对应的topic信息,broker设置autoCreateTopicEnable = true才起作用
           this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
               if (startFactory) {
                   // 启动刚刚创建的MQClientInstance实例
                   mQClientFactory.start();
               }
               log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                   this.defaultMQProducer.isSendMessageWithVIPChannel());
               // 修改服务状态为RUNNING
               this.serviceState = ServiceState.RUNNING;
               break;
           case RUNNING:
           case START_FAILED:
           case SHUTDOWN_ALREADY:
               throw new MQClientException("The producer service state not OK, maybe started once, "
                   + this.serviceState
                   + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                   null);
           default:
               break;
       }

上面的代码主要是创建了MQClientInstance实例,并且通过start()方法启动。

通过针对这两段代码的debug,我们发现创建的两个DefaultMQProducer对象是共用了一个MQClientInstance实例,并且所有针对NameServerBroker的远程操作全部是通过MQClientInstance实例来做的。比如发送消息的时候需要找到对应的Broker下的消息队列:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
       TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
       if (null == topicPublishInfo || !topicPublishInfo.ok()) {
           this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
           // 从NameServer更新topic路由
           this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
           topicPublishInfo = this.topicPublishInfoTable.get(topic);
       }
       if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
           return topicPublishInfo;
       } else {
           this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
           topicPublishInfo = this.topicPublishInfoTable.get(topic);
           return topicPublishInfo;
       }
   }

最终我们发现两个DefaultMQProducer对象都是去同一个NameServer下获取对应的topic信息,这下问题就定位到了:因为使用了同一个MQClientInstance实例导致不同的DefaultMQProducer去访问了同一个NameServer,同一个集群需要同时接收两个topic的消息,也就出现了前面的报错说topic不存在的情况。

如何解决

我们来看看MQClientInstance实例是如何保证唯一性的:

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
       // 生成clientID
       String clientId = clientConfig.buildMQClientId();
       // 从缓存中获取MQClientInstance
       MQClientInstance instance = this.factoryTable.get(clientId);
       if (null == instance) {
           // 没有缓存的话就创建一个MQClientInstance
           instance =
               new MQClientInstance(clientConfig.cloneClientConfig(),
                   this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
           // 新创建出来的再放进缓存
           MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
           if (prev != null) {
               instance = prev;
               log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
           } else {
               log.info("Created new MQClientInstance for clientId:[{}]", clientId);
           }
       }
       // 返回MQClientInstance实例
       return instance;
   }

我们之所以拿到的MQClientInstance实例是同一个,是因为在同一个服务下创建的clientId相同:

public String buildMQClientId() {
       StringBuilder sb = new StringBuilder();
       sb.append(this.getClientIP());
       sb.append("@");
       sb.append(this.getInstanceName());
       if (!UtilAll.isBlank(this.unitName)) {
           sb.append("@");
           sb.append(this.unitName);
       }
       return sb.toString();
   }

两个clientId都是192.168.18.173@14933,为了防止clientId相同,我们可以在创建DefaultMQProducer实例是加上unitName值,保证两个unitName值不同来避免共享同一个MQClientInstance

DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
producer1.setNamesrvAddr("192.168.2.230:9876");
producer1.setUnitName("producer1");
producer1.start();
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1");
producer2.setNamesrvAddr("192.168.2.231:9876");
producer2.setUnitName("producer2");
producer2.start();

通过上述代码修改后,两个消息都发送成功了。

另一个办法就是升级RocketMQ Client4.9.0,我们来看一下RocketMQ Client 4.9.0是怎么解决这个问题的:

public void changeInstanceNameToPID() {
       if (this.instanceName.equals("DEFAULT")) {
           this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
       }
   }

RocketMQ Client 4.9.0在后面补充了一个纳秒值,之前的代码是这样的:

public void changeInstanceNameToPID() {
       if (this.instanceName.equals("DEFAULT")) {
           this.instanceName = String.valueOf(UtilAll.getPid());
       }
   }

也就是说,在新的版本中,一个应用服务内创建多个DefaultMQProducer就会有多个MQClientInstance实例对应,不会再出现我们前面的报错。

来源:https://juejin.cn/post/7166855806355406879

标签:RocketMQ,NameServer,生产者,消息发送
0
投稿

猜你喜欢

  • C#开发中常用的加密解密方法汇总

    2021-09-06 23:35:49
  • Java 线程池ExecutorService详解及实例代码

    2022-09-02 17:07:24
  • Java如何使用Optional与Stream取代if判空逻辑(JDK8以上)

    2022-01-15 02:52:53
  • Spring JPA联表查询之注解属性详解

    2021-11-04 14:19:04
  • java开源调度如何给xxljob加k8s执行器

    2021-09-17 16:41:50
  • Java8中Optional类型和Kotlin中可空类型的使用对比

    2023-07-29 07:49:21
  • c#与js随机数生成方法

    2023-12-14 12:30:36
  • List集合对象中按照不同属性大小排序的实例

    2023-06-07 14:27:41
  • C# winfrom 模拟ftp文件管理实现代码

    2023-07-15 16:29:48
  • SpringBoot中的Condition包下常用条件依赖注解案例介绍

    2023-05-29 11:42:17
  • Android Camera+SurfaceView自动聚焦防止变形拉伸

    2023-06-18 06:35:54
  • java读取其他服务接口返回的json数据示例代码

    2023-11-10 14:05:29
  • Mybatis联合查询的实现方法

    2021-11-27 23:26:44
  • SpringBoot打Jar包在命令行运行流程详解

    2023-11-24 16:53:59
  • Java输入/输出流体系详解

    2023-03-01 06:37:00
  • java实现纸牌游戏之小猫钓鱼算法

    2021-08-11 22:57:00
  • MyBatis字段名和属性名不一致的解决方法

    2022-12-15 18:15:22
  • Java Mybatis框架Dao层的实现与映射文件以及核心配置文件详解分析

    2021-06-15 16:29:22
  • C++右值引用与move和forward函数的使用详解

    2023-07-05 19:27:33
  • Java 方法的重载与参数传递详解

    2023-10-19 18:50:46
  • asp之家 软件编程 m.aspxhome.com