RocketMQ生产者调用start发送消息原理示例

作者:梦想实现家_Z 时间:2022-07-05 20:13:04 

RocketMQ发送消息

我们在使用RocketMQ发送消息时,一般都会使用DefaultMQProducer,类型的代码如下:

DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("42.192.50.8:9876");
try {
   producer.start();
   producer.send(new Message("topic", "ping".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
   e.printStackTrace();
} finally {
   producer.shutdown();
}

上述代码中,在消息发送之前调用了start()方法,如果不调用start()方法,直接发送消息,那么会出现以下报错:

RocketMQ生产者调用start发送消息原理示例

报错消息里面很明显地告知我们,目前这个DefaultMQProducer状态没有准备好,还不能发送消息。为了一探究竟,我们得去看看start()里面究竟做了什么操作呢?

start()里面究竟做了什么操作

我们根据源码一路走下来,可以追踪到DefaultMQProducerImpl.start(final boolean startFactory)这个方法:

public void start(final boolean startFactory) throws MQClientException {
       switch (this.serviceState) {
           case CREATE_JUST:
               this.serviceState = ServiceState.START_FAILED;
               this.checkConfig();
               if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                   this.defaultMQProducer.changeInstanceNameToPID();
               }
               // 创建MQClientInstance
               this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
               // 注册Producer到MQClientInstance中
               boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
               if (!registerOK) {
                   this.serviceState = ServiceState.CREATE_JUST;
                   throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                       + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                       null);
               }
               this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
               // 启动MQClientInstance实例
               if (startFactory) {
                   mQClientFactory.start();
               }
               log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                   this.defaultMQProducer.isSendMessageWithVIPChannel());
               this.serviceState = ServiceState.RUNNING;
               break;
           case RUNNING:
           case START_FAILED:
           case SHUTDOWN_ALREADY:
               throw new MQClientException("The producer service state not OK, maybe started once, "
                   + this.serviceState
                   + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                   null);
           default:
               break;
       }

上述代码主要做了以下几点:

1.创建MQClientInstance实例;

2.注册Producer到MQClientInstance实例中;

3.启动MQClientInstance实例;

MQClientInstance实例并不是每次都会创建的,它创建出来也会缓存的MQClientManager中,不过根据源码来看的话,每次创建Producer都会对应创建一个新的MQClientInstance实例,所以一般情况下不建议一个应用服务中重复创建Producer

最终start()方法的关键实现逻辑还是需要进入MQClientInstance.start()中:

public void start() throws MQClientException {
       synchronized (this) {
           switch (this.serviceState) {
               case CREATE_JUST:
                   this.serviceState = ServiceState.START_FAILED;
                   // 如果namesrv地址为null,那么就需要自己找namesrv地址
                   if (null == this.clientConfig.getNamesrvAddr()) {
                       this.mQClientAPIImpl.fetchNameServerAddr();
                   }
                   // 开启一个请求响应渠道,没猜错的话,应该是netty实现的
                   this.mQClientAPIImpl.start();
                   // 开启定时任务
                   this.startScheduledTask();
                   // 开启拉消息服务
                   this.pullMessageService.start();
                   // 开启负载均衡服务
                   this.rebalanceService.start();
                   // 再开启一个默认生产者,这个生产者不需要启动MQClientInstance实例
                   this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                   log.info("the client factory [{}] start OK", this.clientId);
                   this.serviceState = ServiceState.RUNNING;
                   break;
               case START_FAILED:
                   throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
               default:
                   break;
           }
       }
   }

看样子,这才是start()方法真正要做的事情:

1.找namesrv地址,应该是后面需要使用namesrv地址查询对应的broker

2.开启Netty客户端的初始化,包括与namesrv建立信道;另外开启两个定时任务,一个清除列表中过期的请求,第二个就是筛选可用的namesrv服务;

3.开启一些定时任务;包括如果没有设置namesrv地址的话,会从指定站点拉namesrv地址;清除下线broker并发送心跳给所有的broker等工作;

4.因为当前是生产者,所以pullMessageService很快就结束;

5.生产者不需要做负载均衡,所以rebalanceService很快也结束;

6.给默认创建的生产者执行一下start()方法,其实啥也没做;

上述大多数任务都是给消费者使用的,作为生产者,唯一起作用的就是前三步,查找namesrv地址、第二步与namesrv建立通信以及第三步对broker的一些定时清理工作;不过没有发生消息之前,是不会从远程获取任何数据的。所以综上所述,start()方法里面只做了以下两件事情:

1.与namesrv建立通信渠道,它甚至都没有从namesrv获取任何数据;

2.启动一些定时任务,包括清理下线的broker;

小结

虽然在生产者中,start()方法里面真正做的事情比较少,但是却是非常有必要的。发送消息之前,我们没有使用start()方法,导致消息发送失败,是因为生产者与namesrv之间的通信渠道没有建立。

来源:https://juejin.cn/post/7168735153185882148

标签:RocketMQ,start,生产者,发送消息
0
投稿

猜你喜欢

  • android Service基础(启动服务与绑定服务)

    2023-05-07 12:31:34
  • Java哈希表和有序表实例代码讲解

    2023-05-28 11:29:29
  • Java与Kotlin互调原理讲解

    2023-08-19 00:07:41
  • SpringBoot深入分析讲解监听器模式上

    2022-06-25 21:04:04
  • C#泛型集合类System.Collections.Generic

    2023-02-24 19:21:18
  • Android语音声波控件 Android条形波控件

    2023-10-29 02:03:05
  • Java中ArrayList集合的常用方法大全

    2023-09-01 15:23:30
  • 老生常谈反射之Class类的使用(必看篇)

    2022-07-20 16:32:30
  • 快速定位Java 内存OOM的问题

    2022-05-26 00:19:38
  • C#开发Winform控件之打开文件对话框OpenFileDialog类

    2023-04-19 10:53:16
  • sprng和struts有什么区别?

    2022-03-02 02:13:35
  • 字符串替换Replace仅替换第一个字符串匹配项

    2021-10-02 17:36:56
  • Spring+SpringMVC+MyBatis深入学习及搭建(三)之MyBatis全局配置文件解析

    2022-03-21 05:25:23
  • Android 中Manifest.xml文件详解

    2023-11-06 21:41:58
  • c# JSON返回格式的WEB SERVICE

    2022-04-16 01:43:31
  • Opencv实现傅里叶变换

    2023-08-24 18:53:29
  • Spring循环依赖代码演示及解决方案

    2022-05-17 01:56:08
  • java Stream流常见操作方法(反射,类加载器,类加载,反射)

    2022-03-24 06:54:56
  • Java上传文件错误java.lang.NoSuchMethodException的解决办法

    2023-11-10 13:15:43
  • java如何将一个float型数的整数部分和小数分别输出显示

    2022-08-17 16:50:26
  • asp之家 软件编程 m.aspxhome.com