返回顶部
首页 > 资讯 > 精选 >elasticsearch源码index action实现方式是什么
  • 929
分享到

elasticsearch源码index action实现方式是什么

2023-06-30 08:06:16 929人浏览 独家记忆
摘要

本篇内容主要讲解“elasticsearch源码index action实现方式是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“elasticsearch源码index 

本篇内容主要讲解“elasticsearch源码index action实现方式是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“elasticsearch源码index action实现方式是什么”吧!

action的作用

上一篇从结构上分析了action的,本篇将以index action为例仔分析一下action的实现方式。

再概括一下action的作用:对于每种功能(如index)action都会包括两个基本的类*action(IndexAction)和Transport*action(TransportIndexAction),前者类中会有一个实例(IndexAction INSTANCE = new IndexAction())这个实例用于client绑定对应的TransportAction(reGISterAction(IndexAction.INSTANCE, TransportIndexAction.class)),绑定过程发送在ActionModuel中。

另外在Action类中还会定义一个action的名字(String NAME = "indices:data/write/index")这个名字用于TransportService绑定对于的handle,用于处理NettyTransport接收到的信息。TransportAction的是最终的逻辑处理者,当接收到请求时,会首先判断本节点能否处理,如果能够处理则调用相关的方法处理得到结果返回,否则将通过NettyTransport转发该请求到对应的node进行处理。所有的Transport的结构都是这种类型。

TransportAction的类图

首先看一下TransportAction的类图,所的Transport*action都继承自于它。

elasticsearch源码index action实现方式是什么

它主要由两个方法execute和doExecute,execute方法有两种实现,第一种实现需要自行添加actionListener。最终的逻辑都在doExecute方法中,这个方法在各个功能模块中实现。以下是TransportIndexAction的继承关系:

elasticsearch源码index action实现方式是什么

实现上由于功能划分的原因,TransportIndexAction直接继承自TranspShardReplicationOperationAction,这个抽象类中的方法是所有需要操作shard副本的功能action的父,因此它的实现还包括delete,bulk等功能action。它实现了多个内部类,这些内部类用来辅助完成相关的功能。这里主要说一下OperationTransportHandler,ReplicaoperationTransportHandler及AsyncShardOperationAction三个子类。

OperationTransportHandler的代码

如下所示:

class OperationTransportHandler extends BaseTransportRequestHandler<Request> {//继承自BaseTransportRequestHanlder………………        @Override        public void messageReceived(final Request request, final TransportChannel channel) throws Exception {            // no need to have a threaded listener since we just send back a response            request.listenerThreaded(false);            // if we have a local operation, execute it on a thread since we don't spawn            request.operationThreaded(true);      //调用Transport的execute方法,通过channel返回结果            execute(request, new ActionListener<Response>() {                @Override                public void onResponse(Response result) {                    try {                        channel.sendResponse(result);                    } catch (Throwable e) {                        onFailure(e);                    }                }                @Override                public void onFailure(Throwable e) {                    try {                        channel.sendResponse(e);                    } catch (Throwable e1) {                        logger.warn("Failed to send response for " + actionName, e1);                    }                }            });        }

看过NettyTransport请求发送和处理的同学一定对这个代码不陌生,这就是elasticsearch节点间处理信息的典型模式。当请求通过NettyTransport发送到本节点时会根据请求的action名称找到对应的handler,使用对应的handler来处理该请求。这个handler就对应着“indices:data/write/index”,可以看到它调用execute方法来处理。它的注册时在TransportShardReplicationOperationAction构造函数中完成的。

知道了OperationTransportHandler,ReplicaOperationTransportHandler就好理解了它的实现方式跟前者完全一样,对应的action名称加了一个“[r]”,它的作用是处理需要在副本上进行的操作,代码如下所示:

class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {……………………        @Override        public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {            try {                shardOperationOnReplica(request);            } catch (Throwable t) {                failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);                throw t;            }            channel.sendResponse(TransportResponse.Empty.INSTANCE);        }    }

可以看到代码结构非常像,只是调用了副本操作的方法shardOperationOnReplica,这个方法在这TransportShardReplicationOperationAction中是抽象的,它的实现在各个子类中,例如deleteaction中实现了对于delete请求如何在副本上处理。

分析完这两个handle是不是对于action的处理过程有了一定的眉目了呢?但是这才是冰山一角,这两个Handler是用来接收来自其它节点的请求,如果请求的正好是本节点该如何处理呢?这些逻辑都在AsyncShardOperationAction类中。首先看一下它的内部结构:

elasticsearch源码index action实现方式是什么

因为TransportShardReplicationOperationAction的所有子类都是对索引的修改,会引起数据不一致,因此它的操作流程都是现在primaryShard上操作然后是Replicashard上操作。代码如下所示:

protected void doStart() throws ElasticsearchException {            try {          //检查是否有阻塞                ClusterBlockException blockException = checkGlobalBlock(observer.observedState());                if (blockException != null) {                    if (blockException.retryable()) {                        logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());                        retry(blockException);                        return;                    } else {                        throw blockException;                    }                }          //检测是否是创建索引                if (resolveIndex()) {                    internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions()));                } else {                    internalRequest.concreteIndex(internalRequest.request().index());                }                // check if we need to execute, and if not, return                if (!resolveRequest(observer.observedState(), internalRequest, listener)) {                    return;                }          //再次检测是否有阻塞                blockException = checkRequestBlock(observer.observedState(), internalRequest);                if (blockException != null) {                    if (blockException.retryable()) {                        logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());                        retry(blockException);                        return;                    } else {                        throw blockException;                    }                }                shardIt = shards(observer.observedState(), internalRequest);            } catch (Throwable e) {                listener.onFailure(e);                return;            }        //查找primaryShard            boolean foundPrimary = false;            ShardRouting shardX;            while ((shardX = shardIt.nextOrNull()) != null) {                final ShardRouting shard = shardX;                // we only deal with primary shardIt here...                if (!shard.primary()) {                    continue;                }                if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {                    logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());                    retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");                    return;                }                if (!primaryOperationStarted.compareAndSet(false, true)) {                    return;                }                foundPrimary = true;          //primaryShard就在本地,直接进行相关操作                if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {                    try {                        if (internalRequest.request().operationThreaded()) {                            internalRequest.request().beforeLocalFork();                            threadPool.executor(executor).execute(new Runnable() {                                @Override                                public void run() {                                    try {                                        perfORMOnPrimary(shard.id(), shard);                                    } catch (Throwable t) {                                        listener.onFailure(t);                                    }                                }                            });                        } else {                            performOnPrimary(shard.id(), shard);                        }                    } catch (Throwable t) {                        listener.onFailure(t);                    }                } else {//primaryShard在其它节点上,将请求通过truansport发送到对应的节点。                    DiscoveryNode node = observer.observedState().nodes().get(shard.currentNodeId());                    transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {                        @Override                        public Response newInstance() {                            return newResponseInstance();                        }                        @Override                        public String executor() {                            return ThreadPool.Names.SAME;                        }                        @Override                        public void handleResponse(Response response) {                            listener.onResponse(response);                        }                        @Override                        public void handleException(TransportException exp) {                            // if we Got disconnected from the node, or the node / shard is not in the right state (being closed)                            if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||                                    retryPrimaryException(exp)) {                                primaryOperationStarted.set(false);                                internalRequest.request().setCanHaveDuplicates();                                // we already marked it as started when we executed it (removed the listener) so pass false                                // to re-add to the cluster listener                                logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());                                retry(exp);                            } else {                                listener.onFailure(exp);                            }                        }                    });                }                break;            }            ………………        }

这就是对应请求的处理过程。

primary操作的方法

void performOnPrimary(int primaryShardId, final ShardRouting shard) {           ……                PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));                performReplicas(response);            …………        }

以上就是performOnPrimary方法的部分代码,首先调用外部类的shardOperationOnPrimary方法,该方法实现在各个子类中,在TransportIndexAction中的实现如下所示:

@Override    protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {        final IndexRequest request = shardRequest.request;        // 查看是否需要routing        IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex());        MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());        if (mappingMd != null && mappingMd.routing().required()) {            if (request.routing() == null) {                throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id());            }        }      //调用indexserice执行对应的index操作        IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());        IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());        SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())                .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());        long version;        boolean created;        try {            Engine.IndexingOperation op;            if (request.opType() == IndexRequest.OpType.INDEX) {                Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());                if (index.parsedDoc().mappingsModified()) {                    mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());                }                indexShard.index(index);                version = index.version();                op = index;                created = index.created();            } else {                Engine.Create create = indexShard.prepareCreate(sourceToParse,                        request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());                if (create.parsedDoc().mappingsModified()) {                    mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());                }                indexShard.create(create);                version = create.version();                op = create;                created = true;            }            if (request.refresh()) {                try {                    indexShard.refresh("refresh_flag_index");                } catch (Throwable e) {                    // ignore                }            }            // update the version on the request, so it will be used for the replicas            request.version(version);            request.versionType(request.versionType().versionTypeForReplicationAndRecovery());            assert request.versionType().validateVersionForWrites(request.version());            IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);            return new PrimaryResponse<>(shardRequest.request, response, op);        } catch (WriteFailureException e) {            if (e.getMappingTypeToUpdate() != null) {                DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());                if (docMapper != null) {                    mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());                }            }            throw e.getCause();        }    }

上面的代码就是index的执行过程,这一过程涉及到index的底层操作,这里就不展开,只是说明它在action中是如何实现的,后面会有详细说明。接下来看在副本上的操作。副本可能有多个,因此首先调用了performReplicas方法,在这个方法中首先开始监听集群的状态,然后便利所有的副本进行处理,如果是异步则加入一个listener,否则同步执行返回结果。最后调用performReplica,在该方法中调用外部类的抽象方法shardOperationOnReplica。 这一过程比较简单,这里就不再贴代码,有兴趣可以参考相关源码。

到此,相信大家对“elasticsearch源码index action实现方式是什么”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

--结束END--

本文标题: elasticsearch源码index action实现方式是什么

本文链接: https://lsjlt.com/news/327757.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • elasticsearch源码index action实现方式是什么
    本篇内容主要讲解“elasticsearch源码index action实现方式是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“elasticsearch源码index ...
    99+
    2023-06-30
  • elasticsearch源码分析index action实现方式
    目录action的作用TransportAction的类图OperationTransportHandler的代码primary操作的方法总结action的作用 上一篇从结构上分析了...
    99+
    2024-04-02
  • vuex mutation action同级调用方式是什么
    本篇内容介绍了“vuex mutation action同级调用方式是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!...
    99+
    2023-06-29
  • 源码 | 解析 Redo Log 实现方式
    柯煜昌 顾问软件工程师 目前从事 RadonDB 容器化研发,华中科技大学研究生毕业,有多年的数据库内核开发经验。 | 前言 提及 Redo Log(重做日志)与 LSN(log sequece number)时,经常被问及以下问题:...
    99+
    2017-03-28
    源码 | 解析 Redo Log 实现方式
  • 返回最大值的index pytorch方式是什么
    这篇文章主要讲解了“返回最大值的index pytorch方式是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“返回最大值的index pytorch方式是什么”吧!返...
    99+
    2023-07-02
  • Redis源码类型的实现原理是什么
    这期内容当中小编将会给大家带来有关Redis源码类型的实现原理是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。  Redis源码类型的实现原理有什么  Redis内部...
    99+
    2024-04-02
  • 开源纯C#表达式编译器的实现方法是什么
    这篇文章主要讲解了“开源纯C#表达式编译器的实现方法是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“开源纯C#表达式编译器的实现方法是什么”吧!一、   引...
    99+
    2023-06-19
  • Nacos源码阅读方法是什么
    这篇文章主要介绍“Nacos源码阅读方法是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Nacos源码阅读方法是什么”文章能帮助大家解决问题。先给大家献上一张我梳理的高清源码图,方便大家对nac...
    99+
    2023-06-29
  • 分布式锁redis实现方式是什么
    分布式锁的Redis实现方式有两种:基于SETNX命令和基于RedLock算法。1. 基于SETNX命令:使用Redis的SETNX...
    99+
    2023-09-12
    redis
  • Java泛型实现方式是什么
    这篇文章主要讲解了“Java泛型实现方式是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java泛型实现方式是什么”吧!Java 泛型实现方式Java 采用**类型擦除(Type era...
    99+
    2023-06-16
  • await错误捕获实现方式源码解析
    目录前言Promise 的使用方法await-to-js源码总结前言 Promise 是一种在 JavaScript 中用于处理异步操作的机制。Promise 在开发中被广泛使用,这...
    99+
    2022-12-25
    await 错误捕获 await 错误
  • Python实现单例模式的方式是什么
    本篇内容介绍了“Python实现单例模式的方式是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!单例模式(Singleton Patter...
    99+
    2023-07-04
  • Vue.js源码的使用方法是什么
    本篇内容介绍了“Vue.js源码的使用方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!立即执行函数...
    99+
    2024-04-02
  • Golang源码安装的方法是什么
    这篇文章主要介绍“Golang源码安装的方法是什么”,在日常操作中,相信很多人在Golang源码安装的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Golang源码安装的方法是什么”的疑惑有所帮助!...
    99+
    2023-07-05
  • Java泛型的实现方式是什么
    本篇内容主要讲解“Java泛型的实现方式是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Java泛型的实现方式是什么”吧!Java 泛型实现方式Java 采用**类型擦除(Type eras...
    99+
    2023-06-16
  • BUILDER模式的实现方法是什么
    本篇内容主要讲解“BUILDER模式的实现方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“BUILDER模式的实现方法是什么”吧!效果它将构造代码和表示代码分开Builder模式将构建对...
    99+
    2023-06-19
  • CSS代码格式化的不同表现方式是什么
    这篇文章主要介绍了CSS代码格式化的不同表现方式是什么,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。   格式化与CSS的功能无关。这些仅...
    99+
    2024-04-02
  • Sourcemap源代码映射的方法是什么
    今天小编给大家分享一下Sourcemap源代码映射的方法是什么的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。Sourcema...
    99+
    2023-07-05
  • php优惠券的实现方式是什么
    本文操作环境:windows7系统、PHP7.4版、DELL G3电脑php优惠券的实现方式是什么?用PHP做了一个领取优惠券活动的示例代码业务需求优惠券活动,具体还是要根据自己的需求。以下是最近实现的优惠券活动,主要的业务需求:根据后端设...
    99+
    2017-12-07
    PHP 领取优惠券
  • java零拷贝的实现方式是什么
    本篇内容主要讲解“java零拷贝的实现方式是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“java零拷贝的实现方式是什么”吧!1.什么是零拷贝零拷贝字面上的意思包括两个,“零”和“拷贝”:“...
    99+
    2023-06-29
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作