Nacos服务注册源码分析
服务注册表serviceMap
1 | /** |
二、Nacos服务注册入口
1、SpringBoot自动装配
使用Nacos做为注册中心,首先要在pom中引入:
1 | <dependency> |
嗯,然后呢?SpringBoot项目大家都知道有一个很重要的东西:spring.factories,这个是针对引入的jar包做自动装配用的;在SpringBoot项目运行时,SpringFactoriesLoader
类会去寻找META-INF/spring.factories
文件,spring.factories用键值对的方式记录了所有需要加入容器的类,key为:org.springframework.boot.autoconfigure.EnableAutoConfiguration
,value为各种xxxAutoConfiguration
;或key为:org.springframework.cloud.bootstrap.BootstrapConfiguration
,value为各种xxxBootstrapConfiguration
。
大家对EnableAutoConfiguration
都比较熟悉了,自动装配的撒;而BootstrapConfiguration
可能会相对陌生一点,它呢是做自定义启动配置的,也就是说:所有的Bean都会在SpingApplicatin启动前加入到它的上下文中。
2、Nacos中的spring.factories
查看引入的nacos-discovery包的层级结构,找到spring.factories文件;
打开这文件瞅一瞅:
上面我有聊到BootstrapConfiguration
是在SpringBoot项目启动时自动加载配置到SpingApplicatin的上下文中。
我们进去点进去看一下这个类NacosDiscoveryClientConfigServiceBootstrapConfiguration
:
emmm,它好像啥也没干,就引入了两个自动配置类:NacosDiscoveryClientAutoConfiguration
,NacosDiscoveryAutoConfiguration
;咱也不知道哪个是和服务注册相关的,先都点进去看看撒。
(1)NacosDiscoveryClientAutoConfiguration:
额额额,从命名来看,好像没有和注册相关的,先看看下一个类吧。
(2)NacosDiscoveryAutoConfiguration:
卧槽,这个好。这个类里能看到用注册register命名的方法;感觉就它了。
3、客户端自动注册入口类NacosDiscoveryClientAutoConfiguration
NacosDiscoveryAutoConfiguration
类内部内部管理了三个类:NacosServiceRegistry
(完成服务注册功能,实现ServiceRegistry接口),NacosRegistration
(注册时用来存储nacos服务端的相关信息),NacosAutoServiceRegistration
(自动注册功能)。
这个自动注册功能是怎么实现的呢?从类的命名上来看,感觉是在NacosAutoServiceRegistration
中的,我们跟进去看一下:NacosAutoServiceRegistration
类继承自AbstractAutoServiceRegistration
类;
再看AbstractAutoServiceRegistration
类实现了ApplicationListener
接口。ApplicationListener接口实际是一个事件监听器,其监听WebServerInitializedEvent
事件。
在AbstractAutoServiceRegistration
类的bind()方法中会绑定一个事件,启动调用start()方法进行事件的绑定操作。
start()方法中再调用register()方法实现注册功能,具体的register()内部逻辑由子类实现。
这里和AQS一样,是典型模板方法设计模式。
go on,go on;我们接着看ServiceRegistry
接口的实现类,点一下发现直接蹦到了NacosServiceRegistry
类上;感觉对味了。
虽然我们找到了服务注册的入口,但是呢,其实spring-cloud-commons
包中定义了一套服务注册规范,集成Spring
Cloud实现服务注册的组件都会实现ServiceRegistry
接口。
4、Nacos服务注册类NacosServiceRegistry
现在我们继续接着NacosServiceRegistry#register()方法来看:
先是组装服务实例信息,然后走入到了NamingService
接口#registerInstance()方法,我跟,我继续往里跟;点到NamingService
接口的实现类NacosNamingService
;
NacosNamingService类在初始化的时候会实例几个比较关键的类:
1 | // 1)发送心跳的 |
NacosNamingService
#registerInstance()方法套娃如下:
1 |
|
这里有两个重要的点:
1、如果实例节点是临时节点的话(默认是临时的),会组装一个心跳信息,然后通过
BeatReactor
组件发送心跳到服务端
,也就是服务续约
;
2、调用ServerProxy
组件注册服务
到服务端,即服务上线
。
在聊服务续约和服务注册之前,我们先看一下serviceName的组成,其实就是就是上面传递下来的groupName和serviceName用@@拼接起来作为新的serviceName,以达到Group层的数据隔离。
1 | NamingUtils.getGroupedName(serviceName, groupName) |
1)Client服务续约
即Nacos Client端定时发送心跳到服务端。
在BeatReactor
#addBeatInfo()方法中,会启动一个定时任务,立即执行BeatTask这个任务。
我们接着来看BeatTask这个Runnable接口的实现类,是如何运行的?
我们可以看到它主要展现两个能力:
(1)调用
NamingProxy
向Nacos服务端发送心跳;
(2)再开启一个定时任务,延时5S再持续发送心跳到Nacos服务端,即默认每5s向Nacos服务端发送一次心跳。
发送心跳就很简单了,直接采用HTTP接口调用
的方式,调用服务端的/nacos/v1/ns/instance/beat
接口。
2)Client服务注册
在NamingProxy
#registerService()方法中直接做HTTP请求调用Nacos Server的接口/nacos/v1/ns/instance
做服务注册:
1 | public static String WEB_CONTEXT = "/nacos"; |
5、Nacos Server端如何接收心跳、服务注册请求
上面我们聊到了Nacos Client端分别调用服务端的/nacos/v1/ns/instance/beat
、/nacos/v1/ns/instance
接口进行服务续约和服务注册,下面我们聊一下服务端对这个两个请求是如何处理的?
0)先找入口
Nacos Client通过NamingProxy
类调用Nacos Server,以开源框架的命名规范来看,Nacos Server的源码中应该有个和naming相关命名的模块;
接着往下展开,找到controllers包下的InstanceController
:
1)Server端接收服务注册请求
InstanceController#register()
方法是Nacos Server接收服务注册请求的入口:
1 |
|
其中namespaceId可以做一层数据隔离;
我们接着往里看getInstanceOperator().registerInstance()
:
InstanceOperator接口有两个实现类,分别是:InstanceOperatorClientImpl
,InstanceOperatorServiceImpl
,由于我这里是作为服务端,我们关注InstanceOperatorServiceImpl
;
1、组装服务的实例信息,包含:IP、Port
2、调用ServiceManager的registerInstanc()方法,完成真正的服务注册操作。
ServiceManager#registerInstanc()方法中主要做四件事:
1、如果服务不存在,则创建一个服务;
2、从本地缓存《Map(namespace, Map(group::serviceName, Service))》中获取服务信息;
3、校验服务不能为null;
4、将服务的实例信息添加到DataStore的dataMap缓存中
(1)创建服务
这里的操作也很简单,套娃套娃套娃,不对是封装、抽象。最终将服务信息保存到本地缓存serviceMap中。
1 | public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException { |
注意:在service.init()初始化服务时,会启动一个定时任务做不健康服务的服务剔除
:
1 | /** |
走到HealthCheckReactor.scheduleCheck()
方法中,会延时5s开启一个固定延时5s的FixedDelay类型的定时任务执行服务剔除操作;
另外:由于ClientBeatCheckTask
不是NacosHealthCheckTask
的实现类,所以定时任务中执行的方法为ClientBeatCheckTask
中的run()方法;
看一下ClientBeatCheckTask
的run()方法:
通过判断当前时间和实例最后一次心跳时间的间隔是否大于阈值(默认15s),决定是否进行服务剔除/下线;
(2)从本地缓存Map中获取服务信息
从本地缓存serviceMap中获取服务信息,没别的啥操作
1 | public Service getService(String namespaceId, String serviceName) { |
(3)校验服务不能为null
一个单纯的判空处理;
1 | public void checkServiceIsNull(Service service, String namespaceId, String serviceName) throws NacosException { |
(4)**服务实例信息”持久化”
将服务的相应实例信息保存到DataStore的serviceMap中;由于存在多个服务实例同时注册的场景,所以要加一个synchronized锁。
1 | public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) |
ConsistencyService
是一个接口,由于我们默认是采用ephemeral
方式(在聊Nacos
Client时我们也有提到,服务端见Instance#isEphemeral()
或 SwitchDomain#isDefaultInstanceEphemeral()
),所以以临时Client为例,我们看一下DistroConsistencyServiceImpl
;如果是持久client,则关注RaftConsistencyServiceImpl。
1 |
|
进入到DistroConsistencyServiceImpl
类的onPut()方法:
1 | public void onPut(String key, Record value) { |
1. 往DataStore的dataMap中添加数据
1 |
|
2. 进入到Notifier类内部,通知服务实例信息变更
进入到DistroConsistencyServiceImpl的内部类Notifier
,先看其addTask()方法:
最重要的一步将服务实例信息添加到tasks任务队列中,按常理来说到这也就结束了;但是添加到任务队列之后呢,怎么处理嘞?
我们注意到DistroConsistencyServiceImpl
中有一个@PostConstruct
修饰的init()
方法,也就是说在DistroConsistencyServiceImpl
类构造器执行之后会执行这个方法启动Notifier通知器
:
我们走入Notifer#run()
方法:
首先从tasks任务丢列中取出任务,调用自己的handle()方法处理任务:
handle()方法中调用监听器listener的onChange()/onDelete()方法执行相应通知数据更新、删除的逻辑。
下面我着重看一下通知数据变更的onChange()方法:
RecordListener
是一个接口,它有三个实现,我们进入到它的实现类Service
中;
着重看一下updateIPs()方法,其他代码不涉及到主链路;
遍历要注册的instance集合,采用一个临时的Map记录clusterName和Instance实例的对应关系,然后更新clusterMap<clusterName,
Cluster>:
维护
Cluster
中实例的代码大家感兴趣可以点进去看看,主要思想还是比对新老数据,找出新的instance,与挂掉的instance,最后更新
cluster对象里面的存放临时节点的集合或 存放永久节点的集合
。
最后调用PushService通知单客户端服务信息发生变更。
OK,fine!Nacos服务注册,服务端主要是将服务信息和服务的实例信息保存到两个Map(serviceMap、dataMap)中;启动一个延时5s执行的定时任务做服务剔除/下线;Notifier
做普通服务集群实例信息维护,调用PushService通知客户端服务信息发生变更。
2)Server端接收心跳请求
InstanceController#beat()
方法是Nacos Server接收心跳请求的入口:
其内部调用InstanceOperator
接口的handleBeat()方法做心跳判断;InstanceOperator
有两个实现类,我们这里讨论的是服务端如何处理客户端的心跳请求,因此我们看InstanceOperatorServiceImpl
类;InstanceOperatorServiceImpl
#handleBeat():
1、如果服务还没有注册,就先注册;
2、接着获取服务信息,校验服务不能为null;
3、最后,调用Service#processClientBeat()
处理客户端心跳;
Service#processClientBeat()
中启动一个只执行一次的任务:
任务的逻辑体现在ClientBeatProcessor
#run()方法中:
1、更新实例的最后一次心跳时间,进而决定服务是否需要下线/剔除;
2、如果服务之前是不可用的,则设置服务为可用的,并调用PushService
通知客户端;
服务端对于心跳的处理,主要两件事:维护心跳的最后一次时间,如果服务变为可用状态则通知客户端;另外在服务注册创建Service服务时会启动一个固定延时5S执行的定时任务做服务的剔除,其中以心跳时间为根本逻辑。
__END__