Nacos Server端注册一个服务实例流程

Client端实例注册

简单分析Nacos是如何进行服务发现以及服务注册的

Nacos Server端是如何注册一个Instance的

Nacos Server端负责处理ServiceInstance的创建、删除、更改的请求处理器在com.alibaba.nacos.naming.controllers包下的ServiceController以及InstanceController;这次先来解析一个Instance实例的注册请求在Nacos Server端是怎么被处理的。

处理Nacos Client端的instance注册请求

1
2
3
4
5
6
7
8
9
10
@CanDistro
@RequestMapping(value = "", method = RequestMethod.POST)
public String register(HttpServletRequest request) throws Exception {

String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
return "ok";
}

请求处理器接收到注册Instance的请求后,先获取serviceName以及namespaceId信息,然后调用ServiceManagerregisterInstance进行注册,在注册之前,需要先根据请求将Instance进行创建,注意,这里创建出来的Instance可不是之前介绍的在com.alibaba.nacos.api.naming.pojo下的Instance,这里创建的Instance是它的之类,多了很多在Server端需要用到的信息字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private Instance parseInstance(HttpServletRequest request) throws Exception {

String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String app = WebUtils.optional(request, "app", "DEFAULT");
String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);

Instance instance = getIPAddress(request);
instance.setApp(app);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.generateInstanceId());
instance.setLastBeat(System.currentTimeMillis());
if (StringUtils.isNotEmpty(metadata)) {
instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));
}

if (!instance.validate()) {
throw new NacosException(NacosException.INVALID_PARAM, "instance format invalid:" + instance);
}

return instance;
}

随后在跟进serviceManager.registerInstance方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

// SDK端提供service的curd操作——可能是指这个,根据 instance 是否是临时的属性决定是否放入 serviceMap 中
createEmptyService(namespaceId, serviceName, instance.isEphemeral());

// 根据 namespaceId 以及对应的 serviceName 获取所属的 service 对象信息
Service service = getService(namespaceId, serviceName);

if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}

addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

这里看到有一个createEmptyService方法,这是由于NacosModel设计——service --> cluster --> instance,所有instance是需要挂在service下面的,因此如果没有service的话,instance就无法注册,而server端不知道instance所挂的service是否存在,因此需要先执行这个createEmptyService方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
Service service = getService(namespaceId, serviceName);
// 如果服务不存在,则创建一个新的服务——Service
if (service == null) {

Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);

// 设置服务所在的分组信息
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());

// 重新计算 service 的签名信息
service.recalculateChecksum();
// 对 cluster 进行校验
service.validate();
if (local) {
//将 service 放入容器中
putService(service);
service.init();
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
} else {
addOrReplaceService(service);
}
}
}

createEmptyService方法的末尾,有一个consistencyService.listen方法以及addOrReplaceService方法,为什么会执行两次的consistencyService.listener方法呢?这两个方法的作用是什么?现在来分析一下。

首先看到if (local)这个判断语句,其实是判断实例是临时实例还是持久化实例,当local=true时表示为AP模式;因为不同的实例注册模式对应的分布式一致性策略不同,当实例注册为临时实例时对应的是AP模式,实例注册为持久化实例时对应的是CP模式;在AP模式下,Service退化成一个字符串,仅仅是作为一个key的作用而存在,因此不用将Service进入一致性策略;如果是CP模式下的话,则需要将Service进入一致性策略

这两个方法都会调用ConsistencyService consistencyService进行一致性策略的选择,主要有两个一致性策略,如下面代码所示

1
2
3
4
5
@Autowired
private PersistentConsistencyService persistentConsistencyService;

@Autowired
private EphemeralConsistencyService ephemeralConsistencyService;

这两个一致性策略的实现分别是PersistentConsistencyService对应RaftConsistencyServiceImplEphemeralConsistencyService对应的是DistroConsistencyServiceImpl,这里引用阿里Nacos团队发表的文章中的一段话来大概描述下Nacos的一致性策略

目前的一致性协议实现,一个是基于简化的Raft的CP一致性,一个是基于自研协议Distro的AP一致性。Raft协议不必多言,基于Leader进行写入,其CP也并不是严格的,只是能保证一半所见一致,以及数据的丢失概率较小。Distro协议则是参考了内部ConfigServer和开源Eureka,在不借助第三方存储的情况下,实现基本大同小异。Distro重点是做了一些逻辑的优化和性能的调优

Nacos一致性协议

接下来就是简要分析下这两个一致性策略的实现了

RaftConsistencyServiceImpl

这里先放上一个Raft协议原理的动画演示 http://thesecretlivesofdata.com/raft/

对于注册形式为持久化的Instance来说,Nacos的一致性策略是采用了简化版的Raft协议

DistroConsistencyServiceImpl

DistroConsistency是针对注册形式为临时实例的Instance,该一致性策略其实是阿里Nacos团队根据他们内部的ConfigServer以及Eureka的一致性策略来实现的,Distro一致性策略的实现,在构造函数执行完后,会执行一个init函数,init函数内部提交了一个任务,任务里面执行了一个load函数以及提交了一个NotifierRunnable;而这个load函数,会先判断Nacos的启动是单机模式还是集群模式,如果是集群模式那就很简单了;但是如果是集群模式下的话,就比较多了,首先会判断当前Nacos集群中的server数量是否为1,如果为1的话,表示当前集群中只有一个server实例,因此会让线程休眠一下,等待其他的server加入然后根据远程的server进行数据的远程同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@PostConstruct
public void init() throws Exception {
GlobalExecutor.submit(new Runnable() {
@Override
public void run() {
try {
load();
} catch (Exception e) {
Loggers.EPHEMERAL.error("load data failed.", e);
}
}
});
executor.submit(notifier);
}

public void load() throws Exception {
if (SystemUtils.STANDALONE_MODE) {
initialized = true;
return;
}
// size = 1 means only myself in the list, we need at least one another server alive:
while (serverListManager.getHealthyServers().size() <= 1) {
Thread.sleep(1000L);
Loggers.EPHEMERAL.info("waiting server list init...");
}

for (Server server : serverListManager.getHealthyServers()) {
if (NetUtils.localServer().equals(server.getKey())) {
continue;
}
if (Loggers.EPHEMERAL.isDebugEnabled()) {
Loggers.EPHEMERAL.debug("sync from " + server);
}
// try sync data from remote server:
if (syncAllDataFromRemote(server)) {
initialized = true;
return;
}
}
}

现在来看看syncAllDataFromRemote方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public boolean syncAllDataFromRemote(Server server) {

try {
byte[] data = NamingProxy.getAllData(server.getKey());
processData(data);
return true;
} catch (Exception e) {
Loggers.EPHEMERAL.error("sync full data from " + server + " failed!", e);
return false;
}
}

public static byte[] getAllData(String server) throws Exception {

Map<String, String> params = new HashMap<>(8);
HttpClient.HttpResult result = HttpClient.httpGet("http://" + server + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL, new ArrayList<>(), params);

if (HttpURLConnection.HTTP_OK == result.code) {
return result.content.getBytes();
}

throw new IOException("failed to req API: " + "http://" + server
+ RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL + ". code: "
+ result.code + " msg: " + result.content);
}

public void processData(byte[] data) throws Exception {
if (data.length > 0) {
Map<String, Datum<Instances>> datumMap =
serializer.deserializeMap(data, Instances.class);

for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());

if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
Loggers.EPHEMERAL.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
// 这里获取到的 listener 是 ServiceManager
listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}

for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {

if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.EPHEMERAL.warn("listener of {} not found.", entry.getKey());
continue;
}

try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
} catch (Exception e) {
Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
continue;
}

// Update data store if listener executed successfully:
dataStore.put(entry.getKey(), entry.getValue());
}
}
}

首先是执行远程获取数据的方法getAllData,获取到数据后,进行数据的反序列化操作

1
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class)

然后进行遍历操作,通过两次if语句的筛选,确定了不存在service对象后,会对service进行初始化操作,然后相应的listener发起相应的onChange事件,而这里的listener则是com.alibaba.nacos.naming.core.ServiceManager对象,为什么会是ServiceManager呢,先看看listener总共有几个实现

ServiceServiceManagerSwitchManager,他们各自监听的对象分别为InstancesService以及SwitchDomain,所以得出这个listener的实现为ServiceManager

;因此这个时候在跟进看看ServiceManageronChange事件发生时候做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public void onChange(String key, Service service) throws Exception {
try {
if (service == null) {
Loggers.SRV_LOG.warn("received empty push from raft, key: {}", key);
return;
}

if (StringUtils.isBlank(service.getNamespaceId())) {
service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID);
}

Loggers.RAFT.info("[RAFT-NOTIFIER] datum is changed, key: {}, value: {}", key, service);

Service oldDom = getService(service.getNamespaceId(), service.getName());

if (oldDom != null) {
oldDom.update(service);
} else {
putService(service);
service.init();
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
}
} catch (Throwable e) {
Loggers.SRV_LOG.error("[NACOS-SERVICE] error while processing service update", e);
}
}

实际上就是根据传入的service对象,然后查找是否存在一个旧的service,如果存在则执行更新操作,否则就执行类似于createEmptyServer的方法。

而接着的下一个循环内部呢,还有一个listener.onChange事件,而这个时候的listener,就是com.alibaba.nacos.naming.core.Service这个对象了,来看看它的onChange事件做了什么吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void onChange(String key, Instances value) throws Exception {

Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

for (Instance instance : value.getInstanceList()) {

if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}

if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}

if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}

updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}

Service中的onChange事件,首先是对Instanceweight信息根据当前weight的值进行相应的重置操作,随后执行updateIPs操作,而这个updateIPs涉及了ServiceupdateIPs以及ClusterupdateIPs操作,其实就是根据远程Server的数据去对自己Service->Cluster->Instance这个模型的所有数据进行更新操作,以保证数据的一致性,其原理就类似于Eureka的一致性策略

在Eureka平台中,如果某台服务器宕机,Eureka不会有类似于ZooKeeper的选举leader的过程;客户端请求会自动切换到新的Eureka节点;当宕机的服务器重新恢复后,Eureka会再次将其纳入到服务器集群管理之中;而对于它来说,所有要做的无非是同步一些新的服务注册信息而已

执行完后,如果listener执行没有出错,则将dataStore存储的数据执行插入更新操作dataStore.put(entry.getKey(), entry.getValue())。然后再看看DistroConsistencyServiceImplinit函数的第二个任务提交executor.submit(notifier)做了什么吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class Notifier implements Runnable {

private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

public void addTask(String datumKey, ApplyAction action) {

if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.add(Pair.with(datumKey, action));
}

public int getTaskSize() {
return tasks.size();
}

@Override
public void run() {
Loggers.EPHEMERAL.info("distro notifier started");

while (true) {
try {

Pair pair = tasks.take();

if (pair == null) {
continue;
}

String datumKey = (String) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1();

services.remove(datumKey);

int count = 0;

if (!listeners.containsKey(datumKey)) {
continue;
}

for (RecordListener listener : listeners.get(datumKey)) {

count++;

try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}

if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}

if (Loggers.EPHEMERAL.isDebugEnabled()) {
Loggers.EPHEMERAL.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.EPHEMERAL.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}

Notifier其实就是不断的轮询service事件容器,然后让相应的listener执行onChange事件,而事件的发生,在DistroConsistencyServiceImplonPut以及onRemove操作中,对应着Service 加入以及离

执行完createEmptyService方法后,就是开始真正执行instance的注册操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {

String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

Service service = getService(namespaceId, serviceName);

List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

Instances instances = new Instances();
instances.setInstanceList(instanceList);

// 将该 service 下的 instance 进行更新(根据 ephemeral 进行存储策略选择)
consistencyService.put(key, instances);
}

此函数中根据namespaceId以及serviceName获取对应的service信息,然后进入addIpAddresses函数进行更新service下的实例信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 更新实例地址信息
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {

Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

// 旧的 instance 信息
Map<String, Instance> oldInstanceMap = new HashMap<>(16);

// 根据 instance 的 ephemeral 参数属性获取 instance 列表信息
List<Instance> currentIPs = service.allIPs(ephemeral);

// 使用 ConcurrentHashMap 的原因
Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());

for (Instance instance : currentIPs) {
map.put(instance.toIPAddr(), instance);
}

if (datum != null) {
oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
}

// use HashMap for deep copy:
HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
instanceMap.putAll(oldInstanceMap);

// 遍历传进来的 instance 列表
for (Instance instance : ips) {

// 判断 instance 所属的集群信息是否存在,如果不存在则创建对应的 cluster 对象
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName());
cluster.setService(service);
cluster.init();

// 将创建的 cluster 信息更新至 service 所管理的 clusterMap 中
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJSON());
}

// 根据 Command 策略决定 instanceMap 对 instance 的操作
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
instanceMap.put(instance.getDatumKey(), instance);
}

}

if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
+ JSON.toJSONString(instanceMap.values()));
}

// 返回更新后的 instance 信息
return new ArrayList<>(instanceMap.values());
}

这个updateIpAddresses函数就是addIpAddresses函数的底层调用,它的作用就是根据新注册的Instance以及所属的service,对service下的instance实例列表信息进行更新操作,然后返回更新后的实例列表