Nacos Q&A: Why can my service still be called after I have taken it offline?

The question

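While contributing to Nacos, I was repeatedly asked the same question: "I took a service instance offline in the Nacos console, so why can it still be called? That does not match the second-level online/offline switching that the project advertises." After asking for more details, it turned out that every report of an offline instance still serving traffic had one thing in common: the application used Ribbon as its load-balancing component. This article therefore looks at the problem from two angles: how Nacos implements second-level online/offline switching, and how Ribbon's instance refresh mechanism delays the client's awareness of such changes.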

Nacos second-level online/offline switching

@CanDistro
@RequestMapping(value = "", method = RequestMethod.PUT)
public String update(HttpServletRequest request) throws Exception {
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

    String agent = request.getHeader("Client-Version");
    if (StringUtils.isBlank(agent)) {
        agent = request.getHeader("User-Agent");
    }

    ClientInfo clientInfo = new ClientInfo(agent);

    if (clientInfo.type == ClientInfo.ClientType.JAVA &&
        clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        serviceManager.updateInstance(namespaceId, serviceName, parseInstance(request));
    } else {
        serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
    }
    return "ok";
}

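The code above is the endpoint that the Nacos console calls to bring an instance online or take it offline. The parseInstance(request) method extracts the instance information from the request. The updateInstance method behind it looks like this: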

public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
            "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }

    if (!service.allIPs().contains(instance)) {
        throw new NacosException(NacosException.INVALID_PARAM, "instance not exist: " + instance);
    }

    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {

    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    Service service = getService(namespaceId, serviceName);

    List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

    Instances instances = new Instances();
    instances.setInstanceList(instanceList);

    consistencyService.put(key, instances);
}

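From here on, the flow is the same as the one described in the earlier post on how the Nacos server registers a service instance. So as soon as you click "offline" for an instance in the Nacos console, the instance data held by the Nacos naming server is updated immediately.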
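
To make the "updated immediately" point concrete, here is a minimal in-memory sketch of the same shape as updateInstance: check that the service and the instance exist, then overwrite the stored record. DemoInstance and DemoRegistry are hypothetical names invented for this illustration, and representing "offline" as an enabled flag is an assumption; none of this is the real ServiceManager or consistency layer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for an instance record (not the Nacos Instance class).
final class DemoInstance {
    final String ip;
    final int port;
    final boolean enabled; // assumption: "offline" is modeled as enabled == false

    DemoInstance(String ip, int port, boolean enabled) {
        this.ip = ip;
        this.port = port;
        this.enabled = enabled;
    }

    String address() {
        return ip + ":" + port;
    }
}

// Hypothetical in-memory registry; it only mimics the shape of updateInstance.
final class DemoRegistry {
    // serviceName -> (ip:port -> instance record)
    private final Map<String, Map<String, DemoInstance>> services = new ConcurrentHashMap<>();

    void register(String serviceName, DemoInstance instance) {
        services.computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>())
                .put(instance.address(), instance);
    }

    // Mirrors the two checks in updateInstance, then overwrites the stored record.
    void update(String serviceName, DemoInstance instance) {
        Map<String, DemoInstance> instances = services.get(serviceName);
        if (instances == null) {
            throw new IllegalArgumentException("service not found: " + serviceName);
        }
        if (!instances.containsKey(instance.address())) {
            throw new IllegalArgumentException("instance not exist: " + instance.address());
        }
        instances.put(instance.address(), instance);
    }

    // Returns the addresses of the instances that are currently enabled.
    List<String> enabledAddresses(String serviceName) {
        List<String> result = new ArrayList<>();
        Map<String, DemoInstance> instances = services.get(serviceName);
        if (instances != null) {
            for (DemoInstance instance : instances.values()) {
                if (instance.enabled) {
                    result.add(instance.address());
                }
            }
        }
        return result;
    }
}
```

In this model, a read issued right after update already sees the instance as disabled; there is no timer anywhere on the server side of the sketch. Any delay therefore has to come from whoever caches the result.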

Ribbon's instance refresh mechanism

First, look at the Nacos implementation of Ribbon's instance-pulling code:

public class NacosServerList extends AbstractServerList<NacosServer> {

    private NacosDiscoveryProperties discoveryProperties;

    private String serviceId;

    public NacosServerList(NacosDiscoveryProperties discoveryProperties) {
        this.discoveryProperties = discoveryProperties;
    }

    @Override
    public List<NacosServer> getInitialListOfServers() {
        return getServers();
    }

    @Override
    public List<NacosServer> getUpdatedListOfServers() {
        return getServers();
    }

    private List<NacosServer> getServers() {
        try {
            List<Instance> instances = discoveryProperties.namingServiceInstance()
                    .selectInstances(serviceId, true);
            return instancesToServerList(instances);
        }
        catch (Exception e) {
            throw new IllegalStateException(
                    "Can not get service instances from nacos, serviceId=" + serviceId,
                    e);
        }
    }

    private List<NacosServer> instancesToServerList(List<Instance> instances) {
        List<NacosServer> result = new ArrayList<>();
        if (null == instances) {
            return result;
        }
        for (Instance instance : instances) {
            result.add(new NacosServer(instance));
        }

        return result;
    }

    public String getServiceId() {
        return serviceId;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
        this.serviceId = iClientConfig.getClientName();
    }
}

As you can see, NacosServerList extends AbstractServerList. So where does this AbstractServerList end up being used? Tracing the code shows that it is ultimately picked up by the DynamicServerListLoadBalancer class:

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
        updateListOfServers();
    }
};

public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
    initWithNiwsConfig(clientConfig);
}

@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
    try {
        super.initWithNiwsConfig(clientConfig);
        String niwsServerListClassName = clientConfig.getPropertyAsString(
                CommonClientConfigKey.NIWSServerListClassName,
                DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);
        ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
                .instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
        // Keep the configured ServerList implementation
        this.serverListImpl = niwsServerListImpl;

        // Get the filter (applied to the pulled server list)
        if (niwsServerListImpl instanceof AbstractServerList) {
            AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
                    .getFilterImpl(clientConfig);
            niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
            this.filter = niwsFilter;
        }

        // Get the class name of the ServerListUpdater implementation
        String serverListUpdaterClassName = clientConfig.getPropertyAsString(
                CommonClientConfigKey.ServerListUpdaterClassName,
                DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS);

        // Instantiate the ServerListUpdater (the actual object is a PollingServerListUpdater)
        this.serverListUpdater = (ServerListUpdater) ClientFactory
                .instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);

        // Initialize or reset
        restOfInit(clientConfig);
    } catch (Exception e) {
        throw new RuntimeException(
                "Exception while initializing NIWSDiscoveryLoadBalancer:"
                        + clientConfig.getClientName()
                        + ", niwsClientConfig:" + clientConfig, e);
    }
}

void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    // Start the scheduled task that periodically refreshes the cached instance information
    enableAndInitLearnNewServersFeature();

    // Also pull the instance list once right away
    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections().primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

// This is where the cached instance information is refreshed
@VisibleForTesting
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        // Call the method that pulls the latest instance information
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers);

        // Apply the filter to the pulled server list
        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers);
        }
    }
    // Update the instance list
    updateAllServerList(servers);
}
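
The relationship in the DynamicServerListLoadBalancer code above can be reduced to a small model: the load balancer keeps its own copy of the server list and only replaces that copy when updateListOfServers runs. The sketch below is an illustration under that reading, not Ribbon code; DemoServerList and DemoLoadBalancer are hypothetical names, and servers are plain address strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Ribbon's ServerList: the source that knows the current instances.
interface DemoServerList {
    List<String> getUpdatedListOfServers();
}

// Hypothetical stand-in for DynamicServerListLoadBalancer: it routes from a local copy.
final class DemoLoadBalancer {
    private final DemoServerList serverList;
    private volatile List<String> cachedServers = new ArrayList<>();

    DemoLoadBalancer(DemoServerList serverList) {
        this.serverList = serverList;
        updateListOfServers(); // initial pull, in the spirit of restOfInit
    }

    // The only place the source is consulted; a scheduled task would call this periodically.
    void updateListOfServers() {
        cachedServers = new ArrayList<>(serverList.getUpdatedListOfServers());
    }

    // Routing decisions read the cached copy, never the source.
    List<String> getReachableServers() {
        return cachedServers;
    }
}
```

If an instance disappears from the source after the initial pull, getReachableServers keeps returning it until updateListOfServers is called again. That gap is exactly where a "still callable after going offline" report can come from.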

Now let's see what enableAndInitLearnNewServersFeature() ultimately calls (the start method of PollingServerListUpdater):

@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    // This UpdateAction is the one created in DynamicServerListLoadBalancer; it wraps updateListOfServers
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        // The default interval between task runs is 30 s
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS);
    } else {
        logger.info("Already active, no-op");
    }
}

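So it is easy to see that although Nacos does implement second-level online/offline switching, in Spring Cloud the load-balancing component Ribbon refreshes its instance information with a scheduled task. If that task has just finished and you bring an instance online or take it offline a moment later, Ribbon will not notice the change until the next run, which can be up to refreshIntervalMs milliseconds away (30 seconds by default).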
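
The delay can be put into numbers with a small calculation. The sketch below assumes the fixed-delay behavior seen in the scheduleWithFixedDelay call: the next refresh starts refreshIntervalMs after the previous one finished. It also ignores how long the refresh itself takes. RefreshDelay and millisUntilVisible are hypothetical names used only for this illustration.

```java
// Hypothetical helper that estimates how long a change stays invisible to a polling client.
final class RefreshDelay {
    // The 30 s default interval mentioned in the article, in milliseconds.
    static final long DEFAULT_REFRESH_INTERVAL_MS = 30_000L;

    private RefreshDelay() {
    }

    /**
     * Returns how many milliseconds pass between a change on the registry and the
     * next scheduled refresh, assuming the change happens after the last refresh
     * finished and that the refresh itself takes negligible time.
     */
    static long millisUntilVisible(long lastRefreshFinishedAtMs, long changeAtMs, long refreshIntervalMs) {
        if (refreshIntervalMs <= 0) {
            throw new IllegalArgumentException("refreshIntervalMs must be positive");
        }
        if (changeAtMs < lastRefreshFinishedAtMs) {
            throw new IllegalArgumentException("the change must not precede the last refresh");
        }
        long nextRefreshAtMs = lastRefreshFinishedAtMs + refreshIntervalMs;
        // If the change is at or after the next scheduled refresh, nothing is left to wait for in this model.
        return Math.max(0L, nextRefreshAtMs - changeAtMs);
    }
}
```

For example, with the default interval, an instance taken offline one second after a refresh finished stays in the client's list for roughly another 29 seconds, while one taken offline just before the next refresh disappears almost at once. Under this model the worst case scales linearly with the refresh interval.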