简单分析Nacos是如何进行服务发现以及服务注册的

什么是Nacos

官方定义

an easy-to-use dynamic service discovery, configuration and service management platform for building cloud native applications

官网

https://nacos.io/en-us/

几个重要概念

  • Instance
    • 提供一个或多个服务的具有可访问网络地址(IP:Port)的进程
  • Service
    • 通过预定义接口网络访问的提供给客户端的软件功能
  • Group
    • Nacos 中的一组配置集,是组织配置的维度之一。通过一个有意义的字符串(如 Buy 或 Trade )对配置集进行分组,从而区分 Data ID 相同的配置集。当您在 Nacos 上创建一个配置时,如果未填写配置分组的名称,则配置分组的名称默认采用 DEFAULT_GROUP 。配置分组的常见场景:不同的应用或组件使用了相同的配置类型,如 database_url 配置和 MQ_topic 配置

Demo演示

由于目前我认领了Nacos的关于curd servicefeature,因此我就直接拿我的测试用例来讲解Nacos Client是如何进行服务注册的吧

所有测试用例需要先行设置好Nacos Client信息

1
2
3
4
5
6
7
@Before
public void before() throws NacosException {
Properties properties = new Properties();
// Nacos 的服务中心的地址
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
nameService = NacosFactory.createNamingService(properties);
}

然后我们在测试用例中写上

1
2
3
4
5
@Test
public void registerInstance() throws NacosException {
// 向Nacos的nacos-api服务中注册一个实例,声明该实例所在的ip以及端口信息
nameService.registerInstance("nacos-api", "127.0.0.1", 8009);
}

最终执行后,Nacos中的控制台信息如下

nacos控制台信息

可以发现,我们的实例以及在Nacos中注册成功了,接下来就是分析,这个实例的注册,Nacos client是如何执行的

代码分析

Nacos Client端

我就直接定位到registerInstance的最底层函数实现好了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);

beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}

serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

这里的registerInstance函数参数有三个:serviceNamegroupName以及Instance instance,而这个Instance对象就是一个实例,而serviceName以及groupName参数是将实例注册到Nacos中的分组名为groupName服务名为serviceName的服务中

这里出现了BeatInfo对象,这个BeatInfo对象是Nacos用于心跳任务的心跳信息,首先要对实例进行判断是否是一个临时标签的实例对象,如果是的话需要为该Instance设置一个心跳任务信息,用于Client主动上报自己的健康状态信息。那为什么只有临时的实例才需要设置心跳任务信息呢?这里引用官方博客的解释

如果是临时实例,则不会在Nacos服务端持久化存储,需要通过上报心跳的方式进行保活,如果一段时间内没有上报心跳,则会被Nacos服务端摘除。在被摘除后如果又开始上报心跳,则会重新将这个实例注册。持久化实例则会持久化到Nacos服务端,此时即使注册实例的客户端进程不在,这个实例也不会从服务端删除,只会将健康状态设为不健康

因此由于临时的实例不会在Nacos服务中心进行持久化存储,因此才需要一个心跳任务,将实例的信息放在心跳任务中不断的向Nacos服务中心上报。

接着就是向Nacos Server段发送注册实例的Http请求了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);

final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));

reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}

这个registerService就是将Instance实例的信息封装成Http的请求进行发送到Nacos Server端进行注册

1
2
3
4
5
6
7
8
9
public String reqAPI(String api, Map<String, String> params, String method) throws NacosException {

List<String> snapshot = serversFromEndpoint;
if (!CollectionUtils.isEmpty(serverList)) {
snapshot = serverList;
}

return reqAPI(api, params, snapshot, method);
}

这个reqAPI函数中我们看到了一个很有意思的操作List<String> snapshot = serversFromEndpoint,然后又对serverList进行是否empty判断,如果serverList不为空,则替换serversFromEndpoint的数据。这是为什么呢?这里其实涉及到Nacos集群的概念了,serversFromEndpoint其实是向Nacos Server获取当前Nacos集群中的server列表,但是Nacos中用户设置的Nacos Server Addr的优先级是大于Nacos Client端去远程获取到的server列表的,这里其实就是获取Nacos Server的地址列表信息。

最后就是Nacos Client端真正发起注册实例的请求了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new IllegalArgumentException("no server available");
}
Exception exception = new Exception();
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, server, method);
} catch (NacosException e) {
exception = e;
NAMING_LOGGER.error("request {} failed.", server, e);
} catch (Exception e) {
exception = e;
NAMING_LOGGER.error("request {} failed.", server, e);
}
index = (index + 1) % servers.size();
}
throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
}
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
try {
return callServer(api, params, nacosDomain);
} catch (Exception e) {
exception = e;
NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
}
}
throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
}

可以看到如果Nacos Server端的地址列表为空,那么Nacos Server应该是单机模式部署的,因此直接到最后一个for循环,循环次数为默认设置的Http请求可重试次数;如果Nacos Serve是已集群模式部署的话,那么会采用随机策略选择一个Nacos Server Addr作为进行Instance注册的Http请求地址;如果请求失败的话则再次重新选取一个Nacos Server

Nacos Server端

Nacos Server端负责处理Nacos Client的实例注册的Controllercom.alibaba.nacos.naming.controllers里面的InstanceController,根据官网nacos-open-api可以找到处理Instance注册的控制器

1
2
3
4
5
6
7
8
9
10
@CanDistro
@RequestMapping(value = "", method = RequestMethod.POST)
public String register(HttpServletRequest request) throws Exception {

String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
return "ok";
}

register(HttpServletRequest request)就是Nacos Server端负责处理Instance注册请求的

1
2
3
4
5
6
7
8
9
10
11
12
13
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

createEmptyService(namespaceId, serviceName, instance.isEphemeral());

Service service = getService(namespaceId, serviceName);

if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}

addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

这里要先解释一下,NacosModelservice->cluster->instance这样的,因此在注册实例时,需要先注册一个Service,具体的操作在createEmptyService函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) {

Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
service.validate();
if (local) {
putService(service);
service.init();
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
} else {
addOrReplaceService(service);
}
}
}

这里会先根据namespaceId以及serviceName去获取一个Service对象,如果service==null,则代表该Service未在Nacos Server端注册,因此需要先注册一个Service,才可以进行接下来的注册Instance;同时根据实例是否是临时的标识决定该实例是否需要持久化到Nacos Server中,如果是一个临时的Instance,则Nacos Server会为其设置一致性consistencyServicelistener,同时Service开启一个ClientBeatCheckTask,用于检查该Service下的Instance实例的健康状态,引用官方代码注解的原话

Check and update statues of ephemeral instances, remove them if they have been expired

接着对Service创建之后就是注册Instance

1
2
3
4
5
6
7
8
9
10
11
12
13
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {

String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

Service service = getService(namespaceId, serviceName);

List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

Instances instances = new Instances();
instances.setInstanceList(instanceList);

consistencyService.put(key, instances);
}

这里再次根据namespaceId以及serviceName获取service对象,并将Instance[]数组注册到service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {

Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

Map<String, Instance> oldInstanceMap = new HashMap<>(16);
List<Instance> currentIPs = service.allIPs(ephemeral);
Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());

for (Instance instance : currentIPs) {
map.put(instance.toIPAddr(), instance);
}
if (datum != null) {
oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
}

// use HashMap for deep copy:
HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
instanceMap.putAll(oldInstanceMap);

for (Instance instance : ips) {
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName());
cluster.setService(service);
cluster.init();
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJSON());
}

if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
instanceMap.put(instance.getDatumKey(), instance);
}

}

if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
+ JSON.toJSONString(instanceMap.values()));
}

return new ArrayList<>(instanceMap.values());
}

这里其实就是更新Instance注册后,需要更新Instance实例的addr信息,然后将更新后的Instance地址列表信息返回,更新Service下的Instance的列表信息,也就是刚刚的addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)函数的后面三行代码。到此,InstanceNacos Client端注册到Nacos Server的流程就完了。