Client-side instance registration gave a brief look at how Nacos performs service discovery and registration on the client. This time we look at the server.

How the Nacos server registers an Instance

On the Nacos server, the request handlers responsible for creating, deleting, and updating Services and Instances are ServiceController and InstanceController under the com.alibaba.nacos.naming.controllers package. This post walks through how an Instance registration request is processed on the server side.
Handling an instance registration request from the Nacos client

```java
@CanDistro
@RequestMapping(value = "", method = RequestMethod.POST)
public String register(HttpServletRequest request) throws Exception {

    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

    serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
    return "ok";
}
```
After receiving the registration request, the handler first extracts the serviceName and namespaceId, then calls ServiceManager#registerInstance. Before registering, an Instance has to be built from the request. Note that the Instance created here is not the one under com.alibaba.nacos.api.naming.pojo introduced earlier, but a subclass of it that carries many extra fields needed on the server side.
```java
private Instance parseInstance(HttpServletRequest request) throws Exception {

    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String app = WebUtils.optional(request, "app", "DEFAULT");
    String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);

    Instance instance = getIPAddress(request);
    instance.setApp(app);
    instance.setServiceName(serviceName);
    instance.setInstanceId(instance.generateInstanceId());
    instance.setLastBeat(System.currentTimeMillis());
    if (StringUtils.isNotEmpty(metadata)) {
        instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));
    }

    if (!instance.validate()) {
        throw new NacosException(NacosException.INVALID_PARAM, "instance format invalid:" + instance);
    }

    return instance;
}
```
Next, step into the serviceManager.registerInstance method:
```java
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
            "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }

    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
```
Note the createEmptyService call here. It exists because of the Nacos data model: service -> cluster -> instance. Every instance must hang under a service, so without a service an instance cannot be registered. Since the server does not know whether the service the instance belongs to already exists, it first runs createEmptyService.
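The hierarchy above can be pictured with a few plain Java classes. This is a self-contained sketch with illustrative names, not the real Nacos types; it only shows why an instance can be attached only through a cluster hanging under an existing service.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the service -> cluster -> instance model (illustrative names).
class InstanceSketch {
    final String ip;
    final int port;
    InstanceSketch(String ip, int port) { this.ip = ip; this.port = port; }
}

class ClusterSketch {
    final String name;
    final List<InstanceSketch> instances = new ArrayList<>();
    ClusterSketch(String name) { this.name = name; }
}

class ServiceSketch {
    final String name;
    final Map<String, ClusterSketch> clusterMap = new HashMap<>();
    ServiceSketch(String name) { this.name = name; }

    // An instance is always registered through a cluster under this
    // service, mirroring why createEmptyService must run before addInstance.
    void addInstance(String clusterName, InstanceSketch instance) {
        clusterMap.computeIfAbsent(clusterName, ClusterSketch::new)
                  .instances.add(instance);
    }
}

public class ModelDemo {
    public static void main(String[] args) {
        ServiceSketch service = new ServiceSketch("demo-service");
        service.addInstance("DEFAULT", new InstanceSketch("10.0.0.1", 8080));
        System.out.println(service.clusterMap.get("DEFAULT").instances.size()); // prints 1
    }
}
```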
```java
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(Constants.DEFAULT_GROUP);
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        service.validate();
        if (local) {
            putService(service);
            service.init();
            consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
            consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        } else {
            addOrReplaceService(service);
        }
    }
}
```
At the end of createEmptyService there are the consistencyService.listen calls and an addOrReplaceService method. Why is consistencyService.listen invoked twice, and what do these methods do? Let's analyze.
First, the if (local) check: it distinguishes ephemeral instances from persistent ones, and local = true means AP mode. Different registration modes map to different distributed consistency strategies: an ephemeral instance uses the AP mode, while a persistent instance uses the CP mode. In AP mode the Service degenerates into a plain string that serves only as a key, so it does not need to go through the consistency layer; in CP mode, the Service itself must be put through the consistency protocol.
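In Nacos this routing is done by a delegate consistency service that inspects the datum key. The following is a minimal, self-contained sketch of the idea only; the key prefix and class-name strings are illustrative assumptions, not the exact Nacos source.

```java
// Sketch: route a datum key to the AP (Distro) or CP (Raft) consistency
// implementation. The prefix below is an assumed example, not verbatim Nacos.
public class ConsistencyRouter {
    static final String EPHEMERAL_PREFIX = "com.alibaba.nacos.naming.iplist.ephemeral.";

    // true -> ephemeral key, handled by the Distro/AP service;
    // false -> persistent key, handled by the Raft/CP service
    static boolean isEphemeralKey(String key) {
        return key.startsWith(EPHEMERAL_PREFIX);
    }

    static String route(String key) {
        return isEphemeralKey(key) ? "EphemeralConsistencyService" : "PersistentConsistencyService";
    }

    public static void main(String[] args) {
        System.out.println(route(EPHEMERAL_PREFIX + "ns##demo"));              // EphemeralConsistencyService
        System.out.println(route("com.alibaba.nacos.naming.iplist.ns##demo")); // PersistentConsistencyService
    }
}
```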
Both paths go through the ConsistencyService interface to select a consistency strategy. There are two main strategies, wired as shown below:
```java
@Autowired
private PersistentConsistencyService persistentConsistencyService;

@Autowired
private EphemeralConsistencyService ephemeralConsistencyService;
```
The two implementations are RaftConsistencyServiceImpl for PersistentConsistencyService and DistroConsistencyServiceImpl for EphemeralConsistencyService. Here is a passage from an article published by the Alibaba Nacos team that roughly describes Nacos's consistency strategies:
> The current consistency protocol implementations are a CP one based on simplified Raft and an AP one based on the homegrown Distro protocol. Raft needs little introduction: writes go through the leader. Its CP guarantee is not strict; it only ensures that a majority sees consistent data and that the probability of data loss is small. Distro draws on the internal ConfigServer and open-source Eureka; without relying on third-party storage, the implementation is largely similar, with Distro focusing on some logic optimizations and performance tuning.
Next is a brief analysis of these two consistency strategy implementations.
RaftConsistencyServiceImpl
First, an animated walkthrough of how the Raft protocol works: http://thesecretlivesofdata.com/raft/
For Instances registered as persistent, Nacos adopts a simplified version of the Raft protocol as its consistency strategy.
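The essence of the leader-based write path mentioned in the quote is the majority rule: a write is considered committed once more than half of the cluster has acknowledged it. The following is a toy illustration of that rule only, not the actual RaftCore code.

```java
// Toy illustration of the Raft majority (quorum) rule: a write commits
// only once acks from more than half of the cluster are collected.
public class MajoritySketch {
    static int quorum(int clusterSize) {
        return clusterSize / 2 + 1; // e.g. 3 nodes -> 2 acks needed
    }

    static boolean committed(int acks, int clusterSize) {
        return acks >= quorum(clusterSize);
    }

    public static void main(String[] args) {
        System.out.println(committed(2, 3)); // prints true
        System.out.println(committed(2, 5)); // prints false
    }
}
```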
DistroConsistencyServiceImpl
DistroConsistency targets Instances registered as ephemeral. This strategy was implemented by the Alibaba Nacos team based on their internal ConfigServer and on Eureka's consistency approach. After the constructor finishes, an init function runs; it submits a task that executes a load function, and it also submits a Notifier runnable. The load function first checks whether Nacos was started in standalone or cluster mode. Standalone mode is simple: it just marks itself initialized and returns. Cluster mode involves more work: it first checks whether the number of healthy servers in the current Nacos cluster is 1; if so, the cluster contains only this server instance, so the thread sleeps and waits for other servers to join, and once peers are available it performs a full data sync from a remote server.
```java
@PostConstruct
public void init() throws Exception {
    GlobalExecutor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                load();
            } catch (Exception e) {
                Loggers.EPHEMERAL.error("load data failed.", e);
            }
        }
    });

    executor.submit(notifier);
}

public void load() throws Exception {
    if (SystemUtils.STANDALONE_MODE) {
        initialized = true;
        return;
    }
    // size <= 1 means only this server is in the list; wait for peers:
    while (serverListManager.getHealthyServers().size() <= 1) {
        Thread.sleep(1000L);
        Loggers.EPHEMERAL.info("waiting server list init...");
    }

    for (Server server : serverListManager.getHealthyServers()) {
        if (NetUtils.localServer().equals(server.getKey())) {
            continue;
        }
        if (Loggers.EPHEMERAL.isDebugEnabled()) {
            Loggers.EPHEMERAL.debug("sync from " + server);
        }
        if (syncAllDataFromRemote(server)) {
            initialized = true;
            return;
        }
    }
}
```
Now let's look at the syncAllDataFromRemote method:
```java
public boolean syncAllDataFromRemote(Server server) {
    try {
        byte[] data = NamingProxy.getAllData(server.getKey());
        processData(data);
        return true;
    } catch (Exception e) {
        Loggers.EPHEMERAL.error("sync full data from " + server + " failed!", e);
        return false;
    }
}

public static byte[] getAllData(String server) throws Exception {
    Map<String, String> params = new HashMap<>(8);

    HttpClient.HttpResult result = HttpClient.httpGet("http://" + server
        + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
        + ALL_DATA_GET_URL, new ArrayList<>(), params);

    if (HttpURLConnection.HTTP_OK == result.code) {
        return result.content.getBytes();
    }

    throw new IOException("failed to req API: " + "http://" + server
        + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
        + DATA_GET_URL + ". code: " + result.code + " msg: " + result.content);
}

public void processData(byte[] data) throws Exception {
    if (data.length > 0) {
        Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);

        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            dataStore.put(entry.getKey(), entry.getValue());

            if (!listeners.containsKey(entry.getKey())) {
                if (switchDomain.isDefaultInstanceEphemeral()) {
                    Loggers.EPHEMERAL.info("creating service {}", entry.getKey());
                    Service service = new Service();
                    String serviceName = KeyBuilder.getServiceName(entry.getKey());
                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                    service.setName(serviceName);
                    service.setNamespaceId(namespaceId);
                    service.setGroupName(Constants.DEFAULT_GROUP);
                    service.setLastModifiedMillis(System.currentTimeMillis());
                    service.recalculateChecksum();
                    listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
                        .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                }
            }
        }

        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            if (!listeners.containsKey(entry.getKey())) {
                Loggers.EPHEMERAL.warn("listener of {} not found.", entry.getKey());
                continue;
            }
            try {
                for (RecordListener listener : listeners.get(entry.getKey())) {
                    listener.onChange(entry.getKey(), entry.getValue().value);
                }
            } catch (Exception e) {
                Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                continue;
            }

            dataStore.put(entry.getKey(), entry.getValue());
        }
    }
}
```
It first fetches the data from the remote server via getAllData, then deserializes it:
```java
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
```
It then iterates over the entries. Once the two if checks confirm that no service object exists yet, the service is initialized and the corresponding listener fires an onChange event. The listener here is the com.alibaba.nacos.naming.core.ServiceManager object. Why ServiceManager? First look at how many implementations the listener interface has.
Service, ServiceManager, and SwitchManager, which listen on Instances, Service, and SwitchDomain respectively; hence the listener here must be ServiceManager. So let's step in and see what ServiceManager does when its onChange event fires:
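The dispatch can be pictured as a key-to-listener registry: the service-meta key prefix is registered by a ServiceManager-like listener, while each instance-list key is registered by the corresponding Service. The real contract is Nacos's RecordListener interface; everything else in this self-contained sketch is illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the listener registry: each key is watched by the component
// interested in that record type, which is why a service-meta key resolves
// to a ServiceManager-like listener while an instance-list key resolves to
// a Service-like listener. Illustrative names only.
interface ListenerSketch {
    void onChange(String key, Object value);
}

public class ListenerRegistry {
    private final Map<String, List<ListenerSketch>> listeners = new HashMap<>();

    void listen(String key, ListenerSketch listener) {
        listeners.computeIfAbsent(key, k -> new ArrayList<>()).add(listener);
    }

    // fire onChange for every listener registered under the key,
    // returning how many were notified
    int notifyChange(String key, Object value) {
        int count = 0;
        for (ListenerSketch l : listeners.getOrDefault(key, List.of())) {
            l.onChange(key, value);
            count++;
        }
        return count;
    }
}
```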
```java
@Override
public void onChange(String key, Service service) throws Exception {
    try {
        if (service == null) {
            Loggers.SRV_LOG.warn("received empty push from raft, key: {}", key);
            return;
        }

        if (StringUtils.isBlank(service.getNamespaceId())) {
            service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID);
        }

        Loggers.RAFT.info("[RAFT-NOTIFIER] datum is changed, key: {}, value: {}", key, service);

        Service oldDom = getService(service.getNamespaceId(), service.getName());

        if (oldDom != null) {
            oldDom.update(service);
        } else {
            putService(service);
            service.init();
            consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
            consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
            Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
        }
    } catch (Throwable e) {
        Loggers.SRV_LOG.error("[NACOS-SERVICE] error while processing service update", e);
    }
}
```
It simply looks up an existing service matching the one passed in; if found, it performs an update, otherwise it does essentially the same work as createEmptyService.
Inside the second loop there is another listener.onChange call, and this time the listener is the com.alibaba.nacos.naming.core.Service object. Let's see what its onChange event does:
```java
@Override
public void onChange(String key, Instances value) throws Exception {

    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    for (Instance instance : value.getInstanceList()) {
        if (instance == null) {
            throw new RuntimeException("got null instance " + key);
        }
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }

    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

    recalculateChecksum();
}
```
Service's onChange event first clamps each Instance's weight into a valid range, then calls updateIPs, which cascades into Service.updateIPs and Cluster.updateIPs. In essence, it updates all data in its own Service -> Cluster -> Instance model from the remote server's data to keep the two in sync, much like Eureka's consistency strategy:
> In Eureka, if a server goes down, there is no ZooKeeper-style leader election; client requests automatically switch to another Eureka node. When the failed server recovers, Eureka takes it back into the cluster, and all it has to do is sync some new registration data.
After that, if the listeners ran without error, the data is upserted into the store via dataStore.put(entry.getKey(), entry.getValue()). Now let's look at what the second task submitted in DistroConsistencyServiceImpl's init function, executor.submit(notifier), does.
```java
public class Notifier implements Runnable {

    private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

    private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

    public void addTask(String datumKey, ApplyAction action) {
        if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
            return;
        }
        if (action == ApplyAction.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }
        tasks.add(Pair.with(datumKey, action));
    }

    public int getTaskSize() {
        return tasks.size();
    }

    @Override
    public void run() {
        Loggers.EPHEMERAL.info("distro notifier started");

        while (true) {
            try {
                Pair pair = tasks.take();
                if (pair == null) {
                    continue;
                }

                String datumKey = (String) pair.getValue0();
                ApplyAction action = (ApplyAction) pair.getValue1();

                services.remove(datumKey);

                int count = 0;

                if (!listeners.containsKey(datumKey)) {
                    continue;
                }

                for (RecordListener listener : listeners.get(datumKey)) {
                    count++;
                    try {
                        if (action == ApplyAction.CHANGE) {
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        if (action == ApplyAction.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }

                if (Loggers.EPHEMERAL.isDebugEnabled()) {
                    Loggers.EPHEMERAL.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                        datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.EPHEMERAL.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
}
```
Notifier simply keeps polling the task queue and lets the matching listeners execute their onChange events. The events themselves are produced by DistroConsistencyServiceImpl's onPut and onRemove operations, corresponding to service data being added and removed.
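The onPut -> Notifier handoff boils down to a producer/consumer pair over a blocking queue: the write path only enqueues a (key, action) pair and returns immediately, while the consumer drains the queue and fires listeners. A minimal self-contained sketch of that shape, with illustrative names (the real Notifier blocks on take(); this sketch uses a non-blocking poll() so it can be exercised single-threaded):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Producer/consumer sketch of the onPut -> Notifier handoff.
public class NotifierSketch {
    enum Action { CHANGE, DELETE }

    private final BlockingQueue<Entry<String, Action>> tasks = new LinkedBlockingQueue<>();

    // called from the write path (onPut/onRemove): non-blocking enqueue
    void addTask(String datumKey, Action action) {
        tasks.add(new SimpleEntry<>(datumKey, action));
    }

    int pending() {
        return tasks.size();
    }

    // one consumer step: pop the next event and describe the listener
    // callback it would trigger; null when the queue is empty
    String drainOne() {
        Entry<String, Action> pair = tasks.poll();
        return pair == null ? null : pair.getValue() + ":" + pair.getKey();
    }
}
```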
Once createEmptyService has finished, the actual instance registration begins:
```java
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {

    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    Service service = getService(namespaceId, serviceName);

    List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

    Instances instances = new Instances();
    instances.setInstanceList(instanceList);

    consistencyService.put(key, instances);
}
```
This function looks up the service by namespaceId and serviceName, then calls addIpAddresses to update the instance list under that service:
```java
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {

    Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

    Map<String, Instance> oldInstanceMap = new HashMap<>(16);
    List<Instance> currentIPs = service.allIPs(ephemeral);
    Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());

    for (Instance instance : currentIPs) {
        map.put(instance.toIPAddr(), instance);
    }
    if (datum != null) {
        oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
    }

    HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
    instanceMap.putAll(oldInstanceMap);

    for (Instance instance : ips) {
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName());
            cluster.setService(service);
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                instance.getClusterName(), instance.toJSON());
        }

        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
            instanceMap.put(instance.getDatumKey(), instance);
        }
    }

    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName()
            + ", ip list: " + JSON.toJSONString(instanceMap.values()));
    }

    return new ArrayList<>(instanceMap.values());
}
```
updateIpAddresses is the underlying call behind addIpAddresses. Given the newly registered Instance(s) and the owning service, it updates the instance list under that service and returns the updated list.
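The merge performed above can be condensed to: copy the currently stored instance map, apply the incoming instances as puts (register) or removes (deregister) keyed by their datum key, and reject an add that would leave the list empty. A condensed, self-contained sketch with strings standing in for Instance objects (names and action strings are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Condensed sketch of the updateIpAddresses merge logic.
public class InstanceMergeSketch {

    static List<String> update(Map<String, String> stored, List<String> incoming, String action) {
        Map<String, String> merged = new HashMap<>(stored); // work on a copy
        for (String ip : incoming) {
            if ("remove".equals(action)) {
                merged.remove(ip);    // deregistration drops the key
            } else {
                merged.put(ip, ip);   // registration upserts the key
            }
        }
        if (merged.isEmpty() && "add".equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty");
        }
        return new ArrayList<>(merged.values());
    }
}
```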