Client端实例注册 简单分析Nacos是如何进行服务发现以及服务注册的
Nacos Server端是如何注册一个Instance的 Nacos Server端负责处理Service
、Instance
的创建、删除、更改的请求处理器在com.alibaba.nacos.naming.controllers
包下的ServiceController
以及InstanceController
;这次先来解析一个Instance
实例的注册请求在Nacos Server
端是怎么被处理的。
处理Nacos Client
端的instance
注册请求 1 2 3 4 5 6 7 8 9 10 @CanDistro @RequestMapping (value = "" , method = RequestMethod.POST)public String register (HttpServletRequest request) throws Exception { String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request)); return "ok" ; }
请求处理器接收到注册Instance
的请求后,先获取serviceName
以及namespaceId
信息,然后调用ServiceManager
的registerInstance
进行注册,在注册之前,需要先根据请求将Instance
进行创建,注意,这里创建出来的Instance
可不是之前介绍的在com.alibaba.nacos.api.naming.pojo
下的Instance
,这里创建的Instance
是它的之类,多了很多在Server
端需要用到的信息字段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private Instance parseInstance (HttpServletRequest request) throws Exception { String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); String app = WebUtils.optional(request, "app" , "DEFAULT" ); String metadata = WebUtils.optional(request, "metadata" , StringUtils.EMPTY); Instance instance = getIPAddress(request); instance.setApp(app); instance.setServiceName(serviceName); instance.setInstanceId(instance.generateInstanceId()); instance.setLastBeat(System.currentTimeMillis()); if (StringUtils.isNotEmpty(metadata)) { instance.setMetadata(UtilsAndCommons.parseMetadata(metadata)); } if (!instance.validate()) { throw new NacosException(NacosException.INVALID_PARAM, "instance format invalid:" + instance); } return instance; }
随后在跟进serviceManager.registerInstance
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void registerInstance (String namespaceId, String serviceName, Instance instance) throws NacosException { createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); if (service == null ) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
这里看到有一个createEmptyService
方法,这是由于Nacos
的Model
设计——service --> cluster --> instance
,所有instance
是需要挂在service
下面的,因此如果没有service
的话,instance
就无法注册,而server
端不知道instance
所挂的service
是否存在,因此需要先执行这个createEmptyService
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public void createEmptyService (String namespaceId, String serviceName, boolean local) throws NacosException { Service service = getService(namespaceId, serviceName); if (service == null ) { Loggers.SRV_LOG.info("creating empty service {}:{}" , namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(Constants.DEFAULT_GROUP); service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); service.validate(); if (local) { putService(service); service.init(); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true ), service); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false ), service); } else { addOrReplaceService(service); } } }
在createEmptyService
方法的末尾,有一个consistencyService.listen
方法以及addOrReplaceService
方法,为什么会执行两次的consistencyService.listener
方法呢?这两个方法的作用是什么?现在来分析一下。
首先看到if (local)
这个判断语句,其实是判断实例是临时实例还是持久化实例,当local=true
时表示为AP模式;因为不同的实例注册模式对应的分布式一致性策略不同,当实例注册为临时实例时对应的是AP模式,实例注册为持久化实例时对应的是CP模式;在AP模式下,Service
退化成一个字符串,仅仅是作为一个key
的作用而存在,因此不用将Service
进入一致性策略;如果是CP模式下的话,则需要将Service
进入一致性策略
这两个方法都会调用ConsistencyService consistencyService
进行一致性策略的选择,主要有两个一致性策略,如下面代码所示
1 2 3 4 5 @Autowired private PersistentConsistencyService persistentConsistencyService;@Autowired private EphemeralConsistencyService ephemeralConsistencyService;
这两个一致性策略的实现分别是PersistentConsistencyService
对应RaftConsistencyServiceImpl
;EphemeralConsistencyService
对应的是DistroConsistencyServiceImpl
,这里引用阿里Nacos
团队发表的文章中的一段话来大概描述下Nacos
的一致性策略
目前的一致性协议实现,一个是基于简化的Raft的CP一致性,一个是基于自研协议Distro的AP一致性。Raft协议不必多言,基于Leader进行写入,其CP也并不是严格的,只是能保证一半所见一致,以及数据的丢失概率较小。Distro协议则是参考了内部ConfigServer和开源Eureka,在不借助第三方存储的情况下,实现基本大同小异。Distro重点是做了一些逻辑的优化和性能的调优
接下来就是简要分析下这两个一致性策略的实现了
RaftConsistencyServiceImpl
这里先放上一个Raft
协议原理的动画演示 http://thesecretlivesofdata.com/raft/
对于注册形式为持久化的Instance
来说,Nacos
的一致性策略是采用了简化版的Raft
协议
DistroConsistencyServiceImpl
DistroConsistency
是针对注册形式为临时实例的Instance
,该一致性策略其实是阿里Nacos
团队根据他们内部的ConfigServer
以及Eureka
的一致性策略来实现的,Distro
一致性策略的实现,在构造函数执行完后,会执行一个init
函数,init
函数内部提交了一个任务,任务里面执行了一个load
函数以及提交了一个Notifier
的Runnable
;而这个load
函数,会先判断Nacos
的启动是单机模式还是集群模式,如果是集群模式那就很简单了;但是如果是集群模式下的话,就比较多了,首先会判断当前Nacos
集群中的server
数量是否为1,如果为1的话,表示当前集群中只有一个server
实例,因此会让线程休眠一下,等待其他的server
加入然后根据远程的server
进行数据的远程同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @PostConstruct public void init () throws Exception { GlobalExecutor.submit(new Runnable() { @Override public void run () { try { load(); } catch (Exception e) { Loggers.EPHEMERAL.error("load data failed." , e); } } }); executor.submit(notifier); } public void load () throws Exception { if (SystemUtils.STANDALONE_MODE) { initialized = true ; return ; } while (serverListManager.getHealthyServers().size() <= 1 ) { Thread.sleep(1000L ); Loggers.EPHEMERAL.info("waiting server list init..." ); } for (Server server : serverListManager.getHealthyServers()) { if (NetUtils.localServer().equals(server.getKey())) { continue ; } if (Loggers.EPHEMERAL.isDebugEnabled()) { Loggers.EPHEMERAL.debug("sync from " + server); } if (syncAllDataFromRemote(server)) { initialized = true ; return ; } } }
现在来看看syncAllDataFromRemote
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 public boolean syncAllDataFromRemote (Server server) { try { byte [] data = NamingProxy.getAllData(server.getKey()); processData(data); return true ; } catch (Exception e) { Loggers.EPHEMERAL.error("sync full data from " + server + " failed!" , e); return false ; } } public static byte [] getAllData(String server) throws Exception { Map<String, String> params = new HashMap<>(8 ); HttpClient.HttpResult result = HttpClient.httpGet("http://" + server + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL, new ArrayList<>(), params); if (HttpURLConnection.HTTP_OK == result.code) { return result.content.getBytes(); } throw new IOException("failed to req API: " + "http://" + server + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL + ". code: " + result.code + " msg: " + result.content); } public void processData (byte [] data) throws Exception { if (data.length > 0 ) { Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class); for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { dataStore.put(entry.getKey(), entry.getValue()); if (!listeners.containsKey(entry.getKey())) { if (switchDomain.isDefaultInstanceEphemeral()) { Loggers.EPHEMERAL.info("creating service {}" , entry.getKey()); Service service = new Service(); String serviceName = KeyBuilder.getServiceName(entry.getKey()); String namespaceId = KeyBuilder.getNamespace(entry.getKey()); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(Constants.DEFAULT_GROUP); service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0 ) .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service); } } } for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { if (!listeners.containsKey(entry.getKey())) { Loggers.EPHEMERAL.warn("listener of {} not found." , entry.getKey()); continue ; } try { for (RecordListener listener : listeners.get(entry.getKey())) { listener.onChange(entry.getKey(), entry.getValue().value); } } catch (Exception e) { Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while execute listener of key: {}" , entry.getKey(), e); continue ; } dataStore.put(entry.getKey(), entry.getValue()); } } }
首先是执行远程获取数据的方法getAllData
,获取到数据后,进行数据的反序列化操作
1 Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class)
然后进行遍历操作,通过两次if
语句的筛选,确定了不存在service
对象后,会对service
进行初始化操作,然后相应的listener
发起相应的onChange
事件,而这里的listener
则是com.alibaba.nacos.naming.core.ServiceManager
对象,为什么会是ServiceManager
呢,先看看listener
总共有几个实现
Service
、ServiceManager
、SwitchManager
,他们各自监听的对象分别为Instances
、Service
以及SwitchDomain
,所以得出这个listener
的实现为ServiceManager
;因此这个时候在跟进看看ServiceManager
的onChange
事件发生时候做了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Override public void onChange (String key, Service service) throws Exception { try { if (service == null ) { Loggers.SRV_LOG.warn("received empty push from raft, key: {}" , key); return ; } if (StringUtils.isBlank(service.getNamespaceId())) { service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID); } Loggers.RAFT.info("[RAFT-NOTIFIER] datum is changed, key: {}, value: {}" , key, service); Service oldDom = getService(service.getNamespaceId(), service.getName()); if (oldDom != null ) { oldDom.update(service); } else { putService(service); service.init(); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true ), service); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false ), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}" , service.toJSON()); } } catch (Throwable e) { Loggers.SRV_LOG.error("[NACOS-SERVICE] error while processing service update" , e); } }
实际上就是根据传入的service
对象,然后查找是否存在一个旧的service
,如果存在则执行更新操作,否则就执行类似于createEmptyServer
的方法。
而接着的下一个循环内部呢,还有一个listener.onChange
事件,而这个时候的listener
,就是com.alibaba.nacos.naming.core.Service
这个对象了,来看看它的onChange
事件做了什么吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Override public void onChange (String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}" , key, value); for (Instance instance : value.getInstanceList()) { if (instance == null ) { throw new RuntimeException("got null instance " + key); } if (instance.getWeight() > 10000.0 D) { instance.setWeight(10000.0 D); } if (instance.getWeight() < 0.01 D && instance.getWeight() > 0.0 D) { instance.setWeight(0.01 D); } } updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum(); }
Service
中的onChange
事件,首先是对Instance
的weight
信息根据当前weight
的值进行相应的重置操作,随后执行updateIPs
操作,而这个updateIPs
涉及了Service
的updateIPs
以及Cluster
的updateIPs
操作,其实就是根据远程Server
的数据去对自己Service
->Cluster
->Instance
这个模型的所有数据进行更新操作,以保证数据的一致性,其原理就类似于Eureka
的一致性策略
在Eureka平台中,如果某台服务器宕机,Eureka不会有类似于ZooKeeper的选举leader的过程;客户端请求会自动切换到新的Eureka节点;当宕机的服务器重新恢复后,Eureka会再次将其纳入到服务器集群管理之中;而对于它来说,所有要做的无非是同步一些新的服务注册信息而已
执行完后,如果listener
执行没有出错,则将dataStore
存储的数据执行插入更新操作dataStore.put(entry.getKey(), entry.getValue())
。然后再看看DistroConsistencyServiceImpl
的init
函数的第二个任务提交executor.submit(notifier)
做了什么吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public class Notifier implements Runnable { private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024 ); private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024 ); public void addTask (String datumKey, ApplyAction action) { if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) { return ; } if (action == ApplyAction.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } tasks.add(Pair.with(datumKey, action)); } public int getTaskSize () { return tasks.size(); } @Override public void run () { Loggers.EPHEMERAL.info("distro notifier started" ); while (true ) { try { Pair pair = tasks.take(); if (pair == null ) { continue ; } String datumKey = (String) pair.getValue0(); ApplyAction action = (ApplyAction) pair.getValue1(); services.remove(datumKey); int count = 0 ; if (!listeners.containsKey(datumKey)) { continue ; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == ApplyAction.CHANGE) { listener.onChange(datumKey, dataStore.get(datumKey).value); continue ; } if (action == ApplyAction.DELETE) { listener.onDelete(datumKey); continue ; } } catch (Throwable e) { Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while notifying listener of key: {}" , datumKey, e); } } if (Loggers.EPHEMERAL.isDebugEnabled()) { Loggers.EPHEMERAL.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}" , datumKey, count, action.name()); } } catch (Throwable e) { Loggers.EPHEMERAL.error("[NACOS-DISTRO] Error while handling notifying task" , e); } } } }
Notifier
其实就是不断的轮询service
事件容器,然后让相应的listener
执行onChange
事件,而事件的发生,在DistroConsistencyServiceImpl
的onPut
以及onRemove
操作中,对应着Service
加入以及离
执行完createEmptyService
方法后,就是开始真正执行instance
的注册操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void addInstance (String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); consistencyService.put(key, instances); }
此函数中根据namespaceId
以及serviceName
获取对应的service
信息,然后进入addIpAddresses
函数进行更新service
下的实例信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public List<Instance> updateIpAddresses (Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); Map<String, Instance> oldInstanceMap = new HashMap<>(16 ); List<Instance> currentIPs = service.allIPs(ephemeral); Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size()); for (Instance instance : currentIPs) { map.put(instance.toIPAddr(), instance); } if (datum != null ) { oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map); } HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size()); instanceMap.putAll(oldInstanceMap); for (Instance instance : ips) { if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName()); cluster.setService(service); cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration." , instance.getClusterName(), instance.toJSON()); } if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JSON.toJSONString(instanceMap.values())); } return new ArrayList<>(instanceMap.values()); }
这个updateIpAddresses
函数就是addIpAddresses
函数的底层调用,它的作用就是根据新注册的Instance
以及所属的service
,对service
下的instance
实例列表信息进行更新操作,然后返回更新后的实例列表