RocketMQ生产者如何规避故障Broker方式详解

作者:梦想实现家_Z 时间:2022-06-23 04:36:10 

前言

在消息发送过程中,生产者从NameServer中获取到了指定Topic对应的Broker信息,在同步发送消息的代码中,如果消息发送失败,生产者默认是会重试两次的。那么Broker有问题的情况下,无论重试多少次都是没有意义的,消息生产者是如何规避这些故障Broker的呢?

收集故障Broker

我们在所有的发送消息源码中都可以找到这样一段代码,可在DefaultMQProducerImpl类中查找:

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

无论是发送成功还是失败,RocketMQ生产者客户端都会做这一步操作:

// 发送成功的话,isolation传false,失败isolation传true
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
   if (this.sendLatencyFaultEnable) {
       long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
       this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
   }
}
private long computeNotAvailableDuration(final long currentLatency) {
   for (int i = latencyMax.length - 1; i >= 0; i--) {
       if (currentLatency >= latencyMax[i])
           return this.notAvailableDuration[i];
   }
   return 0;
}
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

如果Broker产生故障,那么会创建一个FaultItem对象记录故障的Broker,并把结果放进故障规避表faultItemTable中,数据格式如下:

"broker-a": {
 // broker名称
 "name": "broker-a",
 "currentLatency": 发送消息消耗的时间,毫秒值之差,
 // 解除规避的时间,绝对时间
 "startTimestamp": 时间戳毫秒值
},
"broker-b": {
 // broker名称
 "name": "broker-b",
 "currentLatency": 发送消息消耗的时间,毫秒值之差,
 // 解除规避的时间,绝对时间
 "startTimestamp": 时间戳毫秒值
}

发送成功的Broker设置的故障规避时间为0,发送失败的Broker将被设置为规避30秒;

选择Broker

MQFaultStrategy.selectOneMessageQueue()方法中,我们分三部分来分析如何选择Broker。

  • 轮询选择一个可用的Broker

// 轮询的基本套路,一个自增变量
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    // 通过对队列数量取模,获取选定的Broker所在的位置
    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    if (pos < 0)
        pos = 0;
    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    // 判断Broker是否在规避时间内,如果不在规避时间内,就选择这个Broker,否则继续循环直至所有Broker都在规避时间内
    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
        return mq;
}

1.轮询的基本套路都是通过一个自增变量来对所有的Broker数量取模,这样就可以命中一个Broker;

2.针对命中的Broker判断是否在规避时间范围内,不在规避时间内就可以返回;否则只能进入第二个方案;

  • 选择一个相对延迟低的Broker

// 把所有规避列表中的Broker按延迟高低排序,并从延迟低的Broker中选择一个
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 判断该Broker是否允许写消息
if (writeQueueNums > 0) {
   final MessageQueue mq = tpInfo.selectOneMessageQueue();
   if (notBestBroker != null) {
       mq.setBrokerName(notBestBroker);
       mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
   }
   // 返回选中的Broker
   return mq;
}

1.从规避列表中找到延时比较低的Broker;

2.判断该Broker是否允许写消息,允许写消息的话就直接返回,否则再进入下一个方案;

  • 默认的选择

return tpInfo.selectOneMessageQueue();

最后直接轮询一个Broker直接返回:

public MessageQueue selectOneMessageQueue() {
       int index = this.sendWhichQueue.incrementAndGet();
       int pos = Math.abs(index) % this.messageQueueList.size();
       if (pos < 0)
           pos = 0;
       return this.messageQueueList.get(pos);
   }

该方案是默认方案,没有开启故障规避配置的话,所有Broker的选择都是使用的该方案;

小结

RocketMQ通过设置故障规避表的方式,把所有的Broker的延迟数据都保留在故障规避表中,根据该列表制定了以下几种策略:

1.优先选择不在规避时间范围内的Broker

2.如果所有Broker都在规避时间内,优先选择延迟低的Broker

3.如果依然没有选中合适的Broker,那么就直接挑一个Broker来用;

来源:https://juejin.cn/post/7169958001225400333

标签:RocketMQ,生产者,规避故障,Broker
0
投稿

猜你喜欢

  • java中使用数组进行模拟加密的方法

    2023-11-18 15:37:39
  • java 静态代理 动态代理深入学习

    2022-07-24 21:46:25
  • Java经典面试题汇总--多线程

    2023-07-13 01:17:48
  • 关于SpringGateway调用服务 接受不到参数问题

    2023-08-31 13:05:00
  • Java编程接口回调一般用法代码解析

    2023-11-11 06:55:11
  • Spring Security 强制退出指定用户的方法

    2022-10-04 18:13:04
  • ShardingSphere解析SQL示例详解

    2023-11-23 13:57:55
  • Java 用Prometheus搭建实时监控系统过程详解

    2023-09-06 12:07:40
  • java调用外部程序的方法及代码演示

    2023-11-13 22:42:55
  • Java线程池ThreadPoolExecutor原理及使用实例

    2022-04-30 05:53:00
  • Spring Data JPA调用存储过程实例代码

    2023-11-25 00:13:15
  • 基于Mybatis-Plus的CRUD的实现

    2023-09-10 14:38:45
  • Spring Boot日志技术logback原理及配置解析

    2022-09-14 10:51:57
  • 基于Java信号量解决死锁过程解析

    2023-05-13 22:23:02
  • Mybatis-Plus注入SQL原理分析

    2022-11-09 21:17:22
  • Java上传文件错误java.lang.NoSuchMethodException的解决办法

    2023-11-10 13:15:43
  • ServletWebServerApplicationContext创建Web容器Tomcat示例

    2023-10-12 12:28:33
  • ConcurrentHashMap是如何实现线程安全的你知道吗

    2023-11-28 23:14:25
  • Spring自动注入失败的解决方法

    2022-08-13 03:41:31
  • request如何获取body的json数据

    2021-11-16 20:30:31
  • asp之家 软件编程 m.aspxhome.com