elasticsearch数据信息索引操作action support示例分析

作者:zziawan 时间:2022-03-18 02:09:07 

抽象类分析

Action这一部分主要是数据(索引)的操作和部分集群信息操作。 所有的请求通过client转发到对应的action上然后再由对应的TransportAction来执行相关请求。如果请求能在本机上执行则在本机上执行,否则使用Transport进行转发到对应的节点。action support部分是对action的抽象,所有的具体action都继承了support action中的某个类。这里将对这些抽象类进行分析。

这一部分总共分为broadcast(广播),master,nodes,replication及single几个部分。broadcast主要针对一些无具体目标主机的操作,如查询index是否存在,所有继承这个类的action都具有这种类似的性质;nodes主要是对节点的操作,如热点线程查询(hotThread)查询节点上的繁忙线程;replication的子类主要是需要或可以在副本上进行的操作,如索引操作,数据不仅要发送到主shard还要发送到各个副本。single则主要是目标明确的单shard操作,如get操作,根据doc的id取doc,doc 的id能够确定它在哪个shard上,因此操作也在此shard上执行。

doExecute方法

这些support action的实现可以分为两类,第一类就是实现一个内部类作为异步操作器,子类执行doExecute时,初始化该操作器并启动。另外一种就是直接实现一个方法,子类doExecute方法调用该方法进行。TransportBroadcastOperationAction就属于前者,它实现了内部操作器AsyncBroadcastAction。TransportCountAction继承于它,它doExecute方法如下所示:

@Override
   protected void doExecute(CountRequest request, ActionListener<CountResponse> listener) {
       request.nowInMillis = System.currentTimeMillis();
       super.doExecute(request, listener);
   }

调用父类的doExecute方法,也就是TransportBroadcastOperationAction的方法,它的实现如下所示:

@Override
   protected void doExecute(Request request, ActionListener&lt;Response&gt; listener) {
       new AsyncBroadcastAction(request, listener).start();
   }

可以看到它初始化了AsyncBroadcastAction并启动。AsyncBroadcastAction只是确定了操作的流程,及操作完成如何返回response,并未涉及到具体的操作逻辑。因为这些逻辑都在每个子action中实现,不同的action需要进行不同的操作。如count需要count每个shard并且返回最后的总数值,而IndexExistAction则需要对比所有索引查看查询的索引是否存在。start方法的代码如下所示:

public void start() {
//没有shards
           if (shardsIts.size() == 0) {
               // no shards
               try {
                   listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));
               } catch (Throwable e) {
                   listener.onFailure(e);
               }
               return;
           }
           request.beforeStart();
           // count the local operations, and perform the non local ones
           int shardIndex = -1;
//遍历对每个shards进行操作
           for (final ShardIterator shardIt : shardsIts) {
               shardIndex++;
               final ShardRouting shard = shardIt.nextOrNull();
               if (shard != null) {
                   performOperation(shardIt, shard, shardIndex);
               } else {
                   // really, no shards active in this group
                   onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
               }
           }
       }

start方法就是遍历所有shards,如果shard存在则执行performOperation方法,在这个方法中会区分该请求能否在本机上进行,能执行则调用shardOperation方法得到结果。这个方法在这是抽象的,每个子类都有实现。否则发送到对应的主机上。,如果shard为null则进行onOperation操作,遍历该shard的其它副本看能否找到可以操作的shard。

performOperation代码

如下所示:

protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) {
           if (shard == null) {//shard 为null抛出异常
               // no more active shards... (we should not really get here, just safety)
               onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
           } else {
               try {
                   final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
                   if (shard.currentNodeId().equals(nodes.localNodeId())) {//shard在本地执行shardOperation方法,并通过onOperation方法封装结果
                       threadPool.executor(executor).execute(new Runnable() {
                           @Override
                           public void run() {
                               try {
                                   onOperation(shard, shardIndex, shardOperation(shardRequest));
                               } catch (Throwable e) {
                                   onOperation(shard, shardIt, shardIndex, e);
                               }
                           }
                       });
                   } else {//不是本地shard,发送到对应节点。
                       DiscoveryNode node = nodes.get(shard.currentNodeId());
                       if (node == null) {
                           // no node connected, act as failure
                           onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                       } else {
                           transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler&lt;ShardResponse&gt;() {
                               @Override
                               public ShardResponse newInstance() {
                                   return newShardResponse();
                               }
                               @Override
                               public String executor() {
                                   return ThreadPool.Names.SAME;
                               }
                               @Override
                               public void handleResponse(ShardResponse response) {
                                   onOperation(shard, shardIndex, response);
                               }
                               @Override
                               public void handleException(TransportException e) {
                                   onOperation(shard, shardIt, shardIndex, e);
                               }
                           });
                       }
                   }
               } catch (Throwable e) {
                   onOperation(shard, shardIt, shardIndex, e);
               }
           }
       }

方法shardOperation在countTransportAction的实现如下所示:

@Override
   protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticsearchException {
       IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());//
       IndexShard indexShard = indexService.shardSafe(request.shardId().id());
//构造查询context
       SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());
       SearchContext context = new DefaultSearchContext(0,
               new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
               shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
               scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
       SearchContext.setCurrent(context);
       try {
           // TODO: min score should move to be "null" as a value that is not initialized...
           if (request.minScore() != -1) {
               context.minimumScore(request.minScore());
           }
           BytesReference source = request.querySource();
           if (source != null &amp;&amp; source.length() &gt; 0) {
               try {
                   QueryParseContext.setTypes(request.types());
                   context.parsedQuery(indexService.queryParserService().parseQuery(source));
               } finally {
                   QueryParseContext.removeTypes();
               }
           }
           final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER;
           boolean terminatedEarly = false;
           context.preProcess();
           try {
               long count;
               if (hasTerminateAfterCount) {//调用lucene的封装接口执行查询并返回结果
                   final Lucene.EarlyTerminatingCollector countCollector =
                           Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter());
                   terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector);
                   count = countCollector.count();
               } else {
                   count = Lucene.count(context.searcher(), context.query());
               }
               return new ShardCountResponse(request.shardId(), count, terminatedEarly);
           } catch (Exception e) {
               throw new QueryPhaseExecutionException(context, "failed to execute count", e);
           }
       } finally {
           // this will also release the index searcher
           context.close();
           SearchContext.removeCurrent();
       }
   }

可以看到这里是每个action真正的逻辑实现。因为这里涉及到index部分的内容,这里就不详细分析。后面关于index的分析会有涉及。这就是support action中的第一种实现。

master的相关操作

第二种就master的相关操作,因此没有实现对应的操作类,而只是实现了一个方法。该方法的作用跟操作器作用相同,唯一的不同是它没有操作器这么多的变量, 而且它不是异步的。master的操作需要实时进行,执行过程中需要阻塞某些操作,保证集群状态一致性。这里就不再说明,请参考TransportMasterNodeOperationAction原码。

来源:https://www.cnblogs.com/zziawanblog/p/6671286.html

标签:elasticsearch,action,support,信息索引
0
投稿

猜你喜欢

  • Java I/O中I/O流的典型使用方式详解

    2023-07-08 21:07:51
  • C# 添加对System.Configuration.dll文件的引用操作

    2022-03-05 22:20:31
  • 解决@PathVariable对于特殊字符截断的问题

    2021-10-10 08:19:40
  • java多线程并发中使用Lockers类将多线程共享资源锁定

    2021-11-14 11:08:37
  • java8学习教程之函数引用的使用方法

    2023-08-28 12:03:19
  • 解决Eclipse创建android项目无法正常预览布局文件问题的方法

    2023-07-31 09:51:12
  • C# 定时器定时更新的简单实例

    2023-01-08 12:45:50
  • 如何用Java Stream写出既高雅又装*的代码

    2022-04-13 23:23:58
  • 浅谈Java生命周期管理机制

    2022-02-21 19:07:47
  • 通过IDEA快速定位和排除依赖冲突问题

    2021-06-07 02:01:16
  • SpringBoot解决BigDecimal传到前端后精度丢失问题

    2021-12-13 14:44:39
  • Netty分布式高性能工具类同线程下回收对象解析

    2023-05-24 22:00:55
  • 一文详解Java抽象类到底有多抽象

    2023-08-27 01:41:26
  • Java实现将图片上传到webapp路径下 路径获取方式

    2023-07-10 12:44:13
  • Spring+SpringMVC+MyBatis深入学习及搭建(一)之MyBatis的基础知识

    2021-09-27 15:12:59
  • Spring Cache手动清理Redis缓存

    2023-11-29 02:49:52
  • springboot如何读取自定义配置项

    2021-06-18 10:44:53
  • Java @GlobalLock注解详细分析讲解

    2023-06-03 03:55:53
  • java map转Multipart/form-data类型body实例

    2023-04-19 13:16:18
  • Spring Cloud Gateway替代zuul作为API网关的方法

    2023-05-03 07:19:58
  • asp之家 软件编程 m.aspxhome.com