Nacos Client Initialization Flow

Preparation

```java
@Slf4j
@SpringBootApplication
@EnableNacosDiscovery(globalProperties = @NacosProperties(serverAddr = "127.0.0.1:8848"))
public class HBaseApiApplication {

    public static void main(String[] args) {
        // Forward the command-line arguments to Spring Boot
        SpringApplication.run(HBaseApiApplication.class, args);
    }

}
```

<!--more-->

```java
@Configuration
public class NacosConfigure {

    @NacosInjected
    private NamingService namingService;

    ...

    @PostConstruct
    public void registerInstance() throws NacosException {
        namingService.registerInstance(applicationName, ip, serverPort, clusterName);
    }
}
```

So how is the NamingService on the Nacos Client side initialized? Let's follow the code.

Code Walkthrough

Initializing NacosNamingService

```java
private void init(Properties properties) {
    serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
    initNamespace(properties);
    initEndpoint(properties);
    initWebRootContext();
    initCacheDir();
    initLogName(properties);

    eventDispatcher = new EventDispatcher();
    serverProxy = new NamingProxy(namespace, endpoint, serverList);
    serverProxy.setProperties(properties);
    beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
    hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties),
            initPollingThreadCount(properties));
}
```

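The init method takes a Properties argument that carries all Nacos-related settings. It first reads the Nacos Server address list (PropertyKeyConst.SERVER_ADDR) from it, which in this example is the value given in @NacosProperties(serverAddr = "127.0.0.1:8848"). It then initializes the namespace, endpoint, web context, cache directory and log name, and finally creates the EventDispatcher, NamingProxy, BeatReactor and HostReactor. The sections below cover most of these steps in order.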

initNamespace

```java
private void initNamespace(Properties properties) {
    String tmpNamespace = null;

    tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
        @Override
        public String call() {
            String namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
            LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
            return namespace;
        }
    });

    tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
        @Override
        public String call() {
            String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
            LogUtils.NAMING_LOGGER.info("initializer namespace from System Environment :" + namespace);
            return namespace;
        }
    });

    tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
        @Override
        public String call() {
            String namespace = CredentialService.getInstance().getCredential().getTenantId();
            LogUtils.NAMING_LOGGER.info("initializer namespace from Credential Module " + namespace);
            return namespace;
        }
    });

    if (StringUtils.isEmpty(tmpNamespace) && properties != null) {
        tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
    }

    tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
        @Override
        public String call() {
            return UtilAndComs.DEFAULT_NAMESPACE_ID;
        }
    });
    namespace = tmpNamespace;
}
```

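This method initializes the namespace. It tries five sources in order: the JVM system property, the ALIBABA_ALIWARE_NAMESPACE environment variable, the tenant ID from the credential module, the Properties passed to init, and finally the built-in default UtilAndComs.DEFAULT_NAMESPACE_ID. TemplateUtils.stringEmptyAndThenExecute only invokes its callback while the value is still empty, so once one source yields a non-empty namespace, the remaining sources are never consulted.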
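The "first non-empty value wins" chain can be sketched without any Nacos classes. This is a minimal sketch, not the Nacos code: the class NamespaceResolver, its methods and the Supplier parameters are hypothetical stand-ins for the lookups in initNamespace. The constant values "public" (for UtilAndComs.DEFAULT_NAMESPACE_ID) and "namespace" (for PropertyKeyConst.NAMESPACE) are assumptions.

```java
import java.util.Properties;
import java.util.function.Supplier;

// Hypothetical sketch of initNamespace's fallback chain; not Nacos API.
final class NamespaceResolver {
    // Assumption: the value behind UtilAndComs.DEFAULT_NAMESPACE_ID.
    static final String DEFAULT_NAMESPACE = "public";
    // Assumption: the key behind PropertyKeyConst.NAMESPACE.
    static final String NAMESPACE_KEY = "namespace";

    private NamespaceResolver() {
    }

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // Same contract as TemplateUtils.stringEmptyAndThenExecute:
    // the supplier is only called while the current value is still empty.
    static String orElseIfEmpty(String current, Supplier<String> next) {
        return isEmpty(current) ? next.get() : current;
    }

    static String resolve(Supplier<String> systemProperty, Supplier<String> environment,
                          Supplier<String> credentialTenantId, Properties properties) {
        String ns = null;
        ns = orElseIfEmpty(ns, systemProperty);     // 1. JVM system property
        ns = orElseIfEmpty(ns, environment);        // 2. environment variable
        ns = orElseIfEmpty(ns, credentialTenantId); // 3. credential module tenant ID
        if (isEmpty(ns) && properties != null) {    // 4. Properties passed to init
            ns = properties.getProperty(NAMESPACE_KEY);
        }
        return orElseIfEmpty(ns, () -> DEFAULT_NAMESPACE); // 5. built-in default
    }
}
```

Because every source is wrapped in a callback, later lookups stay lazy. For example, the credential module is never queried once a system property has supplied the namespace.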

initEndpoint

```java
private void initEndpoint(final Properties properties) {
    if (properties == null) {
        return;
    }
    // When initialized through dubbo / SCA (Spring Cloud Alibaba), true is passed in by default
    boolean isUseEndpointParsingRule = Boolean.valueOf(properties.getProperty(PropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE, ParamUtil.USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE));
    String endpointUrl;
    if (isUseEndpointParsingRule) {
        endpointUrl = ParamUtil.parsingEndpointRule(properties.getProperty(PropertyKeyConst.ENDPOINT));
        if (com.alibaba.nacos.client.utils.StringUtils.isNotBlank(endpointUrl)) {
            serverList = "";
        }
    } else {
        endpointUrl = properties.getProperty(PropertyKeyConst.ENDPOINT);
    }

    if (StringUtils.isBlank(endpointUrl)) {
        return;
    }

    String endpointPort = TemplateUtils.stringEmptyAndThenExecute(System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_ENDPOINT_PORT), new Callable<String>() {
        @Override
        public String call() {
            return properties.getProperty(PropertyKeyConst.ENDPOINT_PORT);
        }
    });

    endpointPort = TemplateUtils.stringEmptyAndThenExecute(endpointPort, new Callable<String>() {
        @Override
        public String call() {
            return "8080";
        }
    });

    endpoint = endpointUrl + ":" + endpointPort;
}
```

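The Nacos Client can locate Nacos Servers in two ways: directly from a configured ip:port list, or through an endpoint, a domain name that the client queries for the current server list. When the endpoint parsing rule is enabled (the default passed in by dubbo/SCA) and the resolved endpoint is not blank, serverList is cleared, so the ip:port setting is discarded and the endpoint rule is used instead. The endpoint port comes from the ALIBABA_ALIWARE_ENDPOINT_PORT environment variable, then from the PropertyKeyConst.ENDPOINT_PORT property, and defaults to 8080.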
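The decision initEndpoint makes can be modeled as a pure function. This is a minimal sketch under stated assumptions. The class EndpointConfig and its parameters are hypothetical. The sketch also assumes that endpointUrl has already gone through ParamUtil.parsingEndpointRule, so placeholder resolution is not modeled.

```java
// Hypothetical model of initEndpoint's outcome; not Nacos API.
// Assumption: endpointUrl has already been through ParamUtil.parsingEndpointRule.
final class EndpointConfig {
    static final String DEFAULT_ENDPOINT_PORT = "8080"; // fallback hard-coded in the excerpt

    final String serverList; // "" once the endpoint takes over
    final String endpoint;   // null when no endpoint is configured

    private EndpointConfig(String serverList, String endpoint) {
        this.serverList = serverList;
        this.endpoint = endpoint;
    }

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    static EndpointConfig resolve(String serverList, String endpointUrl, boolean useParsingRule,
                                  String envPort, String propertyPort) {
        if (endpointUrl == null || endpointUrl.trim().isEmpty()) {
            return new EndpointConfig(serverList, null); // plain ip:port mode
        }
        // With the parsing rule on (the dubbo/SCA default), a non-blank endpoint discards the ip:port list.
        String list = useParsingRule ? "" : serverList;
        // Port precedence: environment variable, then property, then 8080.
        String port = isEmpty(envPort) ? propertyPort : envPort;
        if (isEmpty(port)) {
            port = DEFAULT_ENDPOINT_PORT;
        }
        return new EndpointConfig(list, endpointUrl + ":" + port);
    }
}
```

If the parsing rule is disabled, this sketch keeps serverList, just as the excerpt does. The excerpt only clears the list inside the isUseEndpointParsingRule branch.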

initWebRootContext

```java
private void initWebRootContext() {
    // support the web context with ali-yun if the app deploy by EDAS
    final String webContext = System.getProperty(SystemPropertyKeyConst.NAMING_WEB_CONTEXT);
    TemplateUtils.stringNotEmptyAndThenExecute(webContext, new Runnable() {
        @Override
        public void run() {
            UtilAndComs.WEB_CONTEXT = webContext.indexOf("/") > -1 ? webContext
                    : "/" + webContext;

            UtilAndComs.NACOS_URL_BASE = UtilAndComs.WEB_CONTEXT + "/v1/ns";
            UtilAndComs.NACOS_URL_INSTANCE = UtilAndComs.NACOS_URL_BASE + "/instance";
        }
    });
}
```

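This method sets up the base path for the client's HTTP requests. Its main purpose is Alibaba Cloud support: as the code comment says, it supports the Alibaba Cloud web context when the application is deployed by EDAS. If the SystemPropertyKeyConst.NAMING_WEB_CONTEXT system property is set, its value becomes UtilAndComs.WEB_CONTEXT, and a leading "/" is added only when the value contains no "/" at all. The naming API base then becomes WEB_CONTEXT + "/v1/ns", and instance requests go to NACOS_URL_BASE + "/instance".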
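The path arithmetic is small enough to check in isolation. This is a minimal sketch: WebContextPaths and its methods are hypothetical helpers that reproduce only the string logic of initWebRootContext. They do not touch any system property or UtilAndComs field.

```java
// Hypothetical helper reproducing the string logic of initWebRootContext; not Nacos API.
final class WebContextPaths {
    private WebContextPaths() {
    }

    // Same rule as the excerpt: "/" is prepended only when the value contains no "/" anywhere.
    static String webContext(String raw) {
        return raw.indexOf('/') > -1 ? raw : "/" + raw;
    }

    // Corresponds to UtilAndComs.NACOS_URL_BASE.
    static String namingBase(String raw) {
        return webContext(raw) + "/v1/ns";
    }

    // Corresponds to UtilAndComs.NACOS_URL_INSTANCE.
    static String instanceUrl(String raw) {
        return namingBase(raw) + "/instance";
    }
}
```

For example, a context of "nacos" yields "/nacos/v1/ns/instance". The indexOf check has a quirk: a value such as "ctx/sub" is used verbatim, without a leading slash.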

initCacheDir

```java
private void initCacheDir() {
    cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
    if (StringUtils.isEmpty(cacheDir)) {
        cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
    }
}
```

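This method sets up a local cache directory on the Nacos Client for data received from the Nacos Server. It is part of the client's disaster-recovery support: when the Nacos Server is unavailable, or when the client starts up (if loadCacheAtStart is enabled, see HostReactor below), the client can read directly from this directory. The location comes from the com.alibaba.nacos.naming.cache.dir system property and defaults to ${user.home}/nacos/naming/<namespace>.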

eventDispatcher = new EventDispatcher()

```java
public EventDispatcher() {

    executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
            thread.setDaemon(true);
            return thread;
        }
    });

    executor.execute(new Notifier());
}

private class Notifier implements Runnable {
    @Override
    public void run() {
        while (true) {
            ServiceInfo serviceInfo = null;
            try {
                serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
            } catch (Exception ignore) {
            }

            if (serviceInfo == null) {
                continue;
            }

            try {
                List<EventListener> listeners = observerMap.get(serviceInfo.getKey());

                if (!CollectionUtils.isEmpty(listeners)) {
                    for (EventListener listener : listeners) {
                        List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                        listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts));
                    }
                }

            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] notify error for service: "
                        + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
            }
        }
    }
}
```

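The Nacos Client implements an event dispatcher. The constructor creates a single-thread executor whose thread is a daemon thread and submits a Notifier to it, and the Notifier does the actual dispatching. In an endless loop, it polls the blocking queue changedServices for service changes received from the server, waiting up to 5 minutes each time. It then looks up all listeners registered in observerMap under serviceInfo.getKey() and delivers a NamingEvent, carrying the service name and an unmodifiable host list, to each of them. Any exception is logged and the loop continues.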
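The same producer/consumer pattern can be shown in plain Java. This is a simplified sketch, not Nacos's EventDispatcher: MiniEventDispatcher, Change and the Consumer-based listeners are hypothetical stand-ins for EventDispatcher, ServiceInfo and EventListener. One loop iteration is factored into dispatchOnce so that the behavior can be exercised without a background thread.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified model of EventDispatcher; none of these names are Nacos API.
final class MiniEventDispatcher {
    // Stand-in for ServiceInfo: a service key plus its host list.
    static final class Change {
        final String key;
        final List<String> hosts;

        Change(String key, List<String> hosts) {
            this.key = key;
            this.hosts = hosts;
        }
    }

    private final BlockingQueue<Change> changedServices = new LinkedBlockingQueue<>();
    private final Map<String, List<Consumer<List<String>>>> observerMap = new ConcurrentHashMap<>();

    void addListener(String key, Consumer<List<String>> listener) {
        observerMap.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Producer side: whoever detects a change just enqueues it.
    void serviceChanged(Change change) {
        changedServices.add(change);
    }

    // One iteration of the Notifier loop; returns false if nothing arrived within the timeout.
    boolean dispatchOnce(long timeout, TimeUnit unit) {
        Change change;
        try {
            change = changedServices.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (change == null) {
            return false;
        }
        List<Consumer<List<String>>> listeners = observerMap.get(change.key);
        if (listeners != null) {
            List<String> hosts = Collections.unmodifiableList(change.hosts);
            try {
                for (Consumer<List<String>> listener : listeners) {
                    listener.accept(hosts);
                }
            } catch (RuntimeException e) {
                // As in the excerpt, a failing listener ends this round; the original logs the error.
            }
        }
        return true;
    }

    // Consumer side: a single daemon thread runs the loop, like the EventDispatcher constructor.
    ExecutorService start() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r, "mini.naming.listener");
            thread.setDaemon(true);
            return thread;
        });
        executor.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                dispatchOnce(5, TimeUnit.MINUTES);
            }
        });
        return executor;
    }
}
```

The blocking poll with a timeout serves two purposes. The single thread sleeps while there is nothing to dispatch, and it still wakes up periodically. A daemon thread also keeps the dispatcher from holding the JVM open at shutdown.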

serverProxy = new NamingProxy(namespace, endpoint, serverList)

```java
public NamingProxy(String namespaceId, String endpoint, String serverList) {

    this.namespaceId = namespaceId;
    this.endpoint = endpoint;
    if (StringUtils.isNotEmpty(serverList)) {
        this.serverList = Arrays.asList(serverList.split(","));
        if (this.serverList.size() == 1) {
            this.nacosDomain = serverList;
        }
    }

    initRefreshSrvIfNeed();
}
```

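NamingProxy handles all HTTP requests issued by the Nacos Client. The constructor stores the previously initialized namespace as namespaceId, along with the endpoint and the Nacos Server address list. If the list holds exactly one address, that address is also kept as nacosDomain. The constructor then calls initRefreshSrvIfNeed, but whether that method does anything depends on how the client is configured. If no endpoint is set, for example when the Nacos Server addresses are configured directly, it returns immediately. It only does real work when an endpoint is configured, which I believe is the usage Nacos recommends.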
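The constructor's bookkeeping reduces to a few rules. This is a minimal sketch: ServerListConfig is a hypothetical class, not part of Nacos. Its schedulesEndpointRefresh flag only mirrors the early-return check at the top of initRefreshSrvIfNeed.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the NamingProxy constructor's bookkeeping; not Nacos API.
final class ServerListConfig {
    final List<String> servers;
    final String nacosDomain;               // set only when exactly one address is configured
    final boolean schedulesEndpointRefresh; // whether initRefreshSrvIfNeed would go past its early return

    ServerListConfig(String serverList, String endpoint) {
        if (serverList != null && !serverList.isEmpty()) {
            servers = Arrays.asList(serverList.split(","));
            nacosDomain = servers.size() == 1 ? serverList : null;
        } else {
            servers = Collections.emptyList();
            nacosDomain = null;
        }
        schedulesEndpointRefresh = endpoint != null && !endpoint.isEmpty();
    }
}
```

With a direct "ip:port,ip:port" configuration, no refresh thread is started. With only an endpoint, the server list starts empty and is filled by the refresh task described next.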

initRefreshSrvIfNeed

```java
private void initRefreshSrvIfNeed() {
    if (StringUtils.isEmpty(endpoint)) {
        return;
    }

    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.naming.serverlist.updater");
            t.setDaemon(true);
            return t;
        }
    });

    executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            refreshSrvIfNeed();
        }
    }, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);

    refreshSrvIfNeed();
}
```

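This method creates a scheduled task on a single daemon thread. Every vipSrvRefInterMillis milliseconds, the task refreshes the addresses of the servers in the Nacos Server cluster. The method also calls refreshSrvIfNeed once synchronously before returning. refreshSrvIfNeed does the actual work: it calls getServerListFromEndpoint, which asks the endpoint for the list of Nacos server IPs. For background, see the Yunqi community (Alibaba Cloud) blog post on how Alibaba implements environment isolation with Nacos.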

(Figure: nacos-endpoint-server-ip)
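The periodic refresh can be sketched with a plain ScheduledExecutorService. In this sketch, ServerListRefresher and its Supplier-based fetcher are hypothetical, and the fetcher stands in for getServerListFromEndpoint. The body of refreshSrvIfNeed is not shown above, so one detail is an assumption: the sketch keeps the previous list when a fetch fails or returns nothing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of endpoint-driven server list refresh; not Nacos API.
final class ServerListRefresher {
    private final Supplier<List<String>> endpointFetcher; // stands in for getServerListFromEndpoint
    private volatile List<String> serversFromEndpoint = Collections.emptyList();

    ServerListRefresher(Supplier<List<String>> endpointFetcher) {
        this.endpointFetcher = endpointFetcher;
    }

    List<String> servers() {
        return serversFromEndpoint;
    }

    // One refresh attempt. Assumption: a failed or empty fetch keeps the previous list.
    boolean refreshOnce() {
        try {
            List<String> fetched = endpointFetcher.get();
            if (fetched == null || fetched.isEmpty()) {
                return false;
            }
            serversFromEndpoint = Collections.unmodifiableList(new ArrayList<>(fetched));
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    // Same shape as initRefreshSrvIfNeed: one daemon thread, fixed delay, first run immediately.
    ScheduledExecutorService start(long intervalMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "mini.serverlist.updater");
            t.setDaemon(true);
            return t;
        });
        executor.scheduleWithFixedDelay(this::refreshOnce, 0, intervalMillis, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

scheduleWithFixedDelay measures the delay from the end of one run to the start of the next. A slow endpoint therefore never causes overlapping refreshes on the single thread.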

beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties))

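Next comes the heartbeat reactor. A heartbeat is the Nacos Client actively reporting to the Nacos Server, so BeatReactor needs an object that can send HTTP requests (the NamingProxy) and a scheduled thread pool with threadCount worker threads. The constructor schedules a BeatProcessor immediately, and BeatProcessor drives the actual heartbeats.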

```java
public BeatReactor(NamingProxy serverProxy, int threadCount) {
    this.serverProxy = serverProxy;

    executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });

    executorService.schedule(new BeatProcessor(), 0, TimeUnit.MILLISECONDS);
}
```

BeatProcessor

```java
class BeatProcessor implements Runnable {

    @Override
    public void run() {
        try {
            for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {
                BeatInfo beatInfo = entry.getValue();
                if (beatInfo.isScheduled()) {
                    continue;
                }
                beatInfo.setScheduled(true);
                executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
        } finally {
            executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
        }
    }
}

class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        long result = serverProxy.sendBeat(beatInfo);
        beatInfo.setScheduled(false);
        if (result > 0) {
            clientBeatInterval = result;
        }
    }
}
```

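dom2Beat is a ConcurrentHashMap<String, BeatInfo>. The BeatInfo objects created in registerInstance (analyzed in the earlier post on how Nacos performs service discovery and service registration) are stored in it. BeatProcessor iterates over every BeatInfo and checks whether it is already scheduled in the current round. If not, it marks it as scheduled, wraps it in a BeatTask and submits it to the heartbeat pool. When a BeatTask runs, sendBeat returns a long from the Nacos Server: this is the server telling the client how often to send heartbeats, that is, the interval between beats. The task then clears the scheduled flag and, if the value is positive, stores it in clientBeatInterval. Finally, in its finally block, BeatProcessor puts itself back into the heartbeat pool with a delay of clientBeatInterval, so the interval returned by the Nacos Server decides when the next round runs.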
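The self-rescheduling heartbeat loop can be reproduced with standard concurrency classes. This is a simplified sketch, not BeatReactor itself: MiniBeatReactor, its BeatInfo and the ToLongFunction standing in for NamingProxy.sendBeat are hypothetical. The initial interval of 5000 ms is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.ToLongFunction;

// Hypothetical, simplified model of BeatReactor / BeatProcessor / BeatTask; not Nacos API.
final class MiniBeatReactor {
    static final class BeatInfo {
        final String serviceName;
        volatile boolean scheduled;

        BeatInfo(String serviceName) {
            this.serviceName = serviceName;
        }
    }

    final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<>();
    private final ToLongFunction<BeatInfo> sendBeat; // stands in for NamingProxy.sendBeat
    private final ScheduledExecutorService executor;
    volatile long clientBeatInterval = 5000L;        // assumed initial value

    MiniBeatReactor(ToLongFunction<BeatInfo> sendBeat, int threadCount) {
        this.sendBeat = sendBeat;
        this.executor = Executors.newScheduledThreadPool(threadCount, r -> {
            Thread t = new Thread(r, "mini.beat.sender");
            t.setDaemon(true);
            return t;
        });
    }

    void start() {
        executor.schedule(this::processorRound, 0, TimeUnit.MILLISECONDS);
    }

    // BeatProcessor: schedule every idle beat, then reschedule itself with the latest interval.
    void processorRound() {
        try {
            for (BeatInfo info : dom2Beat.values()) {
                if (info.scheduled) {
                    continue;
                }
                info.scheduled = true;
                executor.schedule(() -> beatTask(info), 0, TimeUnit.MILLISECONDS);
            }
        } finally {
            executor.schedule(this::processorRound, clientBeatInterval, TimeUnit.MILLISECONDS);
        }
    }

    // BeatTask: send one beat, clear the flag, adopt a positive server-suggested interval.
    void beatTask(BeatInfo info) {
        long result = sendBeat.applyAsLong(info);
        info.scheduled = false;
        if (result > 0) {
            clientBeatInterval = result;
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Rescheduling from a finally block, instead of using scheduleAtFixedRate, lets each round pick up the latest clientBeatInterval. This is how the server can change the heartbeat rate at runtime. The scheduled flag prevents a slow beat from being queued twice.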

new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties))

Let's look at what this object does when it is constructed.

```java
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
                   boolean loadCacheAtStart, int pollingThreadCount) {
    executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.client.naming.updater");
            return thread;
        }
    });

    this.eventDispatcher = eventDispatcher;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    if (loadCacheAtStart) {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
    }

    this.updatingMap = new ConcurrentHashMap<String, Object>();
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushReceiver = new PushReceiver(this);
}
```

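First, the constructor creates a scheduled thread pool with pollingThreadCount daemon threads for the polling update tasks. Then it checks the loadCacheAtStart flag. If the flag is enabled, serviceInfoMap is pre-populated with the service data cached under cacheDir (via DiskCache.read); otherwise it starts as an empty map. Finally, it creates a FailoverReactor, which handles disaster recovery, and a PushReceiver.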

Let's see what FailoverReactor does, starting with its init method:

```java
public void init() {

    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);

    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);

    // backup file on startup if failover directory is empty.
    executorService.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                File cacheDir = new File(failoverDir);

                if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                    throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                }

                File[] files = cacheDir.listFiles();
                if (files == null || files.length <= 0) {
                    new DiskFileWriter().run();
                }
            } catch (Throwable e) {
                NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
            }

        }
    }, 10000L, TimeUnit.MILLISECONDS);
}
```

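In effect, init starts two recurring tasks related to the failover cache files. SwitchRefresher runs every 5 seconds. DiskFileWriter first runs after 30 minutes and then every DAY_PERIOD_MINUTES minutes. init also schedules a one-off task 10 seconds after startup: it creates the failover directory if needed and, if the directory is empty, runs DiskFileWriter right away so that a backup exists.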
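The scheduling layout of init can be written against plain Runnables. This is a minimal sketch: FailoverBootstrap and its methods are hypothetical. The switch refresher and disk writer are passed in rather than implemented, and DAY_PERIOD_MINUTES becomes the dayPeriodMinutes parameter because its value is not shown above.

```java
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of FailoverReactor.init's scheduling; not Nacos API.
final class FailoverBootstrap {
    private FailoverBootstrap() {
    }

    // The one-off startup check: create the directory if needed, back up once if it is empty.
    static boolean backupIfEmpty(File failoverDir, Runnable diskFileWriter) {
        if (!failoverDir.exists() && !failoverDir.mkdirs()) {
            throw new IllegalStateException("failed to create cache dir: " + failoverDir);
        }
        File[] files = failoverDir.listFiles();
        if (files == null || files.length == 0) {
            diskFileWriter.run();
            return true;
        }
        return false;
    }

    static ScheduledExecutorService start(File failoverDir, Runnable switchRefresher,
                                          Runnable diskFileWriter, long dayPeriodMinutes) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "mini.naming.failover");
            thread.setDaemon(true);
            return thread;
        });
        // Re-read the failover switch every 5 seconds.
        executor.scheduleWithFixedDelay(switchRefresher, 0L, 5000L, TimeUnit.MILLISECONDS);
        // Snapshot service data to disk: first after 30 minutes, then every dayPeriodMinutes.
        executor.scheduleWithFixedDelay(diskFileWriter, 30, dayPeriodMinutes, TimeUnit.MINUTES);
        // One-off check 10 seconds after startup.
        executor.schedule(() -> {
            try {
                backupIfEmpty(failoverDir, diskFileWriter);
            } catch (RuntimeException e) {
                // The original catches Throwable and logs "[NA] failed to backup file on startup."
            }
        }, 10_000L, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

The startup check closes a gap: without it, a freshly installed client would have no failover data for the first 30 minutes.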

SwitchRefresher

```java
class SwitchRefresher implements Runnable {
    long lastModifiedMillis = 0L;

    @Override
    public void run() {
        try {
            File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
            if (!switchFile.exists()) {
                switchParams.put("failover-mode", "false");
                NAMING_LOGGER.debug("failover switch is not found, " + switchFile.getName());
                return;
            }

            long modified = switchFile.lastModified();

            if (lastModifiedMillis < modified) {
                lastModifiedMillis = modified;
                String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                        Charset.defaultCharset().toString());
                if (!StringUtils.isEmpty(failover)) {
                    List<String> lines = Arrays.asList(failover.split(DiskCache.getLineSeperator()));

                    for (String line : lines) {
                        String line1 = line.trim();
                        if ("1".equals(line1)) {
                            switchParams.put("failover-mode", "true");
                            NAMING_LOGGER.info("failover-mode is on");
                            new FailoverFileReader().run();
                        } else if ("0".equals(line1)) {
                            switchParams.put("failover-mode", "false");
                            NAMING_LOGGER.info("failover-mode is off");
                        }
                    }
                } else {
                    switchParams.put("failover-mode", "false");
                }
            }

        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
        }
    }
}
```

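This task periodically checks the failover switch file in the failover directory and updates the in-memory failover switch (switchParams) to match. If the file does not exist, failover mode is off. Otherwise, whenever the file's last-modified time advances, the content is re-read. A line containing 1 turns failover mode on and loads the failover data with FailoverFileReader, a line containing 0 turns it off, and an empty file also turns it off.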
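The switch logic separates cleanly into "has the file changed?" and "what does it say?". This is a minimal sketch: the class FailoverSwitch is hypothetical, and it keeps a boolean instead of switchParams. It splits on any line break (`\R`) rather than DiskCache.getLineSeperator(), and it does not load failover data when the mode turns on.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;

// Hypothetical sketch of SwitchRefresher's logic; not Nacos API.
final class FailoverSwitch {
    private long lastModifiedMillis = 0L;
    private volatile boolean failoverMode = false;

    boolean isFailoverMode() {
        return failoverMode;
    }

    // Interprets the switch-file content the way the excerpt does:
    // "1" turns failover on, "0" turns it off, other lines leave the current value alone.
    static boolean parse(String content, boolean current) {
        if (content == null || content.isEmpty()) {
            return false; // an empty switch file means failover is off
        }
        boolean mode = current;
        for (String line : content.split("\\R")) {
            String trimmed = line.trim();
            if ("1".equals(trimmed)) {
                mode = true; // the original also runs FailoverFileReader here
            } else if ("0".equals(trimmed)) {
                mode = false;
            }
        }
        return mode;
    }

    void refresh(File switchFile) {
        try {
            if (!switchFile.exists()) {
                failoverMode = false;
                return;
            }
            long modified = switchFile.lastModified();
            if (lastModifiedMillis < modified) { // only re-read when the file changed
                lastModifiedMillis = modified;
                String content = new String(Files.readAllBytes(switchFile.toPath()), Charset.defaultCharset());
                failoverMode = parse(content, failoverMode);
            }
        } catch (IOException e) {
            // The original logs "[NA] failed to read failover switch." and keeps the previous state.
        }
    }
}
```

Checking lastModified first keeps the 5-second polling cheap, because the file is read only when someone has edited it. Operators can flip the client into failover mode by writing 1 to the switch file.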

DiskFileWriter

```java
class DiskFileWriter extends TimerTask {
    public void run() {
        Map<String, ServiceInfo> map = hostReactor.getServiceInfoMap();
        for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
            ServiceInfo serviceInfo = entry.getValue();
            if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils.equals(
                    serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY)
                    || StringUtils.equals(serviceInfo.getName(), "00-00---000-ENV_CONFIGS-000---00-00")
                    || StringUtils.equals(serviceInfo.getName(), "vipclient.properties")
                    || StringUtils.equals(serviceInfo.getName(), "00-00---000-ALL_HOSTS-000---00-00")) {
                continue;
            }
            DiskCache.write(serviceInfo, failoverDir);
        }
    }
}
```

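This is where the actual failover work happens. Every entry in HostReactor's serviceInfoMap, which holds the service data received from the Nacos Server, is written to files in the failover cache directory. A few special internal entries are skipped, matched by key or by name.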
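The filtering step is the part that can go wrong silently, so here it is in isolation. This is a minimal sketch: FailoverSnapshot and its Service class are hypothetical stand-ins for DiskFileWriter and ServiceInfo. The values of UtilAndComs.ALL_IPS and UtilAndComs.ENV_LIST_KEY are not shown above, so they are passed in as parameters. The three literal names are copied from the excerpt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch of DiskFileWriter's filter; not Nacos API.
final class FailoverSnapshot {
    // Stand-in for ServiceInfo.
    static final class Service {
        final String key;
        final String name;

        Service(String key, String name) {
            this.key = key;
            this.name = name;
        }
    }

    private FailoverSnapshot() {
    }

    static List<Service> toWrite(Collection<Service> services, String allIpsKey, String envListKey) {
        // Internal entries skipped by name; the three literals are copied from the excerpt.
        Set<String> skippedNames = new HashSet<>(Arrays.asList(envListKey,
                "00-00---000-ENV_CONFIGS-000---00-00",
                "vipclient.properties",
                "00-00---000-ALL_HOSTS-000---00-00"));
        List<Service> result = new ArrayList<>();
        for (Service service : services) {
            if (Objects.equals(service.key, allIpsKey) || skippedNames.contains(service.name)) {
                continue;
            }
            result.add(service); // the original calls DiskCache.write(serviceInfo, failoverDir) here
        }
        return result;
    }
}
```

Using a set for the skipped names turns the chain of equals checks into one lookup per service. The ALL_IPS entry is still compared by key, as in the original.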