How Nacos Uses TCP Port Probing to Detect Whether an Instance Is Alive

How the health check is initiated

When an instance is registered, it must carry cluster information; on the nacos-server side, the cluster gets extra metadata attached and performs additional initialization. The health-check setup happens as follows:

// Cluster
public void init() {
    if (inited) {
        return;
    }
    checkTask = new HealthCheckTask(this);
    HealthCheckReactor.scheduleCheck(checkTask);
    inited = true;
}

// HealthCheckTask
public HealthCheckTask(Cluster cluster) {
    this.cluster = cluster;
    distroMapper = SpringContext.getAppContext().getBean(DistroMapper.class);
    switchDomain = SpringContext.getAppContext().getBean(SwitchDomain.class);
    healthCheckProcessor = SpringContext.getAppContext().getBean(HealthCheckProcessorDelegate.class);
    initCheckRT();
}

In Cluster's init method, we can see that when a Cluster object is created, a HealthCheckTask is created and handed to HealthCheckReactor for scheduled execution; and when the HealthCheckTask is constructed, it obtains the health-check strategy object HealthCheckProcessorDelegate.
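
The scheduling here is not a fixed-rate timer: HealthCheckReactor runs the task once, and the task re-submits itself when it finishes (visible later in the run method's finally block). A minimal standalone sketch of that self-rescheduling pattern, with illustrative names only (CheckTask and REACTOR are not Nacos classes):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch (not Nacos code) of the self-rescheduling pattern:
// the reactor runs the task once, and the task re-submits itself at the
// end of each run, producing a fixed-delay check loop.
public class SelfRescheduleSketch {
    static final ScheduledExecutorService REACTOR =
            Executors.newSingleThreadScheduledExecutor();

    static class CheckTask implements Runnable {
        final CountDownLatch rounds;

        CheckTask(CountDownLatch rounds) {
            this.rounds = rounds;
        }

        @Override
        public void run() {
            try {
                rounds.countDown();          // stand-in for the actual health check
            } finally {
                if (rounds.getCount() > 0) { // plays the role of the "cancelled" flag
                    REACTOR.schedule(this, 20, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch rounds = new CountDownLatch(3);
        REACTOR.schedule(new CheckTask(rounds), 0, TimeUnit.MILLISECONDS);
        // three self-rescheduled rounds should complete well within the wait
        System.out.println("three rounds ran: " + rounds.await(2, TimeUnit.SECONDS));
        REACTOR.shutdown();
    }
}
```

Rescheduling in finally (rather than scheduleAtFixedRate) guarantees checks never overlap, however long one round takes.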

@Component("healthCheckDelegate")
public class HealthCheckProcessorDelegate implements HealthCheckProcessor {

    private Map<String, HealthCheckProcessor> healthCheckProcessorMap
            = new HashMap<>();

    public HealthCheckProcessorDelegate(HealthCheckExtendProvider provider) {
        provider.init();
    }

    @Autowired
    public void addProcessor(Collection<HealthCheckProcessor> processors) {
        healthCheckProcessorMap.putAll(processors.stream()
            .filter(processor -> processor.getType() != null)
            .collect(Collectors.toMap(HealthCheckProcessor::getType, processor -> processor)));
    }

    @Override
    public void process(HealthCheckTask task) {

        // a textbook strategy-pattern dispatch
        String type = task.getCluster().getHealthChecker().getType();
        HealthCheckProcessor processor = healthCheckProcessorMap.get(type);
        if (processor == null) {
            processor = healthCheckProcessorMap.get("none");
        }

        processor.process(task);
    }

    @Override
    public String getType() {
        return null;
    }
}

After this object is constructed, Spring calls its addProcessor(Collection&lt;HealthCheckProcessor&gt; processors) method, loading every available HealthCheckProcessor indexed by its type.
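
The delegate above is a strategy map: processors keyed by type, with a "none" fallback when the type is unknown. A simplified, self-contained sketch of the same dispatch pattern (all names here are illustrative, not Nacos code):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, standalone sketch of the strategy-map dispatch used by
// HealthCheckProcessorDelegate; Checker, TcpChecker etc. are made up.
public class DelegateSketch {

    interface Checker {
        String type();
        String check(String instance);
    }

    static class TcpChecker implements Checker {
        public String type() { return "TCP"; }
        public String check(String instance) { return "tcp-checked:" + instance; }
    }

    static class NoneChecker implements Checker {
        public String type() { return "NONE"; }
        public String check(String instance) { return "skipped:" + instance; }
    }

    private final Map<String, Checker> checkers = new HashMap<>();

    DelegateSketch(Checker... cs) {
        for (Checker c : cs) {
            checkers.put(c.type(), c);   // index each strategy by its type key
        }
    }

    // Dispatch by type; unknown types fall back to the "NONE" strategy,
    // just as the delegate falls back to the "none" processor.
    String dispatch(String type, String instance) {
        Checker c = checkers.getOrDefault(type, checkers.get("NONE"));
        return c.check(instance);
    }

    public static void main(String[] args) {
        DelegateSketch d = new DelegateSketch(new TcpChecker(), new NoneChecker());
        System.out.println(d.dispatch("TCP", "10.0.0.1:8080"));  // tcp-checked:10.0.0.1:8080
        System.out.println(d.dispatch("HTTP", "10.0.0.1:8080")); // skipped:10.0.0.1:8080
    }
}
```

The @Autowired collection-injection trick in the real delegate means new check types can be added just by registering another HealthCheckProcessor bean.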

Once loading is done, since HealthCheckTask is a task scheduled onto a thread pool, the next thing to look at is its run method:

@Override
public void run() {

    try {
        if (distroMapper.responsible(cluster.getService().getName()) &&
                switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {
            healthCheckProcessor.process(this);
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}", cluster.getService().getName());
            }
        }
    } catch (Throwable e) {
        Loggers.SRV_LOG.error("[HEALTH-CHECK] error while process health check for {}:{}",
            cluster.getService().getName(), cluster.getName(), e);
    } finally {
        if (!cancelled) {
            HealthCheckReactor.scheduleCheck(this);

            // worst == 0 means never checked
            if (this.getCheckRTWorst() > 0
                    && switchDomain.isHealthCheckEnabled(cluster.getService().getName())
                    && distroMapper.responsible(cluster.getService().getName())) {
                // TLog doesn't support float so we must convert it into long
                long diff = ((this.getCheckRTLast() - this.getCheckRTLastLast()) * 10000)
                        / this.getCheckRTLastLast();

                this.setCheckRTLastLast(this.getCheckRTLast());

                Cluster cluster = this.getCluster();
                Loggers.CHECK_RT.info("{}:{}@{}->normalized: {}, worst: {}, best: {}, last: {}, diff: {}",
                    cluster.getService().getName(), cluster.getName(), cluster.getHealthChecker().getType(),
                    this.getCheckRTNormalized(), this.getCheckRTWorst(), this.getCheckRTBest(),
                    this.getCheckRTLast(), diff);
            }
        }
    }
}

The task first checks whether this node is responsible for the service (distroMapper.responsible) and whether health checking is enabled for it; if both hold, it calls process on the strategy object healthCheckProcessor to perform the check. Since this article is about the TCP flavor of health checking, we jump straight to TcpSuperSenseProcessor's process method:

public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {

    ...

    @Override
    public void process(HealthCheckTask task) {

        // TCP probing only applies to persistent (non-ephemeral) instances
        List<Instance> ips = task.getCluster().allIPs(false);

        if (CollectionUtils.isEmpty(ips)) {
            return;
        }
        Service service = task.getCluster().getService();
        // iterate over every service instance in the cluster
        for (Instance ip : ips) {

            if (ip.isMarked()) {
                if (SRV_LOG.isDebugEnabled()) {
                    SRV_LOG.debug("tcp check, ip is marked as to skip health check, ip:" + ip.getIp());
                }
                continue;
            }

            if (!ip.markChecking()) {
                SRV_LOG.warn("tcp check started before last one finished, service: "
                    + task.getCluster().getService().getName() + ":"
                    + task.getCluster().getName() + ":"
                    + ip.getIp() + ":"
                    + ip.getPort());

                healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getTcpHealthParams());
                continue;
            }

            // build a heartbeat record and put it on the task queue
            Beat beat = new Beat(ip, task);
            taskQueue.add(beat);
            MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
        }

        // selector.wakeup();
    }
    ...
}

The method iterates over every service instance in the cluster, creates a heartbeat record Beat for each Instance, and adds it to the task queue taskQueue. Since TcpSuperSenseProcessor is itself a Runnable, this taskQueue is continuously drained inside TcpSuperSenseProcessor:

private void processTask() throws Exception {
    Collection<Callable<Void>> tasks = new LinkedList<Callable<Void>>();
    do {
        Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
        if (beat == null) {
            return;
        }

        tasks.add(new TaskProcessor(beat));
    } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);

    for (Future<?> f : NIO_EXECUTOR.invokeAll(tasks)) {
        f.get();
    }
}

@Override
public void run() {
    while (true) {
        try {
            processTask();

            int readyCount = selector.selectNow();
            if (readyCount <= 0) {
                continue;
            }

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();

                NIO_EXECUTOR.execute(new PostProcessor(key));
            }
        } catch (Throwable e) {
            SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
        }
    }
}

In the run method, processTask() is called first: it pulls up to NIO_THREAD_COUNT * 64 TaskProcessor tasks out of taskQueue, then submits the whole batch to the thread pool via NIO_EXECUTOR.invokeAll(tasks) and waits for all of them to finish.
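
That batching logic reduces to a small self-contained sketch: drain a BlockingQueue with a timed poll, cap the batch size, then run the batch with invokeAll and wait on every Future (illustrative names only; MAX_BATCH stands in for NIO_THREAD_COUNT * 64):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch (illustrative, not Nacos code) of processTask()'s batching scheme.
public class BatchDrainSketch {
    static final int MAX_BATCH = 4; // plays the role of NIO_THREAD_COUNT * 64

    static List<String> drainAndRun(BlockingQueue<String> queue, ExecutorService pool)
            throws InterruptedException, ExecutionException {
        List<Callable<String>> batch = new ArrayList<>();
        do {
            // timed poll, as in taskQueue.poll(CONNECT_TIMEOUT_MS / 2, ...)
            String beat = queue.poll(50, TimeUnit.MILLISECONDS);
            if (beat == null) {
                break;                   // queue stayed empty: nothing to run
            }
            batch.add(() -> "checked:" + beat);
        } while (!queue.isEmpty() && batch.size() < MAX_BATCH);

        List<String> results = new ArrayList<>();
        for (Future<String> f : pool.invokeAll(batch)) {
            results.add(f.get());        // block until the whole batch completes
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println(drainAndRun(queue, pool)); // [checked:a, checked:b, checked:c]
        pool.shutdown();
    }
}
```

Capping the batch keeps one enormous queue from monopolizing a selection round, and invokeAll preserves submission order in its returned futures.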

private class TaskProcessor implements Callable<Void> {

    private static final int MAX_WAIT_TIME_MILLISECONDS = 500;
    Beat beat = null;

    public TaskProcessor(Beat beat) {
        this.beat = beat;
    }

    @Override
    public Void call() {
        long waited = System.currentTimeMillis() - beat.getStartTime();
        if (waited > MAX_WAIT_TIME_MILLISECONDS) {
            Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
        }

        SocketChannel channel = null;
        try {
            Instance instance = beat.getIp();
            Cluster cluster = beat.getTask().getCluster();

            BeatKey beatKey = keyMap.get(beat.toString());
            if (beatKey != null && beatKey.key.isValid()) {
                if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                    instance.setBeingChecked(false);
                    return null;
                }

                beatKey.key.cancel();
                beatKey.key.channel().close();
            }

            channel = SocketChannel.open();
            channel.configureBlocking(false);
            // only by setting this can we make the socket close event asynchronous
            channel.socket().setSoLinger(false, -1);
            channel.socket().setReuseAddress(true);
            channel.socket().setKeepAlive(true);
            channel.socket().setTcpNoDelay(true);

            int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
            channel.connect(new InetSocketAddress(instance.getIp(), port));

            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
            key.attach(beat);
            keyMap.put(beat.toString(), new BeatKey(key));

            beat.setStartTime(System.currentTimeMillis());

            NIO_EXECUTOR.schedule(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage());

            if (channel != null) {
                try {
                    channel.close();
                } catch (Exception ignore) {
                }
            }
        }

        return null;
    }
}

Here nacos-server uses a non-blocking SocketChannel, registers interest in SelectionKey.OP_CONNECT | SelectionKey.OP_READ, and attaches the heartbeat record to the key via key.attach(beat). After initiating the connection with channel.connect, it schedules a timeout task: NIO_EXECUTOR.schedule(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS). When that delayed task fires and finds the channel still not connected, the check is treated as failed.
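
Stripped of the bookkeeping, the non-blocking connect plus timeout amounts to the following self-contained sketch. This is illustrative code, not Nacos's: it uses a blocking select(timeout) in place of the scheduled TimeOutTask, but the failure semantics match — no completed connection within the deadline means the probe fails:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Illustrative sketch of a TCP liveness probe built on a non-blocking connect.
public class NioConnectSketch {

    // Returns true iff a TCP connection to host:port completes within timeoutMs.
    static boolean probe(String host, int port, long timeoutMs) {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            if (channel.connect(new InetSocketAddress(host, port))) {
                return true;                 // loopback may connect immediately
            }
            channel.register(selector, SelectionKey.OP_CONNECT);
            if (selector.select(timeoutMs) == 0) {
                return false;                // deadline passed: like TimeOutTask firing
            }
            return channel.finishConnect();  // completes, or throws if refused
        } catch (IOException e) {
            return false;                    // connection refused or other I/O error
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            System.out.println(probe("127.0.0.1", port, 500)); // open port: true
        }
    }
}
```

Nacos instead keeps one long-lived Selector for all probes and offloads the deadline to a scheduled task, so thousands of instances can be probed without one thread per connection.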

// the timeout task
private static class TimeOutTask implements Runnable {
    SelectionKey key;
    ...

    @Override
    public void run() {
        if (key != null && key.isValid()) {
            SocketChannel channel = (SocketChannel) key.channel();
            Beat beat = (Beat) key.attachment();

            if (channel.isConnected()) {
                return;
            }

            try {
                // finish the pending connect so the channel can be cleaned up;
                // the maximum allowed delay has already elapsed
                channel.finishConnect();
            } catch (Exception ignore) {
            }

            try {
                beat.finishCheck(false, false, beat.getTask().getCheckRTNormalized() * 2, "tcp:timeout");

                key.cancel();
                key.channel().close();
            } catch (Exception ignore) {
            }
        }
    }
}

Back in TcpSuperSenseProcessor's run method: after processTask() returns, selector.selectNow() reports how many channels are ready, and selector.selectedKeys().iterator() walks the ready channels in the selected-key set. For each one, a PostProcessor is created to carry out the final steps of the TCP health check:

public class PostProcessor implements Runnable {
    SelectionKey key;
    ...

    @Override
    public void run() {
        Beat beat = (Beat) key.attachment();
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (!beat.isHealthy()) {
                // invalid beat means this server is no longer responsible for the current service
                key.cancel();
                key.channel().close();

                beat.finishCheck();
                return;
            }

            if (key.isValid() && key.isConnectable()) {
                // connected
                channel.finishConnect();
                beat.finishCheck(true, false, System.currentTimeMillis() - beat.getTask().getStartTime(), "tcp:ok+");
            }

            if (key.isValid() && key.isReadable()) {
                // disconnected
                ByteBuffer buffer = ByteBuffer.allocate(128);
                if (channel.read(buffer) == -1) {
                    key.cancel();
                    key.channel().close();
                } else {
                    // not terminate request, ignore
                }
            }
        } catch (ConnectException e) {
            // unable to connect, possibly port not opened
            beat.finishCheck(false, true, switchDomain.getTcpHealthParams().getMax(), "tcp:unable2connect:" + e.getMessage());
        } catch (Exception e) {
            beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage());

            try {
                key.cancel();
                key.channel().close();
            } catch (Exception ignore) {
            }
        }
    }
}

This method completes the remaining steps of the TCP health check: if the service instance accepts the connection, the check succeeds, and the connection is then torn down. With that, the full TCP health-check flow on the nacos-server side is complete.
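
The net effect of the entire pipeline is a single question: can the instance's check port complete a TCP handshake within the timeout? For contrast, here is a blocking-socket equivalent of the probe (illustrative only; Nacos itself uses the non-blocking NIO path walked through above):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Blocking-socket equivalent (illustrative) of the whole TCP health check:
// connect succeeds within the timeout -> healthy; refused or timed out -> failed.
public class TcpProbeSketch {

    static boolean isAlive(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;    // handshake completed: the instance counts as healthy
        } catch (IOException e) {
            return false;   // refused or timed out: the check fails
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(0)) { // bind an ephemeral port
            System.out.println(isAlive("127.0.0.1", server.getLocalPort(), 500)); // true
        }
    }
}
```

The try-with-resources close mirrors the teardown in PostProcessor: the probe connection is never reused for application traffic.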