解析Nacos Client端实现配置的动态变更

注解NacosValue如何生效的

官方示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
@EnableNacosConfig(globalProperties = @NacosProperties(serverAddr = "127.0.0.1:8848"))
@NacosPropertySource(dataId = "example", autoRefreshed = true)
public class NacosConfiguration {}

@Controller
@RequestMapping("config")
public class ConfigController {

@NacosValue(value = "${useLocalCache:false}", autoRefreshed = true)
private boolean useLocalCache;

@RequestMapping(value = "/get", method = GET)
@ResponseBody
public boolean get() {
return useLocalCache;
}
}

服务配置管理是Nacos的一个功能点;从上面的代码可知,只需要在启动项上加上@EnableNacosConfig注解开启Nacos的配置管理功能后,在相应的Field加上一个@NacosValue注解就可以实现配置的动态变更。那么这背后的工作原理是怎么样的呢?

动态服务配置变更原理解析

EnableNacosConfig注解的作用
1
2
3
4
5
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(NacosConfigBeanDefinitionRegistrar.class)
public @interface EnableNacosConfig {}

可以看到,@EnableNacosConfig注解有一个@Import(NacosConfigBeanDefinitionRegistrar.class),因此在此注解还依赖于NacosConfigBeanDefinitionRegistrar.class的bean,因此registerBeanDefinitions方法会进行Nacos Config Client端的准备工作,比如@NacosValue@NacosConfigListener等注解的解析操作等等

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
AnnotationAttributes attributes = fromMap(metadata.getAnnotationAttributes(EnableNacosConfig.class.getName()));
// Register Global Nacos Properties Bean
registerGlobalNacosProperties(attributes, registry, environment, CONFIG_GLOBAL_NACOS_PROPERTIES_BEAN_NAME);
// Register Nacos Common Beans
registerNacosCommonBeans(registry);
// Register Nacos Config Beans
registerNacosConfigBeans(registry, environment);
// Invoke NacosPropertySourcePostProcessor immediately
// in order to enhance the precedence of @NacosPropertySource process
invokeNacosPropertySourcePostProcessor(beanFactory);
}
NacosValue注解是如何生效的

NacosValue注解的生效由com.alibaba.nacos.spring.context包的NacosValueAnnotationBeanPostProcessor进行实现,这个类继承了spring的一些interface or Class

1
2
extends InstantiationAwareBeanPostProcessorAdapter implements MergedBeanDefinitionPostProcessor, PriorityOrdered,
BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, DisposableBean, BeanFactoryAware, ApplicationListener<NacosConfigReceivedEvent>

为了能够实现配置的动态变更,首先要做的就是将被@NacosValue作用的Field信息收集起来,而这个功能的实现是在postProcessBeforeInitialization方法进行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public Object postProcessBeforeInitialization(Object bean, final String beanName) throws BeansException {

doWithFields(bean, beanName);

doWithMethods(bean, beanName);

return super.postProcessBeforeInitialization(bean, beanName);
}

private void doWithFields(final Object bean, final String beanName) {
ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field) throws IllegalArgumentException {
NacosValue annotation = getAnnotation(field, NacosValue.class);
doWithAnnotation(beanName, bean, annotation, field.getModifiers(), null, field);
}
});
}

private void doWithMethods(final Object bean, final String beanName) {
ReflectionUtils.doWithMethods(bean.getClass(), new ReflectionUtils.MethodCallback() {
@Override
public void doWith(Method method) throws IllegalArgumentException {
NacosValue annotation = getAnnotation(method, NacosValue.class);
doWithAnnotation(beanName, bean, annotation, method.getModifiers(), method, null);
}
});
}

这个方法里面执行了doWithFields以及doWithMethods,这两个方法就是将@NacosValue所修饰的Field或者Method进行缓存至一个Map中,同时解析@NacosValue里面的设置信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doWithAnnotation(String beanName, Object bean, NacosValue annotation, int modifiers, Method method, Field field) {
if (annotation != null) {
if (Modifier.isStatic(modifiers)) {
return;
}

if (annotation.autoRefreshed()) {
String placeholder = resolvePlaceholder(annotation.value());

if (placeholder == null) {
return;
}

NacosValueTarget nacosValueTarget = new NacosValueTarget(bean, beanName, method, field);
put2ListMap(placeholderNacosValueTargetMap, placeholder, nacosValueTarget);
}
}
}

至此,@NacosValue在我们应用启动后,它所修饰的Field以及Method信息就都被缓存起来了,接来下就是如何接收NacosServer推送来的配置变更事件以及如何将变更后的配置重新转载入对应的Field或者Method

如何实现配置变更事件通知

在这里,Nacos 采用了Spring的事件机制,刚刚说到了NacosValueAnnotationBeanPostProcessor还实现了接口ApplicationListener<NacosConfigReceivedEvent>,就是用来接收配置变更事件的。相应的事件为NacosConfigReceivedEvent对象,该对象包含了dataIdgroupId以及配置内容content,通过方法toProperties(content)将配置内容字符串转为Properties对象,便于操作,然后就是进行相应的key-value匹配然后重新注入新配置内容的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public void onApplicationEvent(NacosConfigReceivedEvent event) {
String content = event.getContent();
if (content != null) {
Properties configProperties = toProperties(content);

for (Object key : configProperties.keySet()) {
String propertyKey = (String)key;

List<NacosValueTarget> beanPropertyList = placeholderNacosValueTargetMap.get(propertyKey);
if (beanPropertyList == null) {
continue;
}

String propertyValue = configProperties.getProperty(propertyKey);
for (NacosValueTarget nacosValueTarget : beanPropertyList) {
if (nacosValueTarget.method == null) {
setField(nacosValueTarget, propertyValue);
} else {
setMethod(nacosValueTarget, propertyValue);
}
}
}
}
}

而具体的事件发布者,则是com.alibaba.nacos.spring.context.event.config中的DelegatingEventPublishingListener,当接收到Nacos Server关于服务配置信息变更时,具体的事件发布由receiveConfigInfo方法执行

1
2
3
4
5
6
7
8
9
10
@Override
public void receiveConfigInfo(String content) {
publishEvent(content);
onReceived(content);
}

private void publishEvent(String content) {
NacosConfigReceivedEvent event = new NacosConfigReceivedEvent(configService, dataId, groupId, content);
applicationEventPublisher.publishEvent(event);
}

而这里的configService对象,是由NacosFactory.createConfigService(properties)所创建的,就是com.alibaba.nacos.api.config中的ConfigService对象。

Client端如何获取Nacos Server端变更的最新配置信息

NacosConfigService创建的时候,会创建一个ClientWorker对象

1
2
3
4
5
6
7
8
9
10
11
12
public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
initNamespace(properties);
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
agent.start();
worker = new ClientWorker(agent, configFilterChainManager);
}

而这个ClientWorker是实现从NacosServer获取最新配置信息以及更新相应被@NacosValue所修饰的Field的值的重要组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;

executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});

executorService = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
t.setDaemon(true);
return t;
}
});

executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}

可以看到,Executor这个线程池提交了一个checkConfigInfo()方法,并且该方法被执行的频率很高,每10ms执行一次

1
2
3
4
5
6
7
8
9
10
11
12
13
public void checkConfigInfo() {
// 分任务
int listenerSize = cacheMap.get().size();
// 向上取整为批数
int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}

具体的解析在博客Nacos配置中心原理——逅弈逐码)都说的比较清楚了,我就直接来说后面的——接收到配置变更后如何更新@NacosValue所修饰的Field的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private void safeNotifyListener(final String dataId, final String group, final String content, final String md5, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;

Runnable job = new Runnable() {
public void run() {
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener)listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。
Thread.currentThread().setContextClassLoader(appClassLoader);

ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
listener.receiveConfigInfo(contentTmp);
listenerWrap.lastCallMd5 = md5;
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, listener);
} catch (NacosException de) {
...
}
}
};

final long startNotify = System.currentTimeMillis();
try {
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
job.run();
}
} catch (Throwable t) {
...
}

这一块就是承接配置变更到最终作用于Field的代码了,这里的listenerDelegatingEventPublishingListener,通过代码调试可以获取

DelegatingEventPublishingListener

然后DelegatingEventPublishingListener调用receiveConfigInfo方法将本次配置变更作为一个事件进行发布,最终被NacosValueAnnotationBeanPostProcessor事件监听器所获取,具体的方法上面说到了