返回顶部
首页 > 资讯 > 精选 >Nacos源码阅读方法是什么
  • 585
分享到

Nacos源码阅读方法是什么

2023-06-29 10:06:57 585人浏览 八月长安
摘要

这篇文章主要介绍“Nacos源码阅读方法是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Nacos源码阅读方法是什么”文章能帮助大家解决问题。先给大家献上一张我梳理的高清源码图,方便大家对nac

这篇文章主要介绍“Nacos源码阅读方法是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Nacos源码阅读方法是什么”文章能帮助大家解决问题。

先给大家献上一张我梳理的高清源码图,方便大家对nacos的源码有一个整体上的认识。

Nacos源码阅读方法是什么

有了这张图,我们就很容易去看nacos源码了。

如何找切入点

首先我们得要找一个切入点进入到nacos源码中,那么就从nacos依赖入手

 <dependency>            <groupId>com.alibaba.cloud</groupId>            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>  </dependency>

进入这个依赖文件,会发现它又依赖了一个组件:

<dependency>            <groupId>com.alibaba.cloud</groupId>            <artifactId>spring-cloud-alibaba-nacos-discovery</artifactId></dependency>

进入依赖之后,我们发现它长这样:

Nacos源码阅读方法是什么

从这张图中,我们发现了一个熟悉的配置文件spring.factories,这是sringboot自动装配的必备文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\  com.alibaba.cloud.nacos.reGIStry.NacosServiceRegistryAutoConfiguration,\  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\  com.alibaba.cloud.nacos.discovery.Reactive.NacosReactiveDiscoveryClientConfiguration,\  com.alibaba.cloud.nacos.discovery.confiGClient.NacosConfigServerAutoConfigurationorg.springframework.cloud.bootstrap.BootstrapConfiguration=\  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

因为这张主要说的是服务注册源码,所以我们可以只用关注(NacosServiceRegistryAutoConfiguration)自动装配文件

public class NacosServiceRegistryAutoConfiguration {@Beanpublic NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {return new NacosServiceRegistry(nacosDiscoveryProperties);}@ConditionalOnBean(AutoServiceRegistrationProperties.class)public NacosRegistration nacosRegistration(NacosDiscoveryProperties nacosDiscoveryProperties,ApplicationContext context) {return new NacosRegistration(nacosDiscoveryProperties, context);public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry,AutoServiceRegistrationProperties autoServiceRegistrationProperties,NacosRegistration registration) {return new NacosAutoServiceRegistration(registry,autoServiceRegistrationProperties, registration);}

我们看到的是三个bean注入,这里给大家介绍一个看源码的小技巧:自动装配的文件中申明的bean类,我们只需要看带有auto的bean,这个往往是入口;NacosAutoServiceRegistration 带有auto,我们点进去看看里面都有什么:

@Overrideprotected void register() {if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {log.debug("Registration disabled.");return;}if (this.registration.getPort() < 0) {this.registration.setPort(getPort().get());}super.register();}

里面有一个register()方法,我在这里打个断点,因为我猜测这个就是注册的入口,我现在使用debug模式,启动一个服务,看它会不会调用这个方法:

客户端注册

这里贴上我debug后,进入register方法的调用链截图

Nacos源码阅读方法是什么

看到这个调用链,看到一个onApplicationEvent的回调方法,找到这个方法所在的类AbstractAutoServiceRegistration
这个类继承了ApplicationListener这个多播器监听器,spring启动之后,会发布多播器事件,然后回调实现多播器组件的onApplicationEvent方法,我们从这个方法开始分析:

public void onApplicationEvent(WEBServerInitializedEvent event) {bind(event); // 绑定端口,并启动}@Deprecatedpublic void bind(WebServerInitializedEvent event) {// 设置端口    this.port.compareAndSet(0, event.getWebServer().getPort());    // 启动客户端注册组件this.start();}public void start() {        // 省略分支代码        // 调用注册register();}

因为SpringCloud提供了多种注册中心扩展,但是我们这里只引用了nacos注册中心,所以这里直接调用的是NacosServiceRegistry的register方法:

public void register(Registration registration) {    // 省略分支代码    // 获取服务idString serviceId = registration.getServiceId();// 获取组配置String group = nacosDiscoveryProperties.getGroup();     // 封装服务实例Instance instance = getNacosInstanceFromRegistration(registration);// 调用 命名服务的 registerInstance方法 注册实例namingService.registerInstance(serviceId, group, instance);}

进入到registerInstance方法

    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {        if (instance.isEphemeral()) {            // 省略分支代码            // 与服务端建立心跳,默认每隔5秒定时发送新跳包            this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);        }        // 通过Http方式向服务端发送注册请求        this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);    }

serverproxy通过调用对http进行封装的reapi方法,向服务端接口("/nacos/v1/ns/instance")发送请求,

   public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {        LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});        Map<String, String> params = new HashMap(9);        params.put("namespaceId", this.namespaceId);        params.put("serviceName", serviceName);        params.put("groupName", groupName);        params.put("clusterName", instance.getClusterName());        params.put("ip", instance.getIp());        params.put("port", String.valueOf(instance.getPort()));        params.put("weight", String.valueOf(instance.getWeight()));        params.put("enable", String.valueOf(instance.isEnabled()));        params.put("healthy", String.valueOf(instance.isHealthy()));        params.put("ephemeral", String.valueOf(instance.isEphemeral()));        params.put("metadata", JSON.tojsONString(instance.getMetadata()));        this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");    }

我们知道nacos经常是以集群形式部署的,那客户端是如何选择其中一个节点发送呢,肯定得实现负载均衡的逻辑,我们点击reqAPI,看它是如何实现的

 if (servers != null && !servers.isEmpty()) {                Random random = new Random(System.currentTimeMillis());                // 随机获取一个索引,servers保存的是所有nacos节点地址                int index = random.nextInt(servers.size());                // 遍历所有节点,根据index值,从servers中找到对应位置的server,进行请求调用,如果调用成功则返回,否则依次往后遍历,直到请求成功                for(int i = 0; i < servers.size(); ++i) {                    String server = (String)servers.get(index);                    try {                        return this.callServer(api, params, server, method);                    } catch (NacosException var11) {                        exception = var11;                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);                    } catch (Exception var12) {                        exception = var12;                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);                    }                    // index+1 然后取模 是保证index不会越界                    index = (index + 1) % servers.size();                }                throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());            }

到这里,客户端注册的代码已经分析完了,不过这还不是本篇的结束,我们还得继续分析服务端是如何处理客户端发送过来的注册请求:

服务端处理客户端注册请求

如果需要查看服务端源码的话,则需要将nacos源码下下来 下载地址

我们从服务注册api接口地址(/nacos/v1/ns/instance),可以找到对应的controller为(com.alibaba.nacos.naming.controllers.InstanceController)

因为注册实例发送的是post请求,所以直接找被postmapping注解的register方法

 @CanDistro    @PostMapping    public String register(httpservletRequest request) throws Exception {// 获取服务名        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);// 获取命名空间id        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);// 注册实例serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));        return "ok";    }

我们点击进入到registerInstance方法:

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {        createEmptyService(namespaceId, serviceName, instance.isEphemeral());        Service service = getService(namespaceId, serviceName);        if (service == null) {            throw new NacosException(NacosException.INVALID_PARAM,                "service not found, namespace: " + namespaceId + ", service: " + serviceName);        }// 执行添加实例的操作        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);    }

分析

在nacos中,注册实例后,还需要将注册信息同步到其他节点,所有在nacos中存在两种同步模式AP和CP,ap和cp主要体现在集群中如何同步注册信息到其它集群节点的实现方式上;
nacos通过ephemeral 字段值来决定是使用ap方式同步还是cp方式同步,默认使用的的ap方式同步注册信息。
com.alibaba.nacos.naming.core.ServiceManager.addInstance()

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {        // 生成服务的key        String key = KeyBuilder.buildInstanceLisTKEy(namespaceId, serviceName, ephemeral);        // 获取服务        Service service = getService(namespaceId, serviceName);        // 使用同步处理        synchronized (service) {            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);            Instances instances = new Instances();            instances.setInstanceList(instanceList);            // 调用consistencyService.put 处理同步过来的服务            consistencyService.put(key, instances);        }    }

我们在进入到consistencyService.put方法中

Nacos源码阅读方法是什么

点击put方法时,会看到有三个实现类,根据上下文(或者debug方式),可以推断出这里引用的是DelegateConsistencyServiceImpl实现类

    @Override    public void put(String key, Record value) throws NacosException {        // 进入到这个put方法后,就可以知道应该使用ap方式同步还是cp方式同步        mapConsistencyService(key).put(key, value);    }

从下面的方法中 可以判断通过key来判断使用ap还是cp来同步注册信息,其中key是由ephemeral字段组成;

   private ConsistencyService mapConsistencyService(String key) {        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;    }

AP 方式同步的流程(ephemeralConsistencyService) 本地服务器处理注册信息&将注册信息同步到其它节点

    @Override    public void put(String key, Record value) throws NacosException {        // 处理本地注册列表        onPut(key, value);        // 添加阻塞任务,同步信息到其他集群节点        taskDispatcher.addTask(key);    }

处理本地注册节点

nacos将key做为一个task,添加到notifer中阻塞队列tasks中,并且使用单线程执行,其中notifer是初始化的时候,作为一个线程被放到线程池中(线程池只设置了一个核心线程);

这里有一个点需要告诉大家:在大多数分布式框架,都会采用单线程的阻塞队列来处理耗时的任务,一方面解决并发问题,另一方面能够解决并发带来的写写冲突问题。

线程中的主要处理逻辑就是,循环读取阻塞队列中的内容,然后处理注册信息,更新到内存注册列表中。

同步注册信息到其他集群节点

nacos同样也是把注册key作为一个task存放到 TaskDispatcher 中的taskShedule阻塞队列中,然后开启线程循环读取阻塞队列:

       @Override        public void run() {            List<String> keys = new ArrayList<>();            while (true) {                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),                        TimeUnit.MILLISECONDS);                    // 省略判断代码                    // 添加同步的key                    keys.add(key);                    // 计数                    dataSize++;                    // 判断同步的key大小是否等于 批量同步设置的限量 或者 判断据上次同步时间 是否大于 配置的间隔周期,如果满足任意一个,则开始同步                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {                        // 遍历所有集群节点,直接调用http进行同步                        for (Server member : dataSyncer.getServers()) {                            if (NetUtils.localServer().equals(member.getKey())) {                                continue;                            }                            SyncTask syncTask = new SyncTask();                            syncTask.setKeys(keys);                            syncTask.setTargetServer(member.getKey());                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));                            }                            dataSyncer.submit(syncTask, 0);                        }                        // 记录本次同步时间                        lastDispatchTime = System.currentTimeMillis();                        // 计数清零                        dataSize = 0;                    }            }        }    }

使用ap方式作同步的过程很简单,但是这里面有两种设计思路来解决单个key同步的问题:
如果有新的key推送上来,nacos就发起一次同步,这会造成网络资源浪费,因为每次同步的就只有一个key或者几个key;

同步少量的key解决方案: 只有积累到指定数量的key,才发起批量同步距离上次同步时间超过配置的限制时间,则忽略key数量,直接发起同步 CP 方式同步的流程(RaftConsistencyServiceImpl)

cp模式追求的是数据一致性,为了数据一致性,那么肯定得选出一个leader,由leader首先同步,然后再由leader通知follower前来获取最新的注册节点(或者主动推送给follower)

nacos使用raft协议来进行选举leader,来实现cp模式。

同样进入到 RaftConsistencyServiceImpl的put方法

    @Override    public void put(String key, Record value) throws NacosException {        try {            raftCore.signalPublish(key, value);        } catch (Exception e) {            Loggers.RAFT.error("Raft put failed.", e);            throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);        }    }

进入到raftCore.signalPublish方法中,我提取几个关键的代码

// 首先判断当前nacos节点是否是leader,如果不是leader,则获取leader节点的ip,然后将请求转发到leader处理,否则往下走if (!isLeader()) {            JSONObject params = new JSONObject();            params.put("key", key);            params.put("value", value);            Map<String, String> parameters = new HashMap<>(1);            parameters.put("key", key);            raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);            return;        }

同样采用同样队列的方式,去处理本地注册列表

onPublish(datum, peers.local());public void onPublish(Datum datum, RaftPeer source) throws Exception {               // 添加同步key任务到阻塞队列中        notifier.addTask(datum.key, ApplyAction.CHANGE);        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);    }

遍历所有集群节点,发送http同步请求

 for (final String server : peers.allServersIncludeMyself()) {                // 如果是leader,则不进行同步                if (isLeader(server)) {                    latch.countDown();                    continue;                }                // 组装url 发送同步请求到其它集群节点                final String url = buildURL(server, API_ON_PUB);                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {                    @Override                    public Integer onCompleted(Response response) throws Exception {                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",                                datum.key, server, response.getStatusCode());                            return 1;                        }                        latch.countDown();                        return 0;                    }                    @Override                    public STATE onContentWriteCompleted() {                        return STATE.CONTINUE;                    }                });            }

关于“Nacos源码阅读方法是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注编程网精选频道,小编每天都会为大家更新不同的知识点。

--结束END--

本文标题: Nacos源码阅读方法是什么

本文链接: https://lsjlt.com/news/324118.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Nacos源码阅读方法是什么
    这篇文章主要介绍“Nacos源码阅读方法是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Nacos源码阅读方法是什么”文章能帮助大家解决问题。先给大家献上一张我梳理的高清源码图,方便大家对nac...
    99+
    2023-06-29
  • Nacos源码阅读方法
    为什么我会经常阅读源码呢,因为阅读源码能让你更加接近大佬,哈哈,这是我瞎扯的。 这篇文章将会带大家阅读Nacos源码 以及 教大家阅读源码的技巧,我们正式开始吧! 先给大家献上一张我...
    99+
    2024-04-02
  • jQuery1.5.1 animate方法源码阅读
    复制代码 代码如下: animate: function( prop, speed, easing, callback ) { if ( jQuery.isEmptyObject(...
    99+
    2022-11-21
    animate 源码阅读
  • 怎么阅读Java源码
    本篇内容主要讲解“怎么阅读Java源码”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么阅读Java源码”吧!Java源码初接触如果你进行过一年左右的开发,喜欢用eclipse的debug功能。...
    99+
    2023-06-17
  • 程序员是怎么阅读源码的
    本篇内容介绍了“程序员是怎么阅读源码的”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!React:React 架构的演变 - 从同步到异步Re...
    99+
    2023-06-15
  • PostgreSQL怎么阅读源代码
    这篇文章主要介绍PostgreSQL怎么阅读源代码,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!自底向上的方法    先说自底向上的方法。简单来说,就是从一个具体的小功能点出发阅读和...
    99+
    2024-04-02
  • 怎么样阅读Java源码
    这篇文章主要介绍了怎么样阅读Java源码,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。阅读Java源码的前提条件:1、技术基础在阅读源码之前,我们要有一定程度的技术基础的支持...
    99+
    2023-06-02
  • 阅读jQuery 源码的18个惊喜指的是什么
    本篇文章为大家展示了阅读jQuery 源码的18个惊喜指的是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。我热爱 jQuery,且尽管我认为自己算是一名高级 J...
    99+
    2024-04-02
  • Liunx中快速阅读的方法是什么
    今天给大家介绍一下Liunx中快速阅读的方法是什么。文章的内容小编觉得不错,现在给大家分享一下,觉得有需要的朋友可以了解一下,希望对大家有所帮助,下面跟着小编的思路一起来阅读吧。你听说过“快速阅读”吗?说实话我也没有。直到一个名叫Sprit...
    99+
    2023-06-28
  • Java终止线程实例和stop()方法源码阅读
    了解线程概念线程 是程序中的执行线程。Java 虚拟机允许应用程序并发地运行多个执行线程。线程特点拥有状态,表示线程的状态,同一时刻中,JVM中的某个线程只有一种状态;·NEW尚未启动的线程(程序运行开始至今一次未启动的线程)·RUNNAB...
    99+
    2023-05-30
  • nacos中oracle数据源配置的方法是什么
    在Nacos中配置Oracle数据源,可以通过以下步骤进行: 1、登录Nacos控制台,进入配置管理页面。2、点击“添加配置”按钮,...
    99+
    2024-04-09
    nacos
  • Go Excelize API源码阅读Close及NewSheet方法示例解析
    目录一、Go-Excelize简介二、Close()三、NewSheet()一、Go-Excelize简介 Excelize 是 Go 语言编写的用于操作 Office Excel ...
    99+
    2024-04-02
  • 怎么进行axios源码阅读与分析
    怎么进行axios源码阅读与分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。概述在前端开发过程中,我们经常会遇到需要发送异步...
    99+
    2024-04-02
  • rss订阅源是什么意思
    rss(简易聚合)是一种网络格式,用于分发内容,方便订阅网站更新。通过订阅源文件,rss 订阅源自动收集内容并通知用户,节省时间、提供个性化体验、实时更新信息和备份内容。订阅方式包括订阅...
    99+
    2024-04-02
  • linux中nacos的使用方法是什么
    在Linux中使用Nacos,可以通过以下步骤进行: 下载Nacos Server:首先从Nacos的官方网站(https://...
    99+
    2024-03-01
    linux nacos
  • Linux中nacos重启的方法是什么
    在Linux中,可以使用以下命令重启nacos服务: 进入nacos安装目录(比如/home/nacos): cd /home/...
    99+
    2024-03-01
    linux nacos
  • nacos单机启动的方法是什么
    要启动nacos的单机模式,可以按照以下步骤操作: 下载Nacos Server压缩包并解压缩到任意目录。 在解压目录的co...
    99+
    2024-04-02
  • nacos单机部署的方法是什么
    nacos是一个开源的服务发现和配置管理平台,可以帮助管理不同应用之间的依赖关系和配置信息。下面是nacos单机部署的方法: 下...
    99+
    2024-04-02
  • Vue.js源码的使用方法是什么
    本篇内容介绍了“Vue.js源码的使用方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!立即执行函数...
    99+
    2024-04-02
  • Golang源码安装的方法是什么
    这篇文章主要介绍“Golang源码安装的方法是什么”,在日常操作中,相信很多人在Golang源码安装的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Golang源码安装的方法是什么”的疑惑有所帮助!...
    99+
    2023-07-05
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作