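Eureka Consistency Strategy
Eureka is a service discovery framework that works in AP mode. In a Eureka cluster, the Servers replicate and update data by broadcasting their own data to one another. Eureka can also still return service registration information when the network between a client and the registry fails, because Eureka caches the service registry on the client side.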
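`DiscoveryClient` (excerpt): the method below loads the registry from a `BackupRegistry` and stores the result in the client-side cache `localRegionApps`.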
```java
private void fetchRegistryFromBackup() {
    try {
        @SuppressWarnings("deprecation")
        BackupRegistry backupRegistryInstance = newBackupRegistryInstance();
        if (null == backupRegistryInstance) {
            // Fall back to the injected provider if the deprecated factory method returns null
            backupRegistryInstance = backupRegistryProvider.get();
        }

        if (null != backupRegistryInstance) {
            Applications apps = null;
            if (isFetchingRemoteRegionRegistries()) {
                String remoteRegionsStr = remoteRegionsToFetch.get();
                if (null != remoteRegionsStr) {
                    apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));
                }
            } else {
                apps = backupRegistryInstance.fetchRegistry();
            }
            if (apps != null) {
                // Filter and shuffle the backup data, then publish it to the local cache
                final Applications applications = this.filterAndShuffle(apps);
                applications.setAppsHashCode(applications.getReconcileHashCode());
                localRegionApps.set(applications);
                logTotalInstances();
                logger.info("Fetched registry successfully from the backup");
            }
        } else {
            logger.warn("No backup registry instance defined & unable to find any discovery servers.");
        }
    } catch (Throwable e) {
        logger.warn("Cannot fetch applications from apps although backup registry was specified", e);
    }
}
```
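Eureka keeps this client-side cache so that consumers can still call service providers when the Eureka cluster is not working. Because of that cache, Eureka cannot guarantee strong consistency of the service registry (CP mode). It only provides eventual consistency (AP mode).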
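The following Java sketch shows a minimal client-side registry cache that makes the AP trade-off concrete. It is not Eureka's `DiscoveryClient`. The class name `CachedRegistryClient` and the `remote`/`backup` suppliers are hypothetical stand-ins for a server fetch and a `BackupRegistry`, where an empty `Optional` means "unreachable". The sketch shows that reads are always served from the last good snapshot, so a registry outage produces possibly stale data rather than an error.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical AP-style client cache: reads never fail, they may just be stale. */
class CachedRegistryClient {
    // Last successfully fetched instance list ("ip:port" strings in this sketch)
    private final AtomicReference<List<String>> localApps = new AtomicReference<>(List.of());
    private final Supplier<Optional<List<String>>> remote; // hypothetical: empty when servers are unreachable
    private final Supplier<Optional<List<String>>> backup; // hypothetical backup registry

    CachedRegistryClient(Supplier<Optional<List<String>>> remote,
                         Supplier<Optional<List<String>>> backup) {
        this.remote = remote;
        this.backup = backup;
    }

    /** Tries the remote registry, then the backup; keeps the old snapshot if both fail. */
    void refresh() {
        Optional<List<String>> fetched = remote.get();
        if (fetched.isEmpty()) {
            fetched = backup.get();
        }
        fetched.ifPresent(apps -> localApps.set(List.copyOf(apps)));
    }

    /** Served purely from the cache, without any network call. */
    List<String> getInstances() {
        return localApps.get();
    }
}
```

This design has the cost described above. After a failed `refresh()`, callers keep seeing the old instance list, and that list may still contain instances that have since gone away.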
Nacos Consistency Strategy: Distro
In AP mode, Nacos uses a consistency strategy similar to Eureka's. The Servers synchronize data with each other to replicate it across the cluster.
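Triggering the Data Broadcast
DistroConsistencyServiceImpl.java
```java
@Override
public void put(String key, Record value) throws NacosException {
    onPut(key, value);           // apply the change locally
    taskDispatcher.addTask(key); // queue the key for asynchronous broadcast to the other servers
}
```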
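Calling the `put` or `remove` method defined by the `ConsistencyService` interface changes data on the Server. When that happens, a task is created by passing the data's `key` to `taskDispatcher.addTask`. The key is used later to look up the changed data when it is synchronized.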
TaskDispatcher.java
```java
public void addTask(String key) {
    taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
}
```
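One method here deserves attention: `shakeUp`. A `key` can be regarded as one data-change event. According to the comments in the official code, `shakeUp` maps each `key` to one of the `TaskScheduler` objects. The intent appears to be spreading tasks evenly across the `TaskScheduler`s so that each one carries roughly the same load.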
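If `shakeUp` works the way its comment suggests, the routing is essentially a hash of the key taken modulo the number of schedulers. The sketch below assumes exactly that: a non-negative `hashCode()` modulo `buckets`. `KeyRouter`, `route` and `distribution` are hypothetical names used only for illustration.

```java
import java.util.List;

/** Hypothetical hash-based router, assuming shakeUp is roughly "hash mod n". */
class KeyRouter {
    /** Maps a key to an index in [0, buckets); the same key always gets the same index. */
    static int route(String key, int buckets) {
        if (buckets < 1) {
            throw new IllegalArgumentException("buckets must be >= 1");
        }
        if (key == null) {
            return 0;
        }
        // Mask the sign bit so negative hash codes still give a valid index
        return (key.hashCode() & Integer.MAX_VALUE) % buckets;
    }

    /** Counts how many of the given keys land on each bucket (scheduler). */
    static int[] distribution(List<String> keys, int buckets) {
        int[] counts = new int[buckets];
        for (String key : keys) {
            counts[route(key, buckets)]++;
        }
        return counts;
    }
}
```

Hashing has a useful side effect. Every change to the same key lands on the same `TaskScheduler`, so all changes for one key queue up on a single scheduler.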
```java
public class TaskScheduler implements Runnable {

    private int dataSize = 0;

    private long lastDispatchTime = 0L;

    private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

    // ...

    public void addTask(String key) {
        queue.offer(key);
    }

    @Override
    public void run() {
        List<String> keys = new ArrayList<>();
        while (true) {
            try {
                // Wait up to taskDispatchPeriod ms for the next changed key
                String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS);

                if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                    Loggers.EPHEMERAL.debug("got key: {}", key);
                }

                // Nothing to do while the cluster member list is unknown
                if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                    continue;
                }

                // Poll timed out without a new key
                if (StringUtils.isBlank(key)) {
                    continue;
                }

                // Start a new batch after the previous one was dispatched
                if (dataSize == 0) {
                    keys = new ArrayList<>();
                }

                keys.add(key);
                dataSize++;

                // Dispatch when the batch is full or the dispatch period has elapsed
                if (dataSize == partitionConfig.getBatchSyncKeyCount()
                    || (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                    for (Server member : dataSyncer.getServers()) {
                        // Never sync to the local server
                        if (NetUtils.localServer().equals(member.getKey())) {
                            continue;
                        }
                        SyncTask syncTask = new SyncTask();
                        syncTask.setKeys(keys);
                        syncTask.setTargetServer(member.getKey());

                        if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                            Loggers.EPHEMERAL.debug("add sync task: {}", JSON.toJSONString(syncTask));
                        }

                        dataSyncer.submit(syncTask, 0);
                    }
                    lastDispatchTime = System.currentTimeMillis();
                    dataSize = 0;
                }
            } catch (Exception e) {
                Loggers.EPHEMERAL.error("dispatch sync task failed.", e);
            }
        }
    }
}
```
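The core logic is the body of the `for (Server member : dataSyncer.getServers()) {...}` loop, which broadcasts the data to the other Nacos Servers. The loop runs when the batch reaches `batchSyncKeyCount` keys, or when more than `taskDispatchPeriod` milliseconds have passed since the last dispatch. For every member except the local server, it performs these steps: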
- Create a `SyncTask` and set its event collection (that is, the collection of `key`s)
- Set the target `Server` on the `SyncTask`: `syncTask.setTargetServer(member.getKey())`
- Submit the broadcast task to `DataSyncer`
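The batching rule in `run()` can be isolated into a small single-threaded sketch. `BatchingDispatcher`, its constructor parameters and the injectable clock are hypothetical. `maxBatch` plays the role of `batchSyncKeyCount`, and `periodMs` plays the role of `taskDispatchPeriod`. The sketch copies two details of the code above. First, the time condition is only checked when a key arrives, because a poll timeout just hits `continue`. Second, `lastDispatchTime` starts at 0. A consequence is that a partially filled batch waits until the next key shows up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical single-threaded model of TaskScheduler's batch-and-fan-out rule. */
class BatchingDispatcher {
    /** Stand-in for SyncTask: one batch of keys bound for one target server. */
    static final class SyncTask {
        final List<String> keys;
        final String targetServer;

        SyncTask(List<String> keys, String targetServer) {
            this.keys = keys;
            this.targetServer = targetServer;
        }
    }

    private final int maxBatch;         // like batchSyncKeyCount
    private final long periodMs;        // like taskDispatchPeriod
    private final String localServer;   // "ip:port" of this node
    private final List<String> servers; // all cluster members, "ip:port"
    private final LongSupplier clock;   // injectable so the rule can be tested
    private final List<SyncTask> submitted = new ArrayList<>();
    private List<String> keys = new ArrayList<>();
    private long lastDispatchTime = 0L; // starts at 0, as in TaskScheduler

    BatchingDispatcher(int maxBatch, long periodMs, String localServer,
                       List<String> servers, LongSupplier clock) {
        this.maxBatch = maxBatch;
        this.periodMs = periodMs;
        this.localServer = localServer;
        this.servers = List.copyOf(servers);
        this.clock = clock;
    }

    /** Adds a changed key; dispatches the batch if it is full or the period has elapsed. */
    void offer(String key) {
        keys.add(key);
        long now = clock.getAsLong();
        if (keys.size() >= maxBatch || now - lastDispatchTime > periodMs) {
            for (String member : servers) {
                if (member.equals(localServer)) {
                    continue; // never sync to ourselves
                }
                submitted.add(new SyncTask(keys, member));
            }
            lastDispatchTime = now;
            keys = new ArrayList<>(); // fresh batch; dispatched tasks keep the old list
        }
    }

    /** Tasks handed to the (imaginary) DataSyncer so far. */
    List<SyncTask> submitted() {
        return Collections.unmodifiableList(submitted);
    }
}
```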
Executing the Data Broadcast: DataSyncer
```java
public void submit(SyncTask task, long delay) {

    // On a first attempt, drop keys that are already being synced to the same target
    if (task.getRetryCount() == 0) {
        Iterator<String> iterator = task.getKeys().iterator();
        while (iterator.hasNext()) {
            String key = iterator.next();
            if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                if (Loggers.EPHEMERAL.isDebugEnabled()) {
                    Loggers.EPHEMERAL.debug("sync already in process, key: {}", key);
                }
                iterator.remove();
            }
        }
    }

    // Every key was a duplicate: nothing left to send
    if (task.getKeys().isEmpty()) {
        return;
    }

    GlobalExecutor.submitDataSync(new Runnable() {
        @Override
        public void run() {
            try {
                if (servers == null || servers.isEmpty()) {
                    Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                    return;
                }

                List<String> keys = task.getKeys();

                if (Loggers.EPHEMERAL.isDebugEnabled()) {
                    Loggers.EPHEMERAL.debug("sync keys: {}", keys);
                }

                // Read the current values of the keys from the local store
                Map<String, Datum> datumMap = dataStore.batchGet(keys);

                if (datumMap == null || datumMap.isEmpty()) {
                    // Nothing to send: release the in-flight markers
                    for (String key : task.getKeys()) {
                        taskMap.remove(buildKey(key, task.getTargetServer()));
                    }
                    return;
                }

                byte[] data = serializer.serialize(datumMap);

                long timestamp = System.currentTimeMillis();
                // HTTP PUT to the target server
                boolean success = NamingProxy.syncData(data, task.getTargetServer());
                if (!success) {
                    // Repackage the task with retryCount + 1 and schedule it again
                    SyncTask syncTask = new SyncTask();
                    syncTask.setKeys(task.getKeys());
                    syncTask.setRetryCount(task.getRetryCount() + 1);
                    syncTask.setLastExecuteTime(timestamp);
                    syncTask.setTargetServer(task.getTargetServer());
                    retrySync(syncTask);
                } else {
                    // Success: release the in-flight markers
                    for (String key : task.getKeys()) {
                        taskMap.remove(buildKey(key, task.getTargetServer()));
                    }
                }
            } catch (Exception e) {
                Loggers.EPHEMERAL.error("sync data failed.", e);
            }
        }
    }, delay);
}
```
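`GlobalExecutor.submitDataSync(Runnable runnable)` submits a data-broadcast task. Before that, on a first attempt (`retryCount == 0`), `submit` drops any key that is already being synced to the same target (`taskMap.putIfAbsent(...)`). The task itself proceeds in four steps:
1. It uses the `key` collection in the `SyncTask` to look up the corresponding data in `DataStore`.
2. It serializes that data into a `byte[]`.
3. It sends the bytes over HTTP with `NamingProxy.syncData(data, task.getTargetServer())`.
4. If the broadcast fails, it repackages the task with an incremented retry count and hands it back to `GlobalExecutor` through `retrySync`. If the broadcast succeeds, it clears the task's entries in `taskMap`.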
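(An open question: `SyncTask` records the number of retries, but nothing acts on that number. For example, a server that has failed to respond many times in a row may well be down, yet the count is only recorded.)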
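The following sketch shows one way to act on that retry count, under stated assumptions. The class is hypothetical and does not describe how Nacos behaves. `tryStart` and `finish` mirror the in-flight de-duplication that `submit` performs with `taskMap.putIfAbsent(buildKey(key, target), key)`; the separator inside this sketch's `buildKey` is made up. `retryDelayMs` adds what the original lacks: exponential backoff with an upper bound on the number of retries.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sync bookkeeping: per-(key, target) de-duplication plus capped exponential backoff. */
class SyncTracker {
    private final ConcurrentMap<String, String> inFlight = new ConcurrentHashMap<>();
    private final int maxRetries;
    private final long baseDelayMs;
    private final long maxDelayMs;

    SyncTracker(int maxRetries, long baseDelayMs, long maxDelayMs) {
        this.maxRetries = maxRetries;
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    private static String buildKey(String key, String targetServer) {
        return key + "->" + targetServer; // separator chosen for this sketch only
    }

    /** True if the caller now owns the sync of key to targetServer; false if one is already in flight. */
    boolean tryStart(String key, String targetServer) {
        return inFlight.putIfAbsent(buildKey(key, targetServer), key) == null;
    }

    /** Releases the in-flight marker after a success, or after giving up. */
    void finish(String key, String targetServer) {
        inFlight.remove(buildKey(key, targetServer));
    }

    /**
     * Delay before retry number retryCount (1 = first retry): baseDelayMs * 2^(retryCount - 1),
     * capped at maxDelayMs. Returns -1 once retryCount exceeds maxRetries, meaning "give up".
     */
    long retryDelayMs(int retryCount) {
        if (retryCount < 1) {
            throw new IllegalArgumentException("retryCount starts at 1");
        }
        if (retryCount > maxRetries) {
            return -1L;
        }
        long delay = baseDelayMs << Math.min(retryCount - 1, 20); // bounded shift avoids overflow
        return Math.min(delay, maxDelayMs);
    }
}
```

When `retryDelayMs` returns -1, a caller would call `finish` and drop the task, or mark the peer as suspect, instead of retrying indefinitely.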
```java
public static boolean syncData(byte[] data, String curServer) throws Exception {
    try {
        Map<String, String> headers = new HashMap<>(128);

        headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION);
        headers.put("User-Agent", UtilsAndCommons.SERVER_VERSION);
        headers.put("Accept-Encoding", "gzip,deflate,sdch");
        headers.put("Connection", "Keep-Alive");
        headers.put("Content-Encoding", "gzip");

        // PUT the serialized data to the peer's DATA_ON_SYNC_URL endpoint
        HttpClient.HttpResult result = HttpClient.httpPutLarge("http://" + curServer + RunningConfig.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL, headers, data);

        // 200 OK and 304 Not Modified both count as success
        if (HttpURLConnection.HTTP_OK == result.code) {
            return true;
        }

        if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
            return true;
        }

        throw new IOException("failed to req API:" + "http://" + curServer
            + RunningConfig.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:"
            + result.code + " msg: " + result.content);
    } catch (Exception e) {
        // Any failure is logged and reported as false, which makes DataSyncer retry
        Loggers.SRV_LOG.warn("NamingProxy", e);
    }
    return false;
}
```
The data is sent to `PUT http://ip:port/nacos/v1/ns/distro/datum`. The handler for that URL is the `public String onSyncDatum(HttpServletRequest request, HttpServletResponse response)` method in `DistroController`.
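On the wire, the exchange is a plain HTTP PUT whose body is the serialized datum map, and a 200 or 304 response counts as success. The JDK-only sketch below reproduces that round trip against a stub peer on a random loopback port. `DistroHttpSketch` and its methods are hypothetical, and the stub handler only imitates `onSyncDatum` (it answers "ok" to a non-empty PUT). The extra headers that `syncData` sets, such as gzip and `Client-Version`, are left out.

```java
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical JDK-only model of the Distro sync round trip (not Nacos code). */
class DistroHttpSketch {
    // Context path + naming context + sync path, as in the URL described above
    static final String SYNC_PATH = "/nacos/v1/ns/distro/datum";

    /** Starts a stub peer on a random loopback port; it accepts a non-empty PUT and replies "ok". */
    static HttpServer startStubPeer() {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext(SYNC_PATH, exchange -> {
                byte[] body = exchange.getRequestBody().readAllBytes();
                boolean ok = "PUT".equals(exchange.getRequestMethod()) && body.length > 0;
                byte[] reply = (ok ? "ok" : "receive empty entity!").getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(ok ? 200 : 400, reply.length);
                try (OutputStream out = exchange.getResponseBody()) { // closing it ends the exchange
                    out.write(reply);
                }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Plain HTTP/1.1 client, so the sketch does not depend on HTTP/2 upgrade behaviour. */
    static HttpClient newClient() {
        return HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
    }

    /** PUTs serialized data to curServer ("ip:port"); 200 or 304 counts as success, as in syncData. */
    static boolean syncData(HttpClient client, byte[] data, String curServer) {
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://" + curServer + SYNC_PATH))
                .PUT(HttpRequest.BodyPublishers.ofByteArray(data))
                .build();
        try {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            int code = response.statusCode();
            return code == 200 || code == 304;
        } catch (IOException e) {
            return false; // network failure: the caller would schedule a retry
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```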
```java
public String onSyncDatum(HttpServletRequest request, HttpServletResponse response) throws Exception {

    String entity = IOUtils.toString(request.getInputStream(), "UTF-8");

    if (StringUtils.isBlank(entity)) {
        Loggers.EPHEMERAL.error("[onSync] receive empty entity!");
        throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
    }

    // Deserialize the key -> Datum map sent by the peer
    Map<String, Datum<Instances>> dataMap = serializer.deserializeMap(entity.getBytes(), Instances.class);

    for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
        // Only ephemeral instance-list keys are handled here
        if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
            String namespaceId = KeyBuilder.getNamespace(entry.getKey());
            String serviceName = KeyBuilder.getServiceName(entry.getKey());
            // Create the service locally if this server has not seen it yet
            if (!serviceManager.containService(namespaceId, serviceName)
                && switchDomain.isDefaultInstanceEphemeral()) {
                serviceManager.createEmptyService(namespaceId, serviceName, true);
            }
            // Apply locally via onPut (not put), so the change is not broadcast again
            consistencyService.onPut(entry.getKey(), entry.getValue().value);
        }
    }
    return "ok";
}
```
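The handler updates the data by calling `consistencyService.onPut(entry.getKey(), entry.getValue().value)`. Note that `onPut` does not call `taskDispatcher.addTask(key)`, so the receiving server does not broadcast the change again. Instead, the update is pushed onto the task list of the `Notifier`. For what `Notifier` does, see the article on how the Nacos Server registers a service instance (Nacos Server端注册一个服务实例流程). This completes the eventual-consistency process of Nacos Server in AP mode.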
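The split between `put` and `onPut` is what stops the broadcast from echoing around the cluster. The toy model below demonstrates it with a hypothetical `DistroNode`, in-memory peers and synchronous delivery:
- `put` writes locally and queues the key.
- `flush` pushes the current value of each queued key to the peers.
- The receiving side calls `onPut`, which writes and records a notification but never queues anything for re-broadcast.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical in-memory model of the Distro put/onPut split (not Nacos code). */
class DistroNode {
    final Map<String, String> store = new HashMap<>();            // stands in for DataStore
    final List<String> notifications = new ArrayList<>();         // stands in for Notifier's task list
    private final Queue<String> pendingKeys = new ArrayDeque<>(); // stands in for taskDispatcher
    private final List<DistroNode> peers = new ArrayList<>();
    int syncRequestsSent;

    void addPeer(DistroNode peer) {
        peers.add(peer);
    }

    /** Client-facing write: apply locally, then schedule the key for broadcast. */
    void put(String key, String value) {
        onPut(key, value);
        pendingKeys.add(key);
    }

    /** Apply a change locally and notify listeners; never schedules a broadcast. */
    void onPut(String key, String value) {
        store.put(key, value);
        notifications.add(key);
    }

    /** Sends the current value of every queued key to every peer (like SyncTask + syncData). */
    void flush() {
        while (!pendingKeys.isEmpty()) {
            String key = pendingKeys.poll();
            String value = store.get(key); // read at send time, like dataStore.batchGet(keys)
            for (DistroNode peer : peers) {
                syncRequestsSent++;
                peer.onSyncDatum(key, value);
            }
        }
    }

    /** Receiving side, analogous to DistroController.onSyncDatum: calls onPut, not put. */
    void onSyncDatum(String key, String value) {
        onPut(key, value);
    }

    boolean hasPending() {
        return !pendingKeys.isEmpty();
    }
}
```

The model also reproduces two properties of the real design. First, peers stay out of date until the batch is flushed, which is the window in which the cluster is only eventually consistent. Second, `flush` sends the value currently in the store rather than the value at the time of the change, just as `DataSyncer` reads from `dataStore.batchGet(keys)` when the task runs.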