Preparation

```java
@Slf4j
@SpringBootApplication
@EnableNacosDiscovery(globalProperties = @NacosProperties(serverAddr = "127.0.0.1:8848"))
public class HBaseApiApplication {

    public static void main(String[] args) {
        SpringApplication.run(HBaseApiApplication.class);
    }

}

@Configuration
public class NacosConfigure {

    @NacosInjected
    private NamingService namingService;

    // ...

    @PostConstruct
    public void registerInstance() throws NacosException {
        namingService.registerInstance(applicationName, ip, serverPort, clusterName);
    }

}
```
So how is the `NamingService` on the Nacos Client side initialized? Let's follow the code and find out.
Code walkthrough

Initialization of NacosNamingService

```java
private void init(Properties properties) {
    // Address list of the Nacos Server(s), e.g. "127.0.0.1:8848"
    serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
    initNamespace(properties);
    initEndpoint(properties);
    initWebRootContext();
    initCacheDir();
    initLogName(properties);

    eventDispatcher = new EventDispatcher();
    serverProxy = new NamingProxy(namespace, endpoint, serverList);
    serverProxy.setProperties(properties);
    beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
    hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties),
        initPollingThreadCount(properties));
}
```
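As you can see, `init` takes a `Properties` argument, and every Nacos-related setting is carried in this object. The method first reads the full Nacos Server address list from `properties`, which is the value given in `@NacosProperties(serverAddr = "127.0.0.1:8848")`.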
initNamespace

```java
private void initNamespace(Properties properties) {
    String tmpNamespace = null;

    // 1. JVM system property
    tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
        @Override
        public String call() {
            String namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
            LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
            return namespace;
        }
    });

    // 2. Environment variable
    tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
        @Override
        public String call() {
            String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
            LogUtils.NAMING_LOGGER.info("initializer namespace from System Environment :" + namespace);
            return namespace;
        }
    });

    // 3. Credential module (tenant id)
    tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
        @Override
        public String call() {
            String namespace = CredentialService.getInstance().getCredential().getTenantId();
            LogUtils.NAMING_LOGGER.info("initializer namespace from Credential Module " + namespace);
            return namespace;
        }
    });

    // 4. The Properties object passed to init
    if (StringUtils.isEmpty(tmpNamespace) && properties != null) {
        tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
    }

    // 5. Default namespace
    tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
        @Override
        public String call() {
            return UtilAndComs.DEFAULT_NAMESPACE_ID;
        }
    });
    namespace = tmpNamespace;
}
```
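This method initializes `namespace`. It makes five attempts in order: the JVM system property, the environment variable, the credential module, the `properties` argument, and finally the default value. As soon as one attempt yields a non-empty value, the remaining ones are skipped.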
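The fallback chain can be reduced to a "first non-empty value wins" loop. The following is only a minimal sketch of that idea, not the Nacos implementation: `NamespaceFallback`, the property key `demo.namespace`, the variable `DEMO_NAMESPACE` and the default `"public"` are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of the Nacos client.
class NamespaceFallback {

    /** Returns the first non-empty value; later sources are not evaluated once one succeeds. */
    static String firstNonEmpty(List<Supplier<String>> sources) {
        for (Supplier<String> source : sources) {
            String value = source.get();
            if (value != null && !value.isEmpty()) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Supplier<String>> sources = Arrays.asList(
            () -> System.getProperty("demo.namespace"), // assumed key, illustration only
            () -> System.getenv("DEMO_NAMESPACE"),      // assumed variable, illustration only
            () -> "public"                              // assumed default value
        );
        System.out.println("resolved namespace: " + firstNonEmpty(sources));
    }
}
```

Because each source is a `Supplier`, a lower-priority source is never consulted once a higher-priority one has produced a value, which mirrors the behaviour of `stringEmptyAndThenExecute` described above.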
initEndpoint

```java
private void initEndpoint(final Properties properties) {
    if (properties == null) {
        return;
    }
    // Whether the endpoint value should go through the endpoint parsing rule
    boolean isUseEndpointParsingRule = Boolean.valueOf(
        properties.getProperty(PropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,
            ParamUtil.USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE));
    String endpointUrl;
    if (isUseEndpointParsingRule) {
        endpointUrl = ParamUtil.parsingEndpointRule(properties.getProperty(PropertyKeyConst.ENDPOINT));
        if (com.alibaba.nacos.client.utils.StringUtils.isNotBlank(endpointUrl)) {
            // An endpoint was resolved, so the ip:port server list is dropped
            serverList = "";
        }
    } else {
        endpointUrl = properties.getProperty(PropertyKeyConst.ENDPOINT);
    }

    if (StringUtils.isBlank(endpointUrl)) {
        return;
    }

    // Port: environment variable first, then properties, then "8080"
    String endpointPort = TemplateUtils.stringEmptyAndThenExecute(
        System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_ENDPOINT_PORT), new Callable<String>() {
            @Override
            public String call() {
                return properties.getProperty(PropertyKeyConst.ENDPOINT_PORT);
            }
        });

    endpointPort = TemplateUtils.stringEmptyAndThenExecute(endpointPort, new Callable<String>() {
        @Override
        public String call() {
            return "8080";
        }
    });

    endpoint = endpointUrl + ":" + endpointPort;
}
```
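The Nacos Client can discover Nacos Server in two ways: directly through an `ip:port` list, or through a domain name (the `endpoint`). If an `endpoint` is configured, the `ip:port` setting is discarded automatically and the `endpoint` rule is used instead. In the code above this happens when the endpoint parsing rule is enabled and resolves to a non-blank value, at which point `serverList` is reset to an empty string.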
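The port resolution at the end of `initEndpoint` follows the same fallback pattern: environment variable, then property, then `8080`. A simplified sketch is shown below. The class `EndpointSketch` and the literal keys `"endpoint"` and `"endpointPort"` are assumptions for illustration and stand in for the `PropertyKeyConst` constants; the parsing-rule branch is left out.

```java
import java.util.Properties;

// Hypothetical helper for illustration; keys are placeholders, not the Nacos constants.
class EndpointSketch {

    /** Builds "host:port", or returns null when no endpoint host is configured. */
    static String resolveEndpoint(Properties props, String envPort) {
        String host = props.getProperty("endpoint");
        if (host == null || host.trim().isEmpty()) {
            return null; // no endpoint: the ip:port server list stays in use
        }
        String port = envPort;                       // 1. environment variable
        if (port == null || port.isEmpty()) {
            port = props.getProperty("endpointPort"); // 2. property
        }
        if (port == null || port.isEmpty()) {
            port = "8080";                            // 3. default, as in the code above
        }
        return host + ":" + port;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("endpoint", "nacos.example.com"); // made-up host
        System.out.println(resolveEndpoint(props, null));
    }
}
```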
initWebRootContext

```java
private void initWebRootContext() {
    // support the web context with ali-yun if the app deploy by EDAS
    final String webContext = System.getProperty(SystemPropertyKeyConst.NAMING_WEB_CONTEXT);
    TemplateUtils.stringNotEmptyAndThenExecute(webContext, new Runnable() {
        @Override
        public void run() {
            UtilAndComs.WEB_CONTEXT = webContext.indexOf("/") > -1 ? webContext
                : "/" + webContext;

            UtilAndComs.NACOS_URL_BASE = UtilAndComs.WEB_CONTEXT + "/v1/ns";
            UtilAndComs.NACOS_URL_INSTANCE = UtilAndComs.NACOS_URL_BASE + "/instance";
        }
    });
}
```
This method initializes the web request path information, mainly to support Alibaba Cloud. The comment in the source reads: "support the web context with ali-yun if the app deploy by EDAS".
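To make the string handling concrete, here is a small sketch that reproduces the same concatenation on plain strings. `WebContextSketch` is a hypothetical class, and the input values are made up.

```java
// Hypothetical helper for illustration; mirrors the string logic of initWebRootContext.
class WebContextSketch {

    /** Derives the instance URL path from a web context value. */
    static String instanceUrl(String webContext) {
        // A leading "/" is only added when the value contains no "/" at all
        String context = webContext.indexOf("/") > -1 ? webContext : "/" + webContext;
        String urlBase = context + "/v1/ns";
        return urlBase + "/instance";
    }

    public static void main(String[] args) {
        System.out.println(instanceUrl("nacos"));  // /nacos/v1/ns/instance
        System.out.println(instanceUrl("/edas"));  // /edas/v1/ns/instance
    }
}
```

Note that the check is "contains a slash", not "starts with a slash", so a value such as `a/b` is used unchanged and gets no leading `/`.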
initCacheDir

```java
private void initCacheDir() {
    // An explicit system property wins; otherwise fall back to a per-namespace directory under user.home
    cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
    if (StringUtils.isEmpty(cacheDir)) {
        cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
    }
}
```
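Here the Nacos Client sets up a cache directory for the data it receives from Nacos Server. The cache exists for disaster recovery: when Nacos Server is unavailable, or when the Nacos Client starts up, the data can be read directly from this directory. Because the default path includes `namespace`, this step has to run after `initNamespace`.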
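As a quick illustration of the resulting path, the sketch below computes the directory from explicit arguments instead of system properties. `CacheDirSketch` and the sample values are hypothetical.

```java
// Hypothetical helper for illustration; takes its inputs as arguments so it is easy to try out.
class CacheDirSketch {

    /** Returns the override when present, otherwise "<userHome>/nacos/naming/<namespace>". */
    static String cacheDir(String override, String userHome, String namespace) {
        if (override != null && !override.isEmpty()) {
            return override;
        }
        return userHome + "/nacos/naming/" + namespace;
    }

    public static void main(String[] args) {
        System.out.println(cacheDir(null, System.getProperty("user.home"), "public"));
    }
}
```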
eventDispatcher = new EventDispatcher()

```java
public EventDispatcher() {

    // Single daemon thread that runs the Notifier loop
    executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
            thread.setDaemon(true);

            return thread;
        }
    });

    executor.execute(new Notifier());
}

private class Notifier implements Runnable {
    @Override
    public void run() {
        while (true) {
            ServiceInfo serviceInfo = null;
            try {
                // Wait up to 5 minutes for a changed service
                serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
            } catch (Exception ignore) {
            }

            if (serviceInfo == null) {
                continue;
            }

            try {
                List<EventListener> listeners = observerMap.get(serviceInfo.getKey());

                if (!CollectionUtils.isEmpty(listeners)) {
                    for (EventListener listener : listeners) {
                        List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                        listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts));
                    }
                }

            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] notify error for service: "
                    + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
            }
        }
    }
}
```
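The Nacos Client implements an event dispatcher. The constructor creates a single-thread `Executor` and makes its thread a daemon thread; through this `Executor` the dispatcher delivers events to the registered `listener`s. The dispatching itself is done by `Notifier`: it takes changed `ServiceInfo` objects from the `changedServices` blocking queue, which holds the updates pushed from the server, looks up all `listener`s registered under `serviceInfo.getKey()`, and broadcasts the event to every one of them.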
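The pattern here is a blocking queue drained by one daemon thread that fans each item out to the listeners registered for its key. Below is a stripped-down sketch of that pattern. It is not the Nacos class: `MiniDispatcher`, its method names and the use of plain `String` keys instead of `ServiceInfo` are assumptions for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified dispatcher; service keys stand in for ServiceInfo objects.
class MiniDispatcher {
    private final BlockingQueue<String> changedServices = new LinkedBlockingQueue<>();
    private final Map<String, List<Consumer<String>>> observers = new ConcurrentHashMap<>();

    MiniDispatcher() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r, "demo.naming.listener");
            thread.setDaemon(true); // does not keep the JVM alive
            return thread;
        });
        executor.execute(this::notifyLoop);
    }

    void addListener(String key, Consumer<String> listener) {
        observers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void serviceChanged(String key) {
        changedServices.add(key);
    }

    private void notifyLoop() {
        while (true) {
            String key;
            try {
                key = changedServices.poll(5, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (key == null) {
                continue; // timed out, wait again
            }
            for (Consumer<String> listener : observers.getOrDefault(key, Collections.emptyList())) {
                try {
                    listener.accept(key);
                } catch (RuntimeException e) {
                    // one failing listener must not stop the loop
                }
            }
        }
    }

    /** Registers a listener, publishes one change and reports whether the listener was called. */
    static boolean demo() {
        MiniDispatcher dispatcher = new MiniDispatcher();
        CountDownLatch notified = new CountDownLatch(1);
        dispatcher.addListener("orders", key -> notified.countDown());
        dispatcher.serviceChanged("orders");
        try {
            return notified.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("listener notified: " + demo());
    }
}
```

Running all callbacks on a single thread keeps notifications for a service in order, at the cost that a slow listener delays every other listener.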
serverProxy = new NamingProxy(namespace, endpoint, serverList)

```java
public NamingProxy(String namespaceId, String endpoint, String serverList) {

    this.namespaceId = namespaceId;
    this.endpoint = endpoint;
    if (StringUtils.isNotEmpty(serverList)) {
        this.serverList = Arrays.asList(serverList.split(","));
        if (this.serverList.size() == 1) {
            // A single address is also treated as the Nacos domain
            this.nacosDomain = serverList;
        }
    }

    initRefreshSrvIfNeed();
}
```
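`NamingProxy` handles all HTTP requests issued by the Nacos Client. The constructor first stores the previously initialized `namespace` as `namespaceId`, along with the Nacos Server address information. It also calls another method, `initRefreshSrvIfNeed`, but whether that method does any work depends on how the Nacos Client is configured. If we only set the Nacos Server address directly, `endpoint` is empty and `initRefreshSrvIfNeed` returns immediately. The method therefore only takes effect when an `Endpoint` is configured (which, I think, is probably the usage Nacos recommends).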
initRefreshSrvIfNeed

```java
private void initRefreshSrvIfNeed() {
    // Nothing to refresh when the server list was configured directly
    if (StringUtils.isEmpty(endpoint)) {
        return;
    }

    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.naming.serverlist.updater");
            t.setDaemon(true);
            return t;
        }
    });

    executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            refreshSrvIfNeed();
        }
    }, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);

    refreshSrvIfNeed();
}
```
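This creates a scheduled task that periodically refreshes the address information of each Nacos Server in the Nacos Server cluster. The work is done by `refreshSrvIfNeed`, which in turn calls `getServerListFromEndpoint` to send a request to the `Endpoint` and query the IP list of the Nacos servers. For more background, see the Yunqi Community blog post "阿里巴巴基于 Nacos 实现环境隔离的实践" (Alibaba's practice of environment isolation based on Nacos).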
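The refresh loop can be pictured as follows. This is a sketch under assumptions, not the Nacos code: `ServerListRefresher` is a hypothetical class, and the `Supplier` stands in for the HTTP call made by `getServerListFromEndpoint`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch; the Supplier stands in for the request to the endpoint.
class ServerListRefresher {
    private final Supplier<List<String>> endpointLookup;
    private volatile List<String> servers = Collections.emptyList();

    ServerListRefresher(Supplier<List<String>> endpointLookup) {
        this.endpointLookup = endpointLookup;
    }

    /** Fetches the list once; keeps the previous list if the lookup fails or returns nothing. */
    void refreshOnce() {
        try {
            List<String> latest = endpointLookup.get();
            if (latest != null && !latest.isEmpty()) {
                servers = latest;
            }
        } catch (RuntimeException e) {
            // Swallowed on purpose: an exception escaping a scheduleWithFixedDelay
            // task would cancel all further executions.
        }
    }

    /** Starts the periodic refresh on a single daemon thread. */
    ScheduledExecutorService start(long intervalMillis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, "demo.serverlist.updater");
            t.setDaemon(true);
            return t;
        });
        executor.scheduleWithFixedDelay(this::refreshOnce, 0, intervalMillis, TimeUnit.MILLISECONDS);
        return executor;
    }

    List<String> servers() {
        return servers;
    }

    public static void main(String[] args) {
        // Made-up addresses for illustration
        ServerListRefresher refresher =
            new ServerListRefresher(() -> Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848"));
        refresher.refreshOnce();
        System.out.println(refresher.servers());
    }
}
```

Keeping the last known list when a lookup fails means a temporary outage of the endpoint does not leave the client without any server address.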
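beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties))

Next, a heartbeat task is created. A heartbeat is simply the Nacos Client proactively reporting its information to Nacos Server, so it needs an object that can perform HTTP requests, namely `NamingProxy`, and a heartbeat thread pool with `threadCount` worker threads. The heartbeat work itself is carried out by `BeatProcessor`.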
```java
public BeatReactor(NamingProxy serverProxy, int threadCount) {
    this.serverProxy = serverProxy;

    executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });

    executorService.schedule(new BeatProcessor(), 0, TimeUnit.MILLISECONDS);
}
```
BeatProcessor

```java
class BeatProcessor implements Runnable {

    @Override
    public void run() {
        try {
            for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {
                BeatInfo beatInfo = entry.getValue();
                // Skip beats that are already queued for sending
                if (beatInfo.isScheduled()) {
                    continue;
                }
                beatInfo.setScheduled(true);
                executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
        } finally {
            // Re-schedule itself using the current interval
            executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
        }
    }
}

class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        long result = serverProxy.sendBeat(beatInfo);
        beatInfo.setScheduled(false);
        // The server may return a new heartbeat interval
        if (result > 0) {
            clientBeatInterval = result;
        }
    }
}
```
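`dom2Beat` is a `ConcurrentHashMap<String, BeatInfo>`. The `BeatInfo` objects created in the `registerInstance` function, covered in the earlier post "简单分析Nacos是如何进行服务发现以及服务注册的" (a brief analysis of how Nacos does service discovery and service registration), are stored in this map. `BeatProcessor` loops over every `BeatInfo` and checks whether it has already been scheduled in the current heartbeat round. If it has not, the `BeatInfo` is wrapped in a `BeatTask` and submitted to the heartbeat thread pool. When a `BeatTask` runs, it receives a `long` from Nacos Server. This value is how Nacos Server tells the Nacos Client the heartbeat frequency, that is, how long to wait between heartbeats, and it replaces `clientBeatInterval` only when it is greater than 0. Finally, in its `finally` block, `BeatProcessor` puts itself back into the heartbeat thread pool with a delay of `clientBeatInterval`, which is the value returned by Nacos Server.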
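The core of this design is a task that re-schedules itself and lets the server adjust the delay. The sketch below folds `BeatProcessor` and `BeatTask` into one loop for brevity, so it is a simplification rather than the Nacos implementation. `BeatLoopSketch` is hypothetical, and the `LongSupplier` stands in for `serverProxy.sendBeat(beatInfo)`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch; the LongSupplier stands in for the HTTP heartbeat call.
class BeatLoopSketch {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
        Thread t = new Thread(r, "demo.beat.sender");
        t.setDaemon(true);
        return t;
    });
    private final LongSupplier sendBeat;
    private volatile long intervalMillis;

    BeatLoopSketch(LongSupplier sendBeat, long initialIntervalMillis) {
        this.sendBeat = sendBeat;
        this.intervalMillis = initialIntervalMillis;
    }

    /** The server's answer replaces the interval only when it is positive. */
    static long nextInterval(long current, long serverResult) {
        return serverResult > 0 ? serverResult : current;
    }

    void start() {
        executor.schedule(this::beatOnce, 0, TimeUnit.MILLISECONDS);
    }

    private void beatOnce() {
        try {
            intervalMillis = nextInterval(intervalMillis, sendBeat.getAsLong());
        } catch (RuntimeException e) {
            // a failed beat keeps the old interval
        } finally {
            // Re-schedule in finally so the loop survives failures
            executor.schedule(this::beatOnce, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    long intervalMillis() {
        return intervalMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        BeatLoopSketch loop = new BeatLoopSketch(() -> 1000L, 5000L); // made-up values
        loop.start();
        Thread.sleep(200); // give the first beat time to run
        System.out.println("current interval: " + loop.intervalMillis() + " ms");
    }
}
```

Using one-shot `schedule` calls instead of `scheduleWithFixedDelay` is what allows the delay to change between rounds.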
new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties))

Let's look at what this object does when it is created.
```java
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
                   boolean loadCacheAtStart, int pollingThreadCount) {

    executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.client.naming.updater");
            return thread;
        }
    });

    this.eventDispatcher = eventDispatcher;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    if (loadCacheAtStart) {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
    }

    this.updatingMap = new ConcurrentHashMap<String, Object>();
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushReceiver = new PushReceiver(this);
}
```
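First, a thread pool for the long-polling tasks is created. Then the `loadCacheAtStart` flag is checked: if it is enabled, the service information previously received from Nacos Server is loaded from the cache directory given by `cacheDir`. A `FailoverReactor`, which deals with disaster recovery, is created as well. Let's see what `FailoverReactor` does.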
```java
public void init() {

    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);

    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);

    // backup file on startup if failover directory is empty.
    executorService.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                File cacheDir = new File(failoverDir);

                if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                    throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                }

                File[] files = cacheDir.listFiles();
                if (files == null || files.length <= 0) {
                    new DiskFileWriter().run();
                }
            } catch (Throwable e) {
                NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
            }

        }
    }, 10000L, TimeUnit.MILLISECONDS);
}
```
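In short, this starts two recurring tasks that work with the failover cache files, `SwitchRefresher` (every 5 seconds) and `DiskFileWriter`, plus a one-off task that runs 10 seconds after startup and writes a backup if the failover directory is empty.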
SwitchRefresher

```java
class SwitchRefresher implements Runnable {
    long lastModifiedMillis = 0L;

    @Override
    public void run() {
        try {
            File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
            // No switch file means failover mode is off
            if (!switchFile.exists()) {
                switchParams.put("failover-mode", "false");
                NAMING_LOGGER.debug("failover switch is not found, " + switchFile.getName());
                return;
            }

            long modified = switchFile.lastModified();

            // Re-read the file only when it has changed
            if (lastModifiedMillis < modified) {
                lastModifiedMillis = modified;
                String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                    Charset.defaultCharset().toString());
                if (!StringUtils.isEmpty(failover)) {
                    List<String> lines = Arrays.asList(failover.split(DiskCache.getLineSeperator()));

                    for (String line : lines) {
                        String line1 = line.trim();
                        if ("1".equals(line1)) {
                            switchParams.put("failover-mode", "true");
                            NAMING_LOGGER.info("failover-mode is on");
                            new FailoverFileReader().run();
                        } else if ("0".equals(line1)) {
                            switchParams.put("failover-mode", "false");
                            NAMING_LOGGER.info("failover-mode is off");
                        }
                    }
                } else {
                    switchParams.put("failover-mode", "false");
                }
            }

        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
        }
    }
}
```
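This task periodically reads the switch file in the failover cache directory and, depending on whether the file exists and on its contents, updates the failover switch setting held in memory.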
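The decision rule for the switch file content can be isolated into a small pure function. The sketch below is an illustration only: `FailoverSwitchSketch` is a hypothetical class, it splits on any line break with the regex `\R` rather than `DiskCache.getLineSeperator()`, and it leaves out the file-existence and last-modified checks.

```java
// Hypothetical helper for illustration; only the content-parsing rule is modelled.
class FailoverSwitchSketch {

    /**
     * Returns the failover mode implied by the switch file content.
     * Empty content turns failover off; a line "1" turns it on, a line "0" turns it off;
     * any other line leaves the mode as it was.
     */
    static boolean failoverMode(String content, boolean previous) {
        if (content == null || content.isEmpty()) {
            return false;
        }
        boolean mode = previous;
        for (String line : content.split("\\R")) {
            String trimmed = line.trim();
            if ("1".equals(trimmed)) {
                mode = true;
            } else if ("0".equals(trimmed)) {
                mode = false;
            }
        }
        return mode;
    }

    public static void main(String[] args) {
        System.out.println(failoverMode("1\n", false)); // true
        System.out.println(failoverMode("0", true));    // false
    }
}
```

When the content contains several recognised lines, the last one wins, which matches the loop in `SwitchRefresher`.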
DiskFileWriter

```java
class DiskFileWriter extends TimerTask {
    public void run() {
        Map<String, ServiceInfo> map = hostReactor.getServiceInfoMap();
        for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
            ServiceInfo serviceInfo = entry.getValue();
            // Skip internal bookkeeping entries
            if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils.equals(
                serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY)
                || StringUtils.equals(serviceInfo.getName(), "00-00---000-ENV_CONFIGS-000---00-00")
                || StringUtils.equals(serviceInfo.getName(), "vipclient.properties")
                || StringUtils.equals(serviceInfo.getName(), "00-00---000-ALL_HOSTS-000---00-00")) {
                continue;
            }

            DiskCache.write(serviceInfo, failoverDir);
        }
    }
}
```
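This is where the failover handling actually happens: the service information obtained from Nacos Server is written to files under the failover cache directory.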