How Nacos knows which clients have subscribed to a service

Why do subscribers exist?

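Some services have no need to register themselves with the service discovery center. Such a service only wants to be a quiet consumer: all it needs is the list of services it can call.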

How Nacos tracks the clients subscribed to a service

In nacos-client, fetching the instances of a service goes through this method:

// String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
serverProxy.queryList(serviceName, clusters, 0, false);
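
To make the role of the arguments concrete, here is a minimal sketch of how a call like the one above could be turned into the query string of the HTTP request. It is not the nacos-client implementation: the class InstanceListQuerySketch and its buildQuery method are hypothetical, and the parameter name "serviceName" is an assumption about what the server expects.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of nacos-client: shows how the arguments of
// queryList could end up as query parameters of GET /v1/ns/instance/list.
final class InstanceListQuerySketch {

    static String buildQuery(String serviceName, String clusters, int udpPort, boolean healthyOnly) {
        // LinkedHashMap keeps the parameters in insertion order
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", serviceName); // assumed parameter name
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("healthyOnly", String.valueOf(healthyOnly));

        StringJoiner query = new StringJoiner("&", "/v1/ns/instance/list?", "");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            query.add(entry.getKey() + "=" + encode(entry.getValue()));
        }
        return query.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a JVM
            throw new IllegalStateException(e);
        }
    }
}
```

With udpPort set to 0, as in the call above, the request carries no usable UDP port for the server to push to; a caller that wants pushes would pass the port its UDP socket is listening on.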

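Note the udpPort parameter of queryList; it comes into play shortly. The call ends up as an HTTP request to /v1/ns/instance/list, so the next step is to see what the corresponding controller in nacos-naming-server does.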

@RequestMapping(value = "/list", method = RequestMethod.GET)
public JSONObject list(HttpServletRequest request) throws Exception {

String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String agent = request.getHeader("Client-Version");
if (StringUtils.isBlank(agent)) {
agent = request.getHeader("User-Agent");
}
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
return doSrvIPXT(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly);
}
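
The handler relies on two small helpers, WebUtils.optional and WebUtils.required. Their behaviour can be approximated as follows; this is only a sketch working on a plain Map instead of an HttpServletRequest, and the class RequestParamsSketch, its methods, and the exception type are hypothetical stand-ins, not the Nacos code.

```java
import java.util.Map;

// Hypothetical stand-ins for WebUtils.optional / WebUtils.required.
final class RequestParamsSketch {

    // Returns the parameter value, or defaultValue when the parameter is absent
    static String optional(Map<String, String> params, String key, String defaultValue) {
        String value = params.get(key);
        return value == null ? defaultValue : value;
    }

    // Returns the parameter value, or fails when it is absent or empty
    static String required(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Param '" + key + "' is required.");
        }
        return value;
    }

    // Mirrors the udpPort handling in list(): a missing parameter becomes 0
    static int udpPort(Map<String, String> params) {
        return Integer.parseInt(optional(params, "udpPort", "0"));
    }
}
```

A request without udpPort therefore reaches the next step with udpPort equal to 0, which matters for the subscription check that follows.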

This handler only extracts parameters from the request; the real work happens in doSrvIPXT. Only the part that shows how Nacos stores subscribers is excerpted here.

public JSONObject doSrvIPXT(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

// Build a ClientInfo from the agent string (it describes which of the Nacos multi-language SDKs sent the request)
ClientInfo clientInfo = new ClientInfo(agent);
JSONObject result = new JSONObject();
Service service = serviceManager.getService(namespaceId, serviceName);

if (service == null) {
throw new NacosException(NacosException.NOT_FOUND, "service not found: " + serviceName);
}

checkIfDisabled(service);
long cacheMillis = switchDomain.getDefaultCacheMillis();

// now try to enable the push
try {
// Register the subscriber on the server side
if (udpPort > 0 && pushService.canEnablePush(agent)) {
pushService.addClient(namespaceId, serviceName,
clusters,
agent,
new InetSocketAddress(clientIP, udpPort),
pushDataSource,
tid,
app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-API] failed to added push client", e);
cacheMillis = switchDomain.getDefaultCacheMillis();
}
...
}

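Here the server checks whether PushService is allowed to push to this client (canEnablePush) and whether the client's UDP port is valid (udpPort > 0). If both hold, a PushClient object is created for the subscriber and stored: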

PushClient client = new PushService.PushClient(namespaceId,
serviceName,
clusters,
agent,
socketAddr,
dataSource,
tenant,
app);

public static void addClient(PushClient client) {
// client is stored by key 'serviceName' because notify event is driven by serviceName change
String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
// Check whether a subscriber map already exists for this service
if (clients == null) {
clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<String, PushClient>(1024));
clients = clientMap.get(serviceKey);
}

// If this subscriber is already registered
PushClient oldClient = clients.get(client.toString());
if (oldClient != null) {
// Refresh its timestamp
oldClient.refresh();
} else {
// Otherwise store the new client in the map
PushClient res = clients.putIfAbsent(client.toString(), client);
if (res != null) {
Loggers.PUSH.warn("client: {} already associated with key {}", res.getAddrStr(), res.toString());
}
Loggers.PUSH.debug("client: {} added for serviceName: {}", client.getAddrStr(), client.getServiceName());
}
}
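
The storage scheme above boils down to a two-level map: service key first, then one entry per subscriber. The following is a simplified sketch of that idea, not the PushService code. The class SubscriberRegistrySketch, the Subscriber type, and in particular the zombie rule (a subscriber that has not been refreshed within a timeout) are assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of a subscriber registry.
final class SubscriberRegistrySketch {

    static final class Subscriber {
        final String serviceKey;
        final String address;
        private volatile long lastRefTime;

        Subscriber(String serviceKey, String address, long now) {
            this.serviceKey = serviceKey;
            this.address = address;
            this.lastRefTime = now;
        }

        void refresh(long now) {
            lastRefTime = now;
        }

        // Assumption: stale means "not refreshed within timeoutMillis"
        boolean zombie(long now, long timeoutMillis) {
            return now - lastRefTime > timeoutMillis;
        }

        String key() {
            return serviceKey + "@" + address;
        }
    }

    // service key -> (subscriber key -> subscriber)
    private final ConcurrentMap<String, ConcurrentMap<String, Subscriber>> clientMap =
            new ConcurrentHashMap<>();

    // Returns true if the subscriber is new, false if an existing one was refreshed
    boolean addClient(Subscriber client, long now) {
        ConcurrentMap<String, Subscriber> clients =
                clientMap.computeIfAbsent(client.serviceKey, k -> new ConcurrentHashMap<>());
        Subscriber existing = clients.putIfAbsent(client.key(), client);
        if (existing != null) {
            existing.refresh(now);
            return false;
        }
        return true;
    }

    // Drops subscribers that have gone stale and returns how many were removed
    int removeZombies(String serviceKey, long now, long timeoutMillis) {
        ConcurrentMap<String, Subscriber> clients = clientMap.get(serviceKey);
        if (clients == null) {
            return 0;
        }
        int removed = 0;
        for (Subscriber s : clients.values()) {
            if (s.zombie(now, timeoutMillis) && clients.remove(s.key(), s)) {
                removed++;
            }
        }
        return removed;
    }

    int subscriberCount(String serviceKey) {
        ConcurrentMap<String, Subscriber> clients = clientMap.get(serviceKey);
        return clients == null ? 0 : clients.size();
    }
}
```

Because a repeated query from the same client only refreshes the existing entry, a client that keeps polling stays registered, while one that stops polling eventually becomes eligible for removal.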

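At this point the Nacos server has stored the information of every client that subscribed to the service. Next, the situations in which nacos-server actively pushes messages to a PushClient: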

When an Instance's health information is updated

@CanDistro
@RequestMapping(value = {"", "/instance"}, method = RequestMethod.PUT)
public String update(HttpServletRequest request) throws Exception {
...
pushService.serviceChanged(namespaceId, service.getName());
...
}

Pushes issued from within the heartbeat check task

public class ClientBeatCheckTask implements Runnable {

@Override
public void run() {
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
...
getPushService().serviceChanged(service.getNamespaceId(), service.getName());
...
}
}
}

Instance health checks (HealthCheckCommon)

public void checkOK(Instance ip, HealthCheckTask task, String msg) {
...
pushService.serviceChanged(vDom.getNamespaceId(), vDom.getName());
...
}

public void checkFail(Instance ip, HealthCheckTask task, String msg) {
...
pushService.serviceChanged(vDom.getNamespaceId(), vDom.getName());
...
}

public void checkFailNow(Instance ip, HealthCheckTask task, String msg) {
...
pushService.serviceChanged(vDom.getNamespaceId(), vDom.getName());
...
}

The onChange event of Service

public void onChange(String key, Instances value) throws Exception {
...
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
...
}

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
...
getPushService().serviceChanged(namespaceId, getName());
...
}

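Under certain conditions, each of the methods above triggers an active push from the Nacos server to subscribers: when one of these events occurs, serviceChanged is called and the data of the service the subscribers are interested in is pushed to them.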

// Called when the data of a service has changed on the server side
public void serviceChanged(final String namespaceId, final String serviceName) {

// merge some change events to reduce the push frequency:
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName))) {
return;
}

Future future = udpSender.schedule(new Runnable() {
@Override
public void run() {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");

// Fetch every PushClient listening for changes of this service
ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}

// cache: these clients all subscribe to the same service, so prepared push data can be reused across them
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {
// Remove the client if it has become a zombie
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}

Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());

// Compute the key used to deduplicate data preparation; note that the key is built only from serviceName and client.getAgent()
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}

if (compressData != null) {
// Cache hit: prepare the packet from the cached data
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
// Cache miss: load the data of the subscribed service
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
// Put the resulting Pair into the cache
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));
// Send the data
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
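
The futureMap check at the top of serviceChanged merges bursts of change events: while a push is already scheduled for a service, further changes for the same service are ignored. A minimal sketch of that merging pattern, under stated assumptions, looks like this. ChangeMergerSketch and its pushAction callback are hypothetical names, and the sketch uses computeIfAbsent so that the check and the scheduling happen atomically, which differs from the containsKey/put sequence in the code above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: at most one pending push per service key.
final class ChangeMergerSketch {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "change-merger-sketch");
                t.setDaemon(true);
                return t;
            });
    private final ConcurrentMap<String, Future<?>> futureMap = new ConcurrentHashMap<>();
    private final long delayMillis;
    private final Consumer<String> pushAction;

    ChangeMergerSketch(long delayMillis, Consumer<String> pushAction) {
        this.delayMillis = delayMillis;
        this.pushAction = pushAction;
    }

    void serviceChanged(String serviceKey) {
        // Only the first change in a burst schedules a push; later ones find
        // the pending entry and do nothing.
        futureMap.computeIfAbsent(serviceKey, key -> scheduler.schedule(() -> {
            try {
                pushAction.accept(key);
            } finally {
                // Allow the next change to schedule a new push
                futureMap.remove(key);
            }
        }, delayMillis, TimeUnit.MILLISECONDS));
    }

    int pendingCount() {
        return futureMap.size();
    }
}
```

The delay trades freshness for fewer packets: with the 1000 ms used in the code above, any number of changes to one service within that second results in a single round of pushes.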

What the Nacos client does when it receives a push

On the client side, the UDP pushes sent by the Nacos server are received by com.alibaba.nacos.client.naming.core.PushReceiver:

public class PushReceiver implements Runnable {

private ScheduledExecutorService executorService;
public static final int UDP_MSS = 64 * 1024;
private DatagramSocket udpSocket;
private HostReactor hostReactor;

public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
udpSocket = new DatagramSocket();

// On construction, create the executor that will run this receiver
executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});

// PushReceiver is itself a Runnable, so it is handed to the thread pool for execution
executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}

@Override
public void run() {
while (true) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
// Create the packet that will hold the data pushed by the Nacos server
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

// receive() stores the incoming data in buffer and blocks until a datagram arrives
udpSocket.receive(packet);

// Decompress the payload if needed and decode it as a JSON string
String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
// Update the locally held information of the subscribed service
hostReactor.processServiceJSON(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\""
+ ", \"lastRefTime\": \"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\""
+ StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}

// Send the ack packet back to the server
udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")), ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}

...
}
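
Two steps of the receive loop are worth isolating: turning the received bytes into a JSON string, and choosing the ack type. The sketch below models both without any networking. It is an approximation under assumptions: PushPayloadSketch and its methods are hypothetical, the gzip magic-number check is a guess at what a "try decompress" helper does, and unlike the code above the sketch slices the buffer to the received length and always leaves the ack's data field empty.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch of payload decoding and ack selection.
final class PushPayloadSketch {

    // Decodes the first 'length' bytes of buffer, gunzipping them when they look like gzip
    static String decode(byte[] buffer, int length) {
        byte[] data = Arrays.copyOf(buffer, length);
        if (isGzip(data)) {
            data = gunzip(data);
        }
        return new String(data, StandardCharsets.UTF_8).trim();
    }

    // A gzip stream starts with the magic bytes 0x1F 0x8B
    static boolean isGzip(byte[] data) {
        return data.length >= 2 && (data[0] & 0xFF) == 0x1F && (data[1] & 0xFF) == 0x8B;
    }

    static byte[] gunzip(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for the sending side, used to produce test input
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Picks the ack type the same way the receive loop does; data is left empty here
    static String ackFor(String pushType, long lastRefTime) {
        String ackType;
        if ("dom".equals(pushType) || "service".equals(pushType)) {
            ackType = "push-ack";
        } else if ("dump".equals(pushType)) {
            ackType = "dump-ack";
        } else {
            ackType = "unknown-ack";
        }
        return "{\"type\": \"" + ackType + "\", \"lastRefTime\":\"" + lastRefTime + "\", \"data\":\"\"}";
    }
}
```

Echoing lastRefTime in the ack is what lets the sender match an ack to the push it belongs to.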