RocketMQ NameServer 核心源码解析

作者:雨夜之寂 时间:2022-05-08 09:35:41 

带着问题 往下看 (namesrv)

  • 我们在写组件的时候 怎么管理version

  • 如果现在让你 维护一个 各个jar包公用的属性

  • System.exit(-1); 0 -1 -2 各种数都是干什么的,什么时候 用哪个

  • 环境变量如果不想使用 ROCKETMQ_HOME, 想变为 xxx 这怎么做,能做么?

  • 我们启动broker 老是用 -n ip:9876 9876是什么,我们可以改变么?怎么改

  • 大家如果想 把命令启动带着的 -c -p等参数放到 我们的属性中,怎么写代码?

  • 如果我们想 自己设置使用的log 组件,怎么办

  • 遍历 Field[] 的时候 想跳过 static的属性 怎么写代码?

  • 多个对象的 属性需要进行聚合到一个对象中,要是你 怎么写

  • KVConfigManager 有什么作用,怎么保证的 并发操作的数据正确性?你感觉有什么问题么?

  • KVConfigManager 怎么保证的 持久化?

  • 怎么在 并发操作的时候 保证数据的安全性?

  • 方法的参数 使用final 有什么用?

  • 怎么判断的broker 是不是master

  • netty 怎么让nameserver 通知broker 信息的。

  • nameserver 是否存活的判断标准是什么? 能修改么? 怎么修改

  • Runtime.getRuntime().addShutdownHook 有什么用,没有不行么?

  • @ImportantField 干什么的? 什么时候 使用

  • 在同一台计算机上部署多个代理时 想区分日志路径 用哪个参数,调成什么?

  • broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?

请思考下 写写你的答案 再往下看

nameserver 启动的逻辑

nameserver 功能

  • 管理broker 集群

  • 属于注册中心 业务端 和nameserver 进行连接,获取broker地址

  • 负责维护broker 连接/心跳/监控

nameserver 问题解答

我们在写组件的时候 怎么管理version

一方面是 在父类的 pom.xml 通过 进行 控制版本,然后 业务端通过

<dependencyManagement>
   <dependencies>
       <dependency>
           <groupId>com.xxx</groupId>
           <artifactId>xxx</artifactId>
           <version>4.0.0-SNAPSHOT</version>
           <type>pom</type>
           <scope>import</scope>
       </dependency>
    </dependencies>
</dependencyManagement>

这是第一个 ,第二个 是 rocketmq 这种 在common 包下面 新建一个 MQVersion 管理版本

这里会有一个问题,那这个版本管理 我用在哪里啊,不用不行么?

  • 为了方便测试,测试的时候 可能因为版本有差异 导致的问题。指定version 就没有这个问题了 2 broker 操作也是,(其实一句话 为了之后的版本兼容)比如

if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
   result.setCode(ResponseCode.SYSTEM_ERROR);
   result.setRemark("the client does not support this feature. version="
       + MQVersion.getVersionDesc(version));
   log.warn("[get-consumer-status] the client does not support this feature. channel={}, version={}",
       RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
   return result;
} else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
}

如果现在让你 维护一个 各个jar包公用的属性

1 在common包搞一个 公共的实体类 随时用随时取呗,大不了就一个map 然后就put get

2 System.setProperty 底层就是全局 map 进行put get

extends Hashtable<Object,Object>

环境变量如果不想使用 ROCKETMQ_HOME, 想变为 xxx 这怎么做,能做么?

设置 rocketmq.home.dir=xxx

我们启动broker 老是用 -n ip:9876 9876是什么,我们可以改变么?怎么改

nettyServerConfig.setListenPort(9876);

代码指定 的netty 监听端口,一般情况不改

大家如果想 把命令启动带着的 -c -p等参数放到 我们的属性中,怎么写代码?

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

这是先把commandLine 转变为 Properties 对象,然后调用 namesrvConfig 反射方法 赋值

如果我们想 自己设置使用的log 组件,怎么办

LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

遍历 Field[]

遍历 Field[]的时候 想跳过 static的属性 怎么写代码?

(field.getModifiers() & 0x00000008) != 0 如果为true 就是 static false为 非static

多个对象的 属性需要进行聚合到一个对象中,要是你 怎么写

for (Entry<Object, Object> next : from.entrySet()) {
   Object fromObj = next.getValue(), toObj = to.get(next.getKey());
   if (toObj != null && !toObj.equals(fromObj)) {
       log.info("Replace, key: {}, value: {} -> {}", next.getKey(), toObj, fromObj);
   }
   to.put(next.getKey(), fromObj);
}

因为 可能同时操作这个对象 导致 数据不一致 ,所以要加上 读写锁的 写锁

KVConfigManager 有什么作用

KVConfigManager 有什么作用,怎么保证的 并发操作的数据正确性?你感觉有什么问题么?

是 kv 配置的管理器,主要是

HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable

以后写map 也要像这种方式 写注释。

//读取的是 ./namesrv/kvConfig.json
kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";

行吧 ,现在还不知道 这些kv的作用,先看看怎么存储的,到用的时候 我们接上,先知道 kv 存储在KVConfigManager类 configTable 属性中

putKVConfig 使用的 ReentrantReadWriteLock 的写锁 保证数据一致性,如果map的key 存在了,不会进行覆盖,而是 跳过。

KVConfigManager 持久化

KVConfigManager 怎么保证的 持久化?

执行过 上面的 那些方法,执行 persist ,加读锁,如下图

RocketMQ NameServer 核心源码解析

怎么在 并发操作的时候 保证数据的安全性?

一方面 是 不可变类,其中返回属性的时候 要进行copy 简单来说 就是我通过get 方法出去的 对象 是 copy的对象,而不是 原来的对象,防止 外面通过引用 修改 属性值,把我们的对象 属性 进行修改。

方法的参数 使用final 有什么用?

  • 确保,不会也不能对于参数进行修改,保证了调用发起方数据的安全;

  • 避免在方法体中修改参数,引起不必要的错误

  • 程序员工作不是一个人的工作,你设置为final,别人将来维护的时候一看就知道这个变量不能修改,而不需要去记忆这个是不能变化的值,是常量。这个是代码规范。

broker 是不是master判断

怎么判断的broker 是不是master

//0 == brokerId
MixAll.MASTER_ID == brokerId

这个其实可以 抽出来一个公共的方法, 方便之后的修改

netty 怎么让nameserver 通知broker 信息的。

netty 保存的 channel 到时候用了 直接从map 获取 然后发送消息

nameserver 是否存活的判断标准是什么? 能修改么? 怎么修改

BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2

static final 写死的,如果 最后一次心跳时间 + 2分钟 都小于System.currentTimeMillis() 执行删除操作

  • 关闭 netty channel

  • brokerLiveTable 删除对应的实例

这是一个定时任务 从项目 启动5s之后 ,每10s执行一次,说明 对broker的感知 会有些许 延迟。(最大也就 20s,一般10s以内感知)

Runtime.getRuntime().addShutdownHook 有什么用,没有不行么?

当程序正常退出,系统调用 System.exit方法或虚拟机被关闭时才会执行添加的shutdownHook线程。其中shutdownHook是一个已初始化但并不有启动的线程,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以可通过这些钩子在jvm关闭的时候进行内存清理、资源回收等工作。

@ImportantField

@ImportantField 干什么的? 什么时候 使用

最后的true 代表 是否只打印关键属性,写@ImportantField的 就一定会打,没有这个注解的就不打印了

MixAll.printObjectProperties(console, brokerConfig, true);

在同一台计算机上部署多个代理时 想区分日志路径 用哪个参数,调成什么?

isolateLogEnable 改为 true

if (brokerConfig.isIsolateLogEnable()) {
   System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
}
if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
   System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
}

broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?

无论是 -p 还是 -m 都是print 输出,本来就是希望打印日志,然后进程停止。

opt = new Option("p", "printConfigItem", false, "Print all config item");
opt = new Option("m", "printImportantConfig", false, "Print important config item");

来源:https://juejin.cn/post/7143448812911067167

标签:RocketMQ,NameServer,源码解析
0
投稿

猜你喜欢

  • C#实现图像锐化的方法

    2023-07-23 01:08:21
  • 关于idea的gitignore文件编写及解决ignore文件不生效问题

    2023-02-28 02:04:13
  • Java中如何快速构建项目脚手架的实现

    2023-10-30 03:37:50
  • spring cloud gateway 限流的实现与原理

    2023-04-10 16:47:56
  • 微信支付仅能成功调用一次问题的解决方法(Android)

    2021-07-27 10:40:17
  • Java找不到或无法加载主类及编码错误问题的解决方案

    2021-08-01 03:43:10
  • Java父线程(或是主线程)等待所有子线程退出的实例

    2022-10-23 16:25:50
  • C#实现插入排序

    2023-07-02 13:16:58
  • Java实现新建有返回值的线程的示例详解

    2022-02-13 17:09:33
  • unity实现虚拟摇杆控制Virtual Joystick

    2022-09-26 11:56:28
  • java求100以内的素数示例分享

    2021-06-30 21:34:39
  • java身份证验证代码实现

    2023-12-09 16:10:50
  • Android图片加载利器之Picasso扩展功能

    2023-05-14 15:12:33
  • AndroidStudio Gradle基于友盟的多渠道打包方法

    2022-02-24 22:04:43
  • Java 反射机制详解及实例代码

    2023-07-13 15:22:29
  • C#最简单的字符串加密解密方法

    2022-10-30 12:46:36
  • Springboot自带定时任务实现动态配置Cron参数方式

    2023-11-10 10:21:31
  • Winform中GridView分组排序功能实现方法

    2022-01-29 16:20:03
  • 关于Android HTML5 audio autoplay无效问题的解决方案

    2021-09-22 04:10:30
  • Android studio so库找不到问题解决办法

    2023-10-28 02:20:49
  • asp之家 软件编程 m.aspxhome.com