publicvoidload()throws Exception { if (SystemUtils.STANDALONE_MODE) { initialized = true; return; } // size = 1 means only myself in the list, we need at least one another server alive: // 集群模式下,需要等待至少两个节点才可以将逻辑进行 while (serverListManager.getHealthyServers().size() <= 1) { Thread.sleep(1000L); Loggers.DISTRO.info("waiting server list init..."); }
// 获取所有健康的集群节点 for (Server server : serverListManager.getHealthyServers()) { // 自己则不需要进行数据同步广播操作 if (NetUtils.localServer().equals(server.getKey())) { continue; } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("sync from " + server); } // 从别的服务器进行全量数据拉取操作,只需要执行一次即可,剩下的交由增量同步任务去完成 if (syncAllDataFromRemote(server)) { initialized = true; return; } } }
全量数据拉取的动作
数据拉取执行者的动作
publicbooleansyncAllDataFromRemote(Server server){ try { // 获取数据 byte[] data = NamingProxy.getAllData(server.getKey()); // 接收到的数据进行处理 processData(data); returntrue; } catch (Exception e) { Loggers.DISTRO.error("sync full data from " + server + " failed!", e); returnfalse; } }
// 对数据进行遍历处理 for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { // 数据放入数据存储容器——DataStore中 dataStore.put(entry.getKey(), entry.getValue()); // 判断监听器是否包含了对这个Key的监听,如果没有,表明是一个新的数据 if (!listeners.containsKey(entry.getKey())) { // pretty sure the service not exist: if (switchDomain.isDefaultInstanceEphemeral()) { // create empty service Loggers.DISTRO.info("creating service {}", entry.getKey()); Service service = new Service(); String serviceName = KeyBuilder.getServiceName(entry.getKey()); String namespaceId = KeyBuilder.getNamespace(entry.getKey()); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(Constants.DEFAULT_GROUP); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); // 回调 Listener 监听器,告知新的Service数据 listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0) .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service); } } } // 进行 Listener 的监听回调 for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { if (!listeners.containsKey(entry.getKey())) { // Should not happen: Loggers.DISTRO.warn("listener of {} not found.", entry.getKey()); continue; }
try { for (RecordListener listener : listeners.get(entry.getKey())) { listener.onChange(entry.getKey(), entry.getValue().value); } } catch (Exception e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e); continue; }
// Update data store if listener executed successfully: dataStore.put(entry.getKey(), entry.getValue()); } } }
if (CollectionUtils.isEmpty(healthyList)) { // means distro config is not ready yet returnfalse; }
int index = healthyList.indexOf(NetUtils.localServer()); int lastIndex = healthyList.lastIndexOf(NetUtils.localServer()); if (lastIndex < 0 || index < 0) { returntrue; }
int target = distroHash(serviceName) % healthyList.size(); return target >= index && target <= lastIndex; }
// 根据 ServiceName 找到权威 Server 的地址 public String mapSrv(String serviceName){ if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) { return NetUtils.localServer(); }
// 任务提交 publicvoidsubmit(SyncTask task, long delay){ // If it's a new task: if (task.getRetryCount() == 0) { // 遍历所有的任务 Key Iterator<String> iterator = task.getKeys().iterator(); while (iterator.hasNext()) { String key = iterator.next(); // 数据任务放入 Map 中,避免数据同步任务重复提交 if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) { // associated key already exist: if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("sync already in process, key: {}", key); } // 如果任务已经存在,则移除该任务的 Key iterator.remove(); } } } // 如果所有的任务都已经移除了,结束本次任务提交 if (task.getKeys().isEmpty()) { // all keys are removed: return; } // 异步任务执行数据同步 GlobalExecutor.submitDataSync(() -> { // 1. check the server if (getServers() == null || getServers().isEmpty()) { Loggers.SRV_LOG.warn("try to sync data but server list is empty."); return; } // 获取数据同步任务的实际同步数据 List<String> keys = task.getKeys(); if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys); } // 2. get the datums by keys and check the datum is empty or not // 通过key进行批量数据获取 Map<String, Datum> datumMap = dataStore.batchGet(keys); // 如果数据已经被移除了,取消本次任务 if (datumMap == null || datumMap.isEmpty()) { // clear all flags of this task: for (String key : keys) { taskMap.remove(buildKey(key, task.getTargetServer())); } return; } // 数据序列化 byte[] data = serializer.serialize(datumMap); long timestamp = System.currentTimeMillis(); // 进行增量数据同步提交给其他节点 boolean success = NamingProxy.syncData(data, task.getTargetServer()); // 如果本次数据同步任务失败,则重新创建SyncTask,设置重试的次数信息 if (!success) { SyncTask syncTask = new SyncTask(); syncTask.setKeys(task.getKeys()); syncTask.setRetryCount(task.getRetryCount() + 1); syncTask.setLastExecuteTime(timestamp); syncTask.setTargetServer(task.getTargetServer()); retrySync(syncTask); } else { // clear all flags of this task: for (String key : task.getKeys()) { taskMap.remove(buildKey(key, task.getTargetServer())); } } }, delay); }
// 任务重试 publicvoidretrySync(SyncTask syncTask){ Server server = new Server(); server.setIp(syncTask.getTargetServer().split(":")[0]); server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1])); if (!getServers().contains(server)) { // if server is no longer in healthy server list, ignore this task: return; } // TODO may choose other retry policy. // 自动延迟重试任务的下次执行时间 submit(syncTask, partitionConfig.getSyncRetryDelay()); }