RocketMQ消息生产者是如何选择Broker示例详解
作者:梦想实现家_Z 时间:2023-11-10 21:45:49
前言
在RocketMQ
中为,我们创建消息生产者时,只需要设置NameServer
地址,消息就能正确地发送到对应的Broker
中,那么RocketMQ
消息生产者是如何找到Broker
的呢?如果有多个Broker
实例,那么消息发送是如何选择发送到哪个Broker
的呢?
从NameServer查询Topic信息
通过Debug消息发送send()
方法,我们最终可以定位到DefaultMQProducerImpl.sendDefaultImpl()
这个方法,并且我们找到了最关键的Topic信息:
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
这个方法就是通过topic
从NameServer
拉出对应的Broker
信息:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
1.一开始的话,是从当前缓存中找Topic
信息,第一次肯定是找不到的;
2.找不到Topic
信息,那么就调用updateTopicRouteInfoFromNameServer(topic)
从NameServer
拉对应的信息,如果拉到了就更新到缓存中;
3.如果依然找不到Topic
信息,说明没有任何Broker
上面是有这个Topic
的;但是我们还要拉开启了自动创建Topic
配置的Broker
信息,通过updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)
实现;
生产者客户端会从两个地方获取Broker
信息,第一个就是从内存缓存中获取,第二个就是从NameServer
中获取。从NameServer
中分两次获取,一次是获取存在的Topic
对应的Broker
信息,第二次是获取还没有创建出来的Topic
对应的Broker
信息;
如何选择Broker
当客户端拿到了Topic
对应的Broker
信息后,它是如何选择目标Broker
的呢?继续向下看,我们找到了关键代码:
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
......
1.如果是同步发送消息,那么【总的发送次数】=1+【重试次数】,如果是异步发送,默认是1;我们当前是同步模式,所以会存在重试;
2.选择Broker
的关键代码就在selectOneMessageQueue()
方法中,通过前面拿到的topicPublishInfo
作为参数,lastBrokerName
作为额外的考虑参数;
追踪代码,我们进入MQFaultStrategy.selectOneMessageQueue()
中:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
1.如果开启了延迟故障规避,那么执行规避策略;
1.1:轮询找一个
Broker
,该Broker
要么不在规避名单内,要么已经度过了规避期(发送消息失败会将目标Broker放进规避名单,沉默一段时间);1.2:如果所有的
Broker
都没有度过规避期,那么从比较好的那一部分Broker
里面找一个出来;1.3:如果依然没有找到合适的
Broker
,那么就随机选一个Broker
;
2.否则就随机选一个Broker
;
下面我们来看一下随机发送的策略是怎么实现的:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
1.如果第一次发送消息,那么通过自增求余的方式从列表中找一个Broker
,其实就是轮询方式;
2.如果不是第一次发送消息,那么会尽可能避开上一次的Broker
服务,也是为了让Broker
服务负载均衡;
3.如果没有避开上一次的Broker
,那么再向后找另一个Broker
;除非只有一个Broker
服务,否则会尽可能避开上次发送的Broker
;
小结
通过源码分析,我们已经知道了生产者是如何选择目标Broker
的了:
1.第一次发消息,通过轮询的方式选择Broker
;
2.后续发消息会规避上次的Broker
,同样采用轮询的方式选择Broker
;
3.在消息发送过程中,存在一个Broker
规避列表,用户可以通过setSendLatencyFaultEnable(true)
开启故障规避策略,客户端会尽可能选择不在规避列表中的Broker
,如果所有的Broker
都在规避列表中,那么会选择一个相对比较好的Broker
来用;
来源:https://juejin.cn/post/7169539055548858398
![](/images/zang.png)
![](/images/jiucuo.png)
猜你喜欢
java基础知识之FileInputStream流的使用
![](https://img.aspxhome.com/file/2023/6/104036_0s.jpg)
C# 计算DataTime的4种时间差的方法(相差天数、相差小时、相差分钟、相差秒)
Java多线程编程详细解释
一篇超详细的SpringBoot整合MybatisPlus的文章
![](https://img.aspxhome.com/file/2023/9/116119_0s.png)
Android RecyclerView添加搜索过滤器的示例代码
![](https://img.aspxhome.com/file/2023/8/108278_0s.gif)
Java面试题冲刺第十六天--消息队列
![](https://img.aspxhome.com/file/2023/0/69090_0s.png)
Java和C#输入输出流的方法(详解)
Android检测手机多点触摸点数的方法
![](https://img.aspxhome.com/file/2023/1/113271_0s.jpg)
Java Controller实现参数验证与统一异常处理流程详细讲解
Java实现顺序表和链表结构
![](https://img.aspxhome.com/file/2023/2/62832_0s.jpg)
java基础javeSE程序逻辑控制语法
![](https://img.aspxhome.com/file/2023/4/79914_0s.png)
Java中JFrame实现无边框无标题方法
![](https://img.aspxhome.com/file/2023/4/96074_0s.png)
浅析Disruptor高性能线程消息传递并发框架
![](https://img.aspxhome.com/file/2023/8/74988_0s.png)
Java 的可变参数方法详述
![](https://img.aspxhome.com/file/2023/8/110908_0s.png)
WPF实现页面的切换的示例代码
![](https://img.aspxhome.com/file/2023/0/102150_0s.jpg)
SpringBoot Redis用注释实现接口限流详解
c#对象反序列化与对象序列化示例详解
![](https://img.aspxhome.com/file/2023/1/106471_0s.jpg)
Android自定义控件实现优雅的广告轮播图
![](https://img.aspxhome.com/file/2023/0/139240_0s.gif)
java数据结构-堆实现优先队列
![](https://img.aspxhome.com/file/2023/5/59335_0s.jpg)
Java SpringCache+Redis缓存数据详解
![](https://img.aspxhome.com/file/2023/1/60131_0s.jpg)