How the health check is initiated

When an instance is registered, it carries its cluster information; on the nacos-server side, that cluster information brings some extra bookkeeping and extra initialization work. Among these, the health check is set up as follows:
```java
public void init() {
    if (inited) {
        return;
    }
    checkTask = new HealthCheckTask(this);
    HealthCheckReactor.scheduleCheck(checkTask);
    inited = true;
}

public HealthCheckTask(Cluster cluster) {
    this.cluster = cluster;
    distroMapper = SpringContext.getAppContext().getBean(DistroMapper.class);
    switchDomain = SpringContext.getAppContext().getBean(SwitchDomain.class);
    healthCheckProcessor = SpringContext.getAppContext().getBean(HealthCheckProcessorDelegate.class);
    initCheckRT();
}
```
In Cluster's init method, you can see that when the Cluster object is created, a HealthCheckTask is created and handed to HealthCheckReactor for scheduled execution. And when the HealthCheckTask is constructed, it obtains the health-check strategy object, HealthCheckProcessorDelegate:
```java
@Component("healthCheckDelegate")
public class HealthCheckProcessorDelegate implements HealthCheckProcessor {

    private Map<String, HealthCheckProcessor> healthCheckProcessorMap = new HashMap<>();

    public HealthCheckProcessorDelegate(HealthCheckExtendProvider provider) {
        provider.init();
    }

    @Autowired
    public void addProcessor(Collection<HealthCheckProcessor> processors) {
        healthCheckProcessorMap.putAll(processors.stream()
                .filter(processor -> processor.getType() != null)
                .collect(Collectors.toMap(HealthCheckProcessor::getType, processor -> processor)));
    }

    @Override
    public void process(HealthCheckTask task) {
        String type = task.getCluster().getHealthChecker().getType();
        HealthCheckProcessor processor = healthCheckProcessorMap.get(type);
        if (processor == null) {
            processor = healthCheckProcessorMap.get("none");
        }
        processor.process(task);
    }

    @Override
    public String getType() {
        return null;
    }
}
```
After this object is constructed, its addProcessor(Collection<HealthCheckProcessor> processors) method is invoked, loading every available HealthCheckProcessor into the map, keyed by type.
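The dispatch-by-type pattern used by the delegate can be sketched in isolation. All class names below are illustrative stand-ins, not Nacos types; the point is the routing with a fallback to a "none" processor:

```java
import java.util.HashMap;
import java.util.Map;

public class DelegateSketch {

    // Hypothetical minimal processor interface, mirroring HealthCheckProcessor
    interface Processor {
        String getType();
        String process(String task);
    }

    static class TcpProcessor implements Processor {
        public String getType() { return "TCP"; }
        public String process(String task) { return "tcp-checked:" + task; }
    }

    static class NoneProcessor implements Processor {
        public String getType() { return "none"; }
        public String process(String task) { return "skipped:" + task; }
    }

    // Delegate: route by type, fall back to the "none" processor if unknown
    static class Delegate {
        private final Map<String, Processor> map = new HashMap<>();
        void add(Processor p) { map.put(p.getType(), p); }
        String process(String type, String task) {
            Processor p = map.getOrDefault(type, map.get("none"));
            return p.process(task);
        }
    }

    public static void main(String[] args) {
        Delegate d = new Delegate();
        d.add(new TcpProcessor());
        d.add(new NoneProcessor());
        System.out.println(d.process("TCP", "svcA"));   // routed to TcpProcessor
        System.out.println(d.process("HTTP", "svcB"));  // no HTTP processor: falls back to "none"
    }
}
```

The delegate itself returns null from getType(), so the stream filter in addProcessor excludes the delegate from its own map.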
Once loading is done: since HealthCheckTask is a task that gets scheduled on a thread pool, the next thing to look at is its run method:
```java
@Override
public void run() {
    try {
        if (distroMapper.responsible(cluster.getService().getName())
                && switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {
            healthCheckProcessor.process(this);
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}",
                        cluster.getService().getName());
            }
        }
    } catch (Throwable e) {
        Loggers.SRV_LOG.error("[HEALTH-CHECK] error while process health check for {}:{}",
                cluster.getService().getName(), cluster.getName(), e);
    } finally {
        if (!cancelled) {
            // re-schedule the next round of checking
            HealthCheckReactor.scheduleCheck(this);

            if (this.getCheckRTWorst() > 0
                    && switchDomain.isHealthCheckEnabled(cluster.getService().getName())
                    && distroMapper.responsible(cluster.getService().getName())) {
                long diff = ((this.getCheckRTLast() - this.getCheckRTLastLast()) * 10000)
                        / this.getCheckRTLastLast();
                this.setCheckRTLastLast(this.getCheckRTLast());

                Cluster cluster = this.getCluster();
                Loggers.CHECK_RT.info("{}:{}@{}->normalized: {}, worst: {}, best: {}, last: {}, diff: {}",
                        cluster.getService().getName(), cluster.getName(),
                        cluster.getHealthChecker().getType(), this.getCheckRTNormalized(),
                        this.getCheckRTWorst(), this.getCheckRTBest(),
                        this.getCheckRTLast(), diff);
            }
        }
    }
}
```
It first checks whether this node is responsible for the service and whether the health check is enabled for the cluster. If so, it calls the strategy object healthCheckProcessor's process method to run the actual check. Since this article covers the TCP style of health check, we jump straight to TcpSuperSenseProcessor's process method:
```java
public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {

    ...

    @Override
    public void process(HealthCheckTask task) {
        List<Instance> ips = task.getCluster().allIPs(false);
        if (CollectionUtils.isEmpty(ips)) {
            return;
        }

        Service service = task.getCluster().getService();

        for (Instance ip : ips) {
            if (ip.isMarked()) {
                if (SRV_LOG.isDebugEnabled()) {
                    SRV_LOG.debug("tcp check, ip is marked as to skip health check, ip:" + ip.getIp());
                }
                continue;
            }

            if (!ip.markChecking()) {
                SRV_LOG.warn("tcp check started before last one finished, service: "
                        + task.getCluster().getService().getName() + ":"
                        + task.getCluster().getName() + ":"
                        + ip.getIp() + ":" + ip.getPort());
                healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2,
                        task, switchDomain.getTcpHealthParams());
                continue;
            }

            Beat beat = new Beat(ip, task);
            taskQueue.add(beat);
            MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
        }
    }

    ...
}
```
It iterates over all service instances in the cluster and, for each Instance, creates a heartbeat record Beat which is added to the taskQueue. Since TcpSuperSenseProcessor is itself a Runnable, that taskQueue is continuously drained inside TcpSuperSenseProcessor:
```java
private void processTask() throws Exception {
    Collection<Callable<Void>> tasks = new LinkedList<Callable<Void>>();
    do {
        Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
        if (beat == null) {
            return;
        }
        tasks.add(new TaskProcessor(beat));
    } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);

    for (Future<?> f : NIO_EXECUTOR.invokeAll(tasks)) {
        f.get();
    }
}

@Override
public void run() {
    while (true) {
        try {
            processTask();

            int readyCount = selector.selectNow();
            if (readyCount <= 0) {
                continue;
            }

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                NIO_EXECUTOR.execute(new PostProcessor(key));
            }
        } catch (Throwable e) {
            SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
        }
    }
}
```
In the run method, processTask() is called first: it pulls Beats off the taskQueue, wrapping each in a TaskProcessor, up to a batch of at most NIO_THREAD_COUNT * 64 tasks, and then submits the whole batch to the thread pool via NIO_EXECUTOR.invokeAll(tasks), waiting for all of them to run.
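This drain-then-invokeAll batching pattern can be sketched on its own. The constants and names below are illustrative stand-ins (BATCH_LIMIT plays the role of NIO_THREAD_COUNT * 64), not Nacos code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchDrainSketch {

    static final int BATCH_LIMIT = 4; // stand-in for NIO_THREAD_COUNT * 64
    static final ExecutorService POOL = Executors.newFixedThreadPool(2);

    // Drain up to BATCH_LIMIT items from the queue, wrap each in a Callable,
    // then block until the whole batch has run (mirrors invokeAll + f.get()).
    static int processBatch(BlockingQueue<String> queue, AtomicInteger counter)
            throws InterruptedException, ExecutionException {
        List<Callable<Void>> tasks = new ArrayList<>();
        do {
            String beat = queue.poll(10, TimeUnit.MILLISECONDS);
            if (beat == null) {
                break; // queue drained: run whatever we collected so far
            }
            tasks.add(() -> { counter.incrementAndGet(); return null; });
        } while (!queue.isEmpty() && tasks.size() < BATCH_LIMIT);

        for (Future<Void> f : POOL.invokeAll(tasks)) {
            f.get(); // wait for completion, propagate failures
        }
        return tasks.size();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        for (int i = 0; i < 10; i++) q.add("beat-" + i);
        AtomicInteger done = new AtomicInteger();
        int n = processBatch(q, done); // first batch, capped at BATCH_LIMIT
        System.out.println(n + " processed, " + q.size() + " remaining");
        POOL.shutdown();
    }
}
```

Capping the batch size keeps one very full queue from monopolizing the loop: after each batch the run loop gets a chance to service selector events before draining more beats.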
```java
private class TaskProcessor implements Callable<Void> {

    private static final int MAX_WAIT_TIME_MILLISECONDS = 500;

    Beat beat = null;

    public TaskProcessor(Beat beat) {
        this.beat = beat;
    }

    @Override
    public Void call() {
        long waited = System.currentTimeMillis() - beat.getStartTime();
        if (waited > MAX_WAIT_TIME_MILLISECONDS) {
            Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
        }

        SocketChannel channel = null;
        try {
            Instance instance = beat.getIp();
            Cluster cluster = beat.getTask().getCluster();

            BeatKey beatKey = keyMap.get(beat.toString());
            if (beatKey != null && beatKey.key.isValid()) {
                if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                    instance.setBeingChecked(false);
                    return null;
                }
                beatKey.key.cancel();
                beatKey.key.channel().close();
            }

            channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.socket().setSoLinger(false, -1);
            channel.socket().setReuseAddress(true);
            channel.socket().setKeepAlive(true);
            channel.socket().setTcpNoDelay(true);

            int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
            channel.connect(new InetSocketAddress(instance.getIp(), port));

            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
            key.attach(beat);
            keyMap.put(beat.toString(), new BeatKey(key));

            beat.setStartTime(System.currentTimeMillis());

            NIO_EXECUTOR.schedule(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
                    "tcp:error:" + e.getMessage());
            if (channel != null) {
                try {
                    channel.close();
                } catch (Exception ignore) {
                }
            }
        }

        return null;
    }
}
```
Here nacos-server uses a SocketChannel in non-blocking mode. It registers interest in SelectionKey.OP_CONNECT | SelectionKey.OP_READ, and attaches the heartbeat to the key with key.attach(beat). After the connection is initiated with channel.connect, a timeout task is scheduled: NIO_EXECUTOR.schedule(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS). When that delayed task runs, if it finds that the channel has still not finished connecting, the check is treated as failed.
```java
private static class TimeOutTask implements Runnable {

    SelectionKey key;

    ...

    @Override
    public void run() {
        if (key != null && key.isValid()) {
            SocketChannel channel = (SocketChannel) key.channel();
            Beat beat = (Beat) key.attachment();

            if (channel.isConnected()) {
                return;
            }

            try {
                channel.finishConnect();
            } catch (Exception ignore) {
            }

            try {
                beat.finishCheck(false, false, beat.getTask().getCheckRTNormalized() * 2, "tcp:timeout");
                key.cancel();
                key.channel().close();
            } catch (Exception ignore) {
            }
        }
    }
}
```
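The connect-plus-deadline pattern above can be reproduced in a standalone sketch. Everything here (method names, constants, the loopback listener) is illustrative rather than Nacos code, and a refused connection would surface as an exception from finishConnect rather than a false return:

```java
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.concurrent.*;

public class ConnectTimeoutSketch {

    static final long CONNECT_TIMEOUT_MS = 500;
    static final ScheduledExecutorService TIMER = Executors.newSingleThreadScheduledExecutor();

    // Returns true if the non-blocking connect completes before the deadline.
    static boolean check(InetSocketAddress addr) throws Exception {
        Selector selector = Selector.open();
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);
        boolean healthy = ch.connect(addr); // may complete immediately on loopback
        if (!healthy) {
            SelectionKey key = ch.register(selector, SelectionKey.OP_CONNECT);

            // Deadline task: if the channel is still not connected when it
            // fires, cancel the key and close the channel (mirrors TimeOutTask).
            TIMER.schedule(() -> {
                if (key.isValid() && !ch.isConnected()) {
                    key.cancel();
                    try { ch.close(); } catch (Exception ignore) { }
                }
            }, CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);

            if (selector.select(CONNECT_TIMEOUT_MS) > 0 && key.isValid() && key.isConnectable()) {
                healthy = ch.finishConnect(); // handshake completed in time
            }
        }
        try { ch.close(); } catch (Exception ignore) { }
        selector.close();
        return healthy;
    }

    public static void main(String[] args) throws Exception {
        // Loopback listener so the check has something to connect to
        ServerSocketChannel server = ServerSocketChannel.open()
                .bind(new InetSocketAddress("127.0.0.1", 0));
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        System.out.println("healthy=" + check(new InetSocketAddress("127.0.0.1", port)));
        server.close();
        TIMER.shutdown();
    }
}
```

Note the design choice this mirrors: the timeout is enforced by a separately scheduled task rather than by blocking on the connect, so one slow endpoint never stalls the checking thread.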
Back in TcpSuperSenseProcessor's run method: after processTask() returns, selector.selectNow() reports how many channels are ready, and selector.selectedKeys().iterator() walks the selected key set of ready channels. In that loop, each ready key is handed to a new PostProcessor, which completes the final step of the TCP health check:
```java
public class PostProcessor implements Runnable {

    SelectionKey key;

    ...

    @Override
    public void run() {
        Beat beat = (Beat) key.attachment();
        SocketChannel channel = (SocketChannel) key.channel();

        try {
            if (!beat.isHealthy()) {
                key.cancel();
                key.channel().close();
                beat.finishCheck();
                return;
            }

            if (key.isValid() && key.isConnectable()) {
                channel.finishConnect();
                beat.finishCheck(true, false,
                        System.currentTimeMillis() - beat.getTask().getStartTime(), "tcp:ok+");
            }

            if (key.isValid() && key.isReadable()) {
                ByteBuffer buffer = ByteBuffer.allocate(128);
                if (channel.read(buffer) == -1) {
                    key.cancel();
                    key.channel().close();
                } else {
                }
            }
        } catch (ConnectException e) {
            beat.finishCheck(false, true, switchDomain.getTcpHealthParams().getMax(),
                    "tcp:unable2connect:" + e.getMessage());
        } catch (Exception e) {
            beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
                    "tcp:error:" + e.getMessage());
            try {
                key.cancel();
                key.channel().close();
            } catch (Exception ignore) {
            }
        }
    }
}
```
This method finishes all remaining TCP health-check work: if the service instance is connectable, the check succeeds, and the connection is then torn down. With that, the whole TCP health-check flow on the nacos-server side is complete.
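As a side note, the channel.read(buffer) == -1 test in PostProcessor detects that the peer has closed the connection (end of stream). That behavior can be observed in isolation with a small sketch (all names here are illustrative):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class ReadEofSketch {

    // A read of -1 on a connected channel means the peer closed the
    // connection; the PostProcessor uses this to discard the key.
    static int readOnce(SocketChannel ch) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(128);
        return ch.read(buffer);
    }

    public static void main(String[] args) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open()
                .bind(new InetSocketAddress("127.0.0.1", 0));
        SocketChannel client = SocketChannel.open(server.getLocalAddress());
        SocketChannel accepted = server.accept();
        accepted.close();                               // peer closes the connection
        System.out.println("read=" + readOnce(client)); // -1: end of stream
        client.close();
        server.close();
    }
}
```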