DNS协议 DNS
协议其本身就是一个天然的分布式服务发现中心,通过域名映射IP,用户只需要记住域名即可访问对应的服务。
纵观Nacos
、Eureka
、Etcd
以及Zookeeper
,这些充当服务发现中心的组件,其都需要一个Client-SDK
为上层应用提供根据服务名查找到一个机器IP的能力,而这就带来了一个问题,应用必须要集成这一些组件的客户端SDK,一旦组件的SDK出现问题需要升级时,就需要推动应用方的升级,这是不想见到的。而DNS协议,天然跨语言,因此Consul
或者Kubernetes
都选择使用DNS
协议作为服务发现,直接解决了跨语言的问题。
Nacos的DNS
官网介绍
通过支持权重路由,动态DNS服务能让您轻松实现中间层负载均衡、更灵活的路由策略、流量控制以及简单数据中心内网的简单DNS解析服务。动态DNS服务还能让您更容易地实现以DNS协议为基础的服务发现,以消除耦合到厂商私有服务发现API上的风险。
目前,Nacos
的DNS实现,是依赖了CoreDNS
,其项目在nacos-coredns-plugin 。
Java版本的DNS 由于目前Nacos-Client
的SDK
其功能比较完备的属于Java
版本,因此考虑使用Java
去实现一个Nacos DNS
插件
依赖包
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 plugins { id 'java' } group 'com.alibaba.nacos.dns' version '0.0.1' sourceCompatibility = 1.8 jar { manifest { attributes 'Main-Class' : 'com.conf.nacos.dns.Main' } from { configurations .compile .collect { it.isDirectory() ? it : zipTree(it) } } into ('assets' ) { from 'assets' } } tasks.withType(JavaCompile) { options .encoding = 'UTF-8' } repositories { mavenCentral() } dependencies { compile group : 'com.alibaba.nacos' , name: 'nacos-client' , version: '1.3.0' compile group : 'dnsjava' , name: 'dnsjava' , version: '3.1.0' compile group : 'org.slf4j' , name: 'slf4j-api' , version: '1.7.30' testCompile group : 'junit' , name: 'junit' , version: '4.12' }
实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 public class DnsServer { private static final ThreadLocal<ByteBuffer> THREAD_LOCAL = ThreadLocal.withInitial(() -> ByteBuffer.allocate(1024 )); private static final Logger LOGGER = LoggerFactory.getLogger(DnsServer.class); private static NacosDnsCore nacosDnsCore; private static DatagramChannel serverChannel; private static Selector selector = null ; private static final Executor executor = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), new ThreadFactory() { AtomicInteger workerId = new AtomicInteger(); @Override public Thread newThread (Runnable r) { final Thread t = new Thread(r); t.setName("com.conf.nacos.dns.worker-" + workerId.getAndIncrement()); t.setDaemon(true ); return t; } }); public static DnsServer create () throws NacosDnsException { return new DnsServer(); } private DnsServer () throws NacosDnsException { try { init(); nacosDnsCore = new NacosDnsCore(); } catch (Throwable ex) { throw new NacosDnsException(Code.CREATE_DNS_SERVER_FAILED, ex); } } private void init () throws Exception { AccessController.doPrivileged((PrivilegedAction<Void>) () -> { try { selector = Selector.open(); serverChannel = DatagramChannel.open(); serverChannel.socket().bind(new InetSocketAddress("127.0.0.1" , 53 )); serverChannel.configureBlocking(false ); serverChannel.register(selector, SelectionKey.OP_READ); } catch (Throwable ex) { throw new NacosDnsException(Code.CREATE_DNS_SERVER_FAILED, ex); } return null ; }); } public void start () { LOGGER.info("dns-server starting" ); final ByteBuffer buffer = ByteBuffer.allocate(1024 ); for ( ; ; ) { try { selector.select(); for (SelectionKey key : selector.selectedKeys()) { if (key.isReadable()) { buffer.clear(); SocketAddress client = serverChannel.receive(buffer); buffer.flip(); byte [] requestData = new byte [buffer.limit()]; buffer.get(requestData, 0 , requestData.length); executor.execute(() -> handler(requestData, client)); } } } catch (Throwable ex) { LOGGER.error("handler client request has error : {}" , ExceptionUtil.getStackTrace(ex)); } } } private void handler (final byte [] data, final SocketAddress client) { final ByteBuffer buffer = THREAD_LOCAL.get(); buffer.clear(); try { final Message message = new Message(data); final Record question = message.getQuestion(); final String domain = question.getName().toString(); final Record response = createRecordByQuery(question, domain); final Message out = message.clone(); out.addRecord(response, Section.ANSWER); buffer.put(out.toWire()); buffer.flip(); serverChannel.send(buffer, client); } catch (Throwable ex) { LOGGER.error("response to client has error : {}" , ExceptionUtil.getStackTrace(ex)); } finally { THREAD_LOCAL.set(buffer); } } private static Record createRecordByQuery (final Record request, final String domain) { InstanceRecord record = nacosDnsCore.selectOne(domain); if (record == null ) { return NULLRecord.newRecord(request.getName(), request.getType(), request.getDClass()); } InetSocketAddress address = new InetSocketAddress(record.getIp(), record.getPort()); return new ARecord(request.getName(), request.getDClass(), request.getTTL(), address.getAddress()); } }
DnsServer
就完成了DNS
协议报文的处理以及如何回复,而NacosDnsCore
是处理上层的DnsServer
告知一个域名之后,怎么样去根据域名,找一个机器的IP地址返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 public class NacosDnsCore { private static final Logger logger = LoggerFactory.getLogger(NacosDnsCore.class); private static final Map<String, List<InstanceRecord>> serviceMap = new ConcurrentHashMap<>( 32 ); private static String LOAD_BALANCER_NAME = Constants.RANDOM_LOAD_BALANCER; private static final Map<String, LoadBalancer> balancers = new HashMap<>(); private static NamingService nacosClient; public NacosDnsCore () throws Throwable { try (InputStream stream = NacosDnsCore.class.getClassLoader() .getResourceAsStream("nacos-dns.properties" )) { Properties properties = new Properties(); properties.load(stream); Properties config = new Properties(); for (String[] keys : Constants.NACOS_PEOPERTIES_KEY) { MapUtils.putIfValNoNull(config, keys[0 ], properties.getProperty(keys[1 ])); } nacosClient = NacosFactory.createNamingService(config); initLoadBalancer(); } } private void initLoadBalancer () { ServiceLoader<LoadBalancer> loader = ServiceLoader.load(LoadBalancer.class); loader.forEach(loadBalancer -> balancers.put(loadBalancer.name(), loadBalancer)); } public Optional<InstanceRecord> selectOne (final String domain) { List<InstanceRecord> list = findAllInstanceByServiceName(domain); if (list.isEmpty()) { return Optional.empty(); } return Optional.of(balancers.get(LOAD_BALANCER_NAME).selectOne(list)); } private List<InstanceRecord> findAllInstanceByServiceName (final String serviceName) { if (!serviceMap.containsKey(serviceName)) { obtainServiceFromRemoteServer(serviceName); } return serviceMap.getOrDefault(serviceName, Collections.emptyList()); } private static void obtainServiceFromRemoteServer (final String serviceName) { serviceMap.computeIfAbsent(serviceName, name -> { try { String domain = serviceName.substring(0 , serviceName.length() - 1 ).replace("\\@\\@" , "@@" ); final String _serviceName = NamingUtils.getServiceName(domain); final String _groupName = NamingUtils.getGroupName(domain); List<Instance> instances = nacosClient.getAllInstances(_serviceName, _groupName); registerInstanceChangeObserver(serviceName); return parseToInstanceRecord(instances); } catch (Throwable ex) { logger.error( "An error occurred querying the service instance remotely : {}" , ExceptionUtil.getStackTrace(ex)); return Collections.emptyList(); } }); } private static void registerInstanceChangeObserver (final String serviceName) throws NacosException { nacosClient.subscribe(serviceName, event -> { NamingEvent namingEvent = (NamingEvent) event; final String name = namingEvent.getServiceName(); final List<Instance> newInstances = namingEvent.getInstances(); serviceMap.computeIfPresent(name, (s, instanceRecords) -> { instanceRecords.clear(); return parseToInstanceRecord(newInstances); }); }); } private static List<InstanceRecord> parseToInstanceRecord (List<Instance> instances) { int maxSize = 10_000 ; Stream<Instance> stream; if (instances.size() < maxSize) { stream = instances.stream(); } else { stream = instances.parallelStream(); } return stream.map(instance -> InstanceRecord.builder().ip(instance.getIp()) .port(instance.getPort()).healthy(instance.isHealthy()) .enabled(instance.isEnabled()).weight(instance.getWeight()) .metadata(instance.getMetadata()).build()) .collect(CopyOnWriteArrayList::new , CopyOnWriteArrayList::add, CopyOnWriteArrayList::addAll); } }
验证
Nacos 控制台
DNS 查询服务