返回顶部
首页 > 资讯 > 后端开发 > Python >elasticsearch数据信息索引操作action support示例分析
  • 165
分享到

elasticsearch数据信息索引操作action support示例分析

2024-04-02 19:04:59 165人浏览 安东尼

Python 官方文档:入门教程 => 点击学习

摘要

目录抽象类分析doExecute方法perfORMOperation代码master的相关操作总结抽象类分析 Action这一部分主要是数据(索引)的操作和部分集群信息操作。&nbs

抽象类分析

Action这一部分主要是数据(索引)的操作和部分集群信息操作。 所有的请求通过client转发到对应的action上然后再由对应的TransportAction来执行相关请求。如果请求能在本机上执行则在本机上执行,否则使用Transport进行转发到对应的节点。action support部分是对action的抽象,所有的具体action都继承了support action中的某个类。这里将对这些抽象类进行分析。

这一部分总共分为broadcast(广播),master,nodes,replication及single几个部分。broadcast主要针对一些无具体目标主机的操作,如查询index是否存在,所有继承这个类的action都具有这种类似的性质;nodes主要是对节点的操作,如热点线程查询(hotThread)查询节点上的繁忙线程;replication的子类主要是需要或可以在副本上进行的操作,如索引操作,数据不仅要发送到主shard还要发送到各个副本。single则主要是目标明确的单shard操作,如get操作,根据doc的id取doc,doc 的id能够确定它在哪个shard上,因此操作也在此shard上执行。

doExecute方法

这些support action的实现可以分为两类,第一类就是实现一个内部类作为异步操作器,子类执行doExecute时,初始化该操作器并启动。另外一种就是直接实现一个方法,子类doExecute方法调用该方法进行。TransportBroadcastOperationAction就属于前者,它实现了内部操作器AsyncBroadcastAction。TransportCountAction继承于它,它doExecute方法如下所示:

@Override
    protected void doExecute(CountRequest request, ActionListener<CountResponse> listener) {
        request.nowInMillis = System.currentTimeMillis();
        super.doExecute(request, listener);
    }

调用父类的doExecute方法,也就是TransportBroadcastOperationAction的方法,它的实现如下所示:

@Override
    protected void doExecute(Request request, ActionListener&lt;Response&gt; listener) {
        new AsyncBroadcastAction(request, listener).start();
    }

可以看到它初始化了AsyncBroadcastAction并启动。AsyncBroadcastAction只是确定了操作的流程,及操作完成如何返回response,并未涉及到具体的操作逻辑。因为这些逻辑都在每个子action中实现,不同的action需要进行不同的操作。如count需要count每个shard并且返回最后的总数值,而IndexExistAction则需要对比所有索引查看查询的索引是否存在。start方法的代码如下所示:

public void start() {
      //没有shards
            if (shardsIts.size() == 0) {
                // no shards
                try {
                    listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));
                } catch (Throwable e) {
                    listener.onFailure(e);
                }
                return;
            }
            request.beforeStart();
            // count the local operations, and perform the non local ones
            int shardIndex = -1;
       //遍历对每个shards进行操作
            for (final ShardIterator shardIt : shardsIts) {
                shardIndex++;
                final ShardRouting shard = shardIt.nextOrNull();
                if (shard != null) {
                    performOperation(shardIt, shard, shardIndex);
                } else {
                    // really, no shards active in this group
                    onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                }
            }
        }

start方法就是遍历所有shards,如果shard存在则执行performOperation方法,在这个方法中会区分该请求能否在本机上进行,能执行则调用shardOperation方法得到结果。这个方法在这是抽象的,每个子类都有实现。否则发送到对应的主机上。,如果shard为null则进行onOperation操作,遍历该shard的其它副本看能否找到可以操作的shard。

performOperation代码

如下所示:

protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) {
            if (shard == null) {//shard 为null抛出异常
                // no more active shards... (we should not really get here, just safety)
                onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
            } else {
                try {
                    final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
                    if (shard.currentNodeId().equals(nodes.localNodeId())) {//shard在本地执行shardOperation方法,并通过onOperation方法封装结果
                        threadPool.executor(executor).execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    onOperation(shard, shardIndex, shardOperation(shardRequest));
                                } catch (Throwable e) {
                                    onOperation(shard, shardIt, shardIndex, e);
                                }
                            }
                        });
                    } else {//不是本地shard,发送到对应节点。
                        DiscoveryNode node = nodes.get(shard.currentNodeId());
                        if (node == null) {
                            // no node connected, act as failure
                            onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                        } else {
                            transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler&lt;ShardResponse&gt;() {
                                @Override
                                public ShardResponse newInstance() {
                                    return newShardResponse();
                                }
                                @Override
                                public String executor() {
                                    return ThreadPool.Names.SAME;
                                }
                                @Override
                                public void handleResponse(ShardResponse response) {
                                    onOperation(shard, shardIndex, response);
                                }
                                @Override
                                public void handleException(TransportException e) {
                                    onOperation(shard, shardIt, shardIndex, e);
                                }
                            });
                        }
                    }
                } catch (Throwable e) {
                    onOperation(shard, shardIt, shardIndex, e);
                }
            }
        }

方法shardOperation在countTransportAction的实现如下所示:

@Override
    protected ShardCountResponse shardOperation(ShardCountRequest request) throws elasticsearchException {
        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());//
        IndexShard indexShard = indexService.shardSafe(request.shardId().id());
    //构造查询context
        SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());
        SearchContext context = new DefaultSearchContext(0,
                new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
                shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
                scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
        SearchContext.setCurrent(context);
        try {
            // TODO: min score should move to be "null" as a value that is not initialized...
            if (request.minScore() != -1) {
                context.minimumScore(request.minScore());
            }
            BytesReference source = request.querySource();
            if (source != null &amp;&amp; source.length() &gt; 0) {
                try {
                    QueryParseContext.setTypes(request.types());
                    context.parsedQuery(indexService.queryParserService().parseQuery(source));
                } finally {
                    QueryParseContext.removeTypes();
                }
            }
            final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER;
            boolean terminatedEarly = false;
            context.preProcess();
            try {
                long count;
                if (hasTerminateAfterCount) {//调用lucene的封装接口执行查询并返回结果
                    final Lucene.EarlyTerminatinGCollector countCollector =
                            Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter());
                    terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector);
                    count = countCollector.count();
                } else {
                    count = Lucene.count(context.searcher(), context.query());
                }
                return new ShardCountResponse(request.shardId(), count, terminatedEarly);
            } catch (Exception e) {
                throw new QueryPhaseExecutionException(context, "failed to execute count", e);
            }
        } finally {
            // this will also release the index searcher
            context.close();
            SearchContext.removeCurrent();
        }
    }

可以看到这里是每个action真正的逻辑实现。因为这里涉及到index部分的内容,这里就不详细分析。后面关于index的分析会有涉及。这就是support action中的第一种实现。

master的相关操作

第二种就master的相关操作,因此没有实现对应的操作类,而只是实现了一个方法。该方法的作用跟操作器作用相同,唯一的不同是它没有操作器这么多的变量, 而且它不是异步的。master的操作需要实时进行,执行过程中需要阻塞某些操作,保证集群状态一致性。这里就不再说明,请参考TransportMasterNodeOperationAction原码。

总结

本篇概括说了support action,并以countTransportAction为例说明了support Action中的异步操作器实现,最后简单的分析了master的同步操作。因为这里涉及到很多action不可能一一分析,有兴趣可以参考对应的代码。而且这里有以下index部分的内容,所以没有更深入的分析。在后面分析完index的相关功能后,会挑出几个重要的action做详细分析。

以上就是elasticsearch数据信息索引操作action support示例分析的详细内容,更多关于elasticsearch数据信息索引操作action support的资料请关注编程网其它相关文章!

--结束END--

本文标题: elasticsearch数据信息索引操作action support示例分析

本文链接: https://lsjlt.com/news/146836.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • elasticsearch数据信息索引操作action support示例分析
    目录抽象类分析doExecute方法performOperation代码master的相关操作总结抽象类分析 Action这一部分主要是数据(索引)的操作和部分集群信息操作。&nbs...
    99+
    2024-04-02
  • elasticsearch索引index数据功能源码示例
    从本篇开始,对elasticsearch的介绍将进入数据功能部分(index),这一部分包括索引的创建,管理,数据索引及搜索等相关功能。对于这一部分的介绍,首先对各个功能模块的分析,...
    99+
    2024-04-02
  • Elasticsearch文档索引基本操作增删改查示例
    接口幂等性 接口幂等性:数学概念,多次请求,相当于一次请求get,put,delete都是幂等性的接口post 存在幂等性的问题前端速度很快,点了两次,会生成两个订单用户在...
    99+
    2024-04-02
  • elasticsearch索引index之Translog数据功能分析
    目录translog的结构及写入方式translogFile的继承关系TranslogFile快照的方法总结translog的结构及写入方式 跟大多数分布式系统一样,es也通过临时写...
    99+
    2024-04-02
  • Elasticsearch 在地理信息空间索引的探索和演进问题分析
    目录一、业务背景二、背景知识三、方案演进3.1 史前时代3.2 Elasticsearch 2.0 版本3.3 Elasticsearch 2.2 版本3.4 Elasticsear...
    99+
    2024-04-02
  • mysql数据库索引应用的示例分析
    mysql数据库索引应用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。一、索引的概念    ...
    99+
    2024-04-02
  • HTML5的IndexedDB索引数据库的示例分析
    本篇文章给大家分享的是有关HTML5的IndexedDB索引数据库的示例分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。IndexedDB是...
    99+
    2024-04-02
  • mysql中数据操作的示例分析
    这篇文章给大家分享的是有关mysql中数据操作的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1、读取数据select * from tb1;select...
    99+
    2023-06-15
  • MongoDB数据库中索引和explain的示例分析
    这篇文章主要介绍了MongoDB数据库中索引和explain的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。mongodb 索引使用...
    99+
    2024-04-02
  • Python字符数据操作的示例分析
    这篇文章将为大家详细讲解有关Python字符数据操作的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。字符串操作字符串 + 运算符+运算符用于连接字符串,返回一个由连接在一起的操作数组成的字符串。&...
    99+
    2023-06-29
  • MongoDB数据库基础操作的示例分析
    这篇文章将为大家详细讲解有关MongoDB数据库基础操作的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。为了保存网站的用户数据和业务数据,通常需要一个数据库。Mo...
    99+
    2024-04-02
  • MySQL数据库基本操作的示例分析
    这篇文章主要介绍了MySQL数据库基本操作的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。一、数据库的安装这个就不在这里过多阐述了,...
    99+
    2024-04-02
  • oracle数据库中表的操作示例分析
    这篇文章主要为大家展示了“oracle数据库中表的操作示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“oracle数据库中表的操作示例分析”这篇文章吧。1...
    99+
    2024-04-02
  • MySQL数据库常用操作的示例分析
    小编给大家分享一下MySQL数据库常用操作的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!具体如下:一、查询不同表中同名...
    99+
    2024-04-02
  • PHP7进行数据库操作的示例分析
    这篇文章给大家分享的是有关PHP7进行数据库操作的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。更新 mysqli连接,推荐$conn = mysqli_connect('12...
    99+
    2023-06-14
  • node.js中fs文件系统目录操作与文件信息操作的示例分析
    这篇文章主要介绍node.js中fs文件系统目录操作与文件信息操作的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!目录操作如果存在该目录,就创建失败同步创建目录fs.mkdi...
    99+
    2024-04-02
  • Redis中数据结构与数据操作的示例分析
    小编给大家分享一下Redis中数据结构与数据操作的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!Redis完成数据操作的...
    99+
    2024-04-02
  • 微信小程序中数据双向绑定与数据操作的示例分析
    这篇文章主要为大家展示了“微信小程序中数据双向绑定与数据操作的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“微信小程序中数据双向绑定与数据操作的示例分析...
    99+
    2024-04-02
  • TensorFlow中数据类型信息及转换的示例分析
    这篇文章主要介绍了TensorFlow中数据类型信息及转换的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。一、数据类型在tf中,数据类型有整型(默认是int32),浮...
    99+
    2023-06-25
  • HTML5的IndexedDB索引数据库实例分析
    本篇内容主要讲解“HTML5的IndexedDB索引数据库实例分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“HTML5的IndexedDB索引数据库实例分析...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作