elasticsearch集群发现zendiscovery的Ping机制分析

作者:zziawan 时间:2021-05-25 05:40:55 

 zenDiscovery实现机制

ping是集群发现的基本手段,通过在网络上广播或者指定ping某些节点获取集群信息,从而可以找到集群的master加入集群。zenDiscovery实现了两种ping机制:广播与单播。本篇将详细分析一些这MulticastZenPing机制的实现为后面的集群发现和master选举做好铺垫。

广播的过程

首先看一下广播(MulticastZenPing),广播的原理很简单,节点启动后向网络发送广播信息,任何收到的节点只要集群名字相同都应该对此广播信息作出回应。这样该节点就获取了集群的相关信息。它定义了一个action:"internal:discovery/zen/multicast"和广播的信息头:INTERNAL_HEADER 。之前说过NettyTransport是cluster通信的基础,但是广播却没有使它。它使用了java的MulticastSocket。这里简单的介绍一下MulticastSocket的使用。它是一个UDP 机制的socket,用来进行多个数据包的广播。它可以帮到一个ip形成一个group,任何MulticastSocket都可以join进来,组内的socket发送的信息会被订阅了改组的所有机器接收到。elasticsearch对其进行了封装形成了MulticastChannel,有兴趣可以参考相关源码。 

首先看一下MulticastZenPing的几个辅助内部类:

elasticsearch集群发现zendiscovery的Ping机制分析

它总共定义了4个内部类,这些内部类和它一起完成广播功能。FinalizingPingCollection是一pingresponse的容器,所有的响应都用它来存储。MulticastPingResponseRequestHandler它是response处理类,类似于之前所说的nettytransportHandler,它虽然使用的不是netty,但是它也定义了一个messageReceived的方法,当收到请求时直接返回一个response。

MulticastPingResponse就不用细说了,它就是一个响应类。最后要着重说一下Receiver类,因为广播并不是使用NettyTransport,因此对于消息处理逻辑都在Receiver中。在初始化MulticastZenPing时会将receiver注册进去。

protected void doStart() throws ElasticsearchException {
       try {
           ....
           multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
                   new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
                   new Receiver());//将receiver注册到channel中
       } catch (Throwable t) {
         ....
       }
   }

Receiver类基础了Listener,实现了3个方法,消息经过onMessage方法区分,如果是内部ping则使用handleNodePingRequest方法处理,否则使用handleExternalPingRequest处理,区分方法很简单,就是读取信息都看它是否符合所定义的INTERNAL_HEADER 信息头。

nodeping处理代码

private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
          ....
           final DiscoveryNodes discoveryNodes = contextProvider.nodes();
           final DiscoveryNode requestingNode = requestingNodeX;
           if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
               // 自身发出的ping,忽略
               return;
           }
//只接受本集群ping
           if (!requestClusterName.equals(clusterName)) {
         ...return;
           }
           // 两个client间不需要ping
           if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
           }
//新建一个response
           final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
           multicastPingResponse.id = id;
           multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
//无法连接的情况
           if (!transportService.nodeConnected(requestingNode)) {
               // do the connect and send on a thread pool
               threadPool.generic().execute(new Runnable() {
                   @Override
                   public void run() {
                       // connect to the node if possible
                       try {
                           transportService.connectToNode(requestingNode);
                           transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                               @Override
                               public void handleException(TransportException exp) {
                                   logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                               }
                           });
                       } catch (Exception e) {
                           if (lifecycle.started()) {
                               logger.warn("failed to connect to requesting node {}", e, requestingNode);
                           }
                       }
                   }
               });
           } else {
               transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                   @Override
                   public void handleException(TransportException exp) {
                       if (lifecycle.started()) {
                           logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                       }
                   }
               });
           }
       }
   }

另外的一个方法是处理外部ping信息,处理过程是返回cluster的信息(这种外部ping的具体作用没有研究不是太清楚)。以上是响应MulticastZenPing的过程,收到其它节点的响应信息后它会把本节点及集群的master节点相关信息返回给广播节点。这样广播节点就获知了集群的相关信息。在MulticastZenPing类中还有一个类 MulticastPingResponseRequestHandler,它的作用是广播节点对其它节点对广播信息响应的回应,广播节点的第二次发送信息的过程。它跟其它TransportRequestHandler一样它有messageReceived方法,在启动时注册到transportserver中,只处理一类action:"internal:discovery/zen/multicast"。

ping请求的发送策略

代码如下:

public void ping(final PingListener listener, final TimeValue timeout) {
      ....
//产生一个id
       final int id = pingIdGenerator.incrementAndGet();
       try {
           receivedResponses.put(id, new PingCollection());
           sendPingRequest(id);//第一次发送ping请求
           // 等待时间的1/2后再次发送一个请求
           threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
               @Override
               public void onFailure(Throwable t) {
                   logger.warn("[{}] failed to send second ping request", t, id);
                   finalizePingCycle(id, listener);
               }
               @Override
               public void doRun() {
                   sendPingRequest(id);
//再过1/2时间再次发送一个请求
                   threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                       @Override
                       public void onFailure(Throwable t) {
                           logger.warn("[{}] failed to send third ping request", t, id);
                           finalizePingCycle(id, listener);
                       }
                       @Override
                       public void doRun() {
                           // make one last ping, but finalize as soon as all nodes have responded or a timeout has past
                           PingCollection collection = receivedResponses.get(id);
                           FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
                           receivedResponses.put(id, finalizingPingCollection);
                           logger.trace("[{}] sending last pings", id);
                           sendPingRequest(id);
//最后一次发送请求,超时的1/4后
                           threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                               @Override
                               public void onFailure(Throwable t) {
                                   logger.warn("[{}] failed to finalize ping", t, id);
                               }
                               @Override
                               protected void doRun() throws Exception {
                                   finalizePingCycle(id, listener);
                               }
                           });
                       }
                   });
               }
           });
       } catch (Exception e) {
           logger.warn("failed to ping", e);
           finalizePingCycle(id, listener);
       }
   }

发送过程主要是调用sendPingRequest(id)方法,在该方法中会将id,信息头,版本,本地节点信息一起写入到BytesStreamOutput中然后将其进行广播,这个广播信息会被其它机器上的Receiver接收并处理,并且响应该ping请求。另外一个需要关注的是以上加说明的部分,它通过链时的定期发送请求,在等待时间内可能会发出4次请求,这种发送方式会造成大量的ping请求重复,幸好ping的资源消耗小,但是好处是可以尽可能保证在timeout这个时间段内集群的新增节点都能收到这个ping信息。在单播中也采用了该策略。

来源:https://www.cnblogs.com/zziawanblog/p/6551549.html

标签:elasticsearch,zendiscovery,Ping,集群发现
0
投稿

猜你喜欢

  • Android 防止多次重复点击的三种方法的示例

    2022-02-04 17:48:42
  • Log4j.properties配置及其使用

    2023-05-14 21:06:18
  • 一篇文章弄懂Java和Kotlin的泛型难点

    2022-11-19 11:37:19
  • Java JVM字节码指令集总结整理与介绍

    2021-09-18 17:10:20
  • c#中的浮点型转整形的舍取 四舍五入和银行家舍入实现代码

    2023-01-10 02:04:41
  • C#简单读写txt文件的方法

    2023-01-17 06:30:27
  • C#/VB.NET 给Excel添加、删除数字签名的方法

    2022-03-21 08:29:07
  • Ajax 验证用户输入的验证码是否与随机生成的一致

    2022-06-29 00:43:32
  • Android可配置透明度的水印

    2021-06-06 14:07:42
  • 解决Android webview设置cookie和cookie丢失的问题

    2021-09-12 21:55:52
  • 利用java实现单词倒序排列

    2023-07-01 04:30:51
  • Java中的装箱和拆箱深入理解

    2023-02-22 08:18:10
  • C#获取指定PDF文件页数的方法

    2021-10-10 17:15:43
  • C#命令模式用法实例

    2021-10-21 12:46:02
  • 如何使用Spring自定义Xml标签

    2022-11-14 19:01:08
  • Java C++题解leetcode904水果成篮

    2023-12-08 21:06:09
  • java生成excel并导出到对应位置的方式

    2021-12-30 16:05:14
  • java调用回调机制详解

    2023-11-14 21:53:21
  • Android Gradle模块依赖替换使用技巧

    2021-07-03 20:22:53
  • Android实现语音合成与识别功能

    2023-10-01 01:41:00
  • asp之家 软件编程 m.aspxhome.com