前言
根据Zookeeper的一些特点,它是可以作为配置中心使用的。
何为配置中心?
我们在项目开发维护过程中会有很多公共变量或资源,需要统一管理,以前我们把它们写在程序公共类或者配置文件中,可是这样以后有变动,程序就需要重新部署,很是不方便,而且分布式、微服务等技术出现,修改维护多个项目管理也变得复杂。
为了解决以上问题,实现一次打包多地部署需求,减少项目管理及安全风险,我们需要将可变变量外移并通过页面统一可视化管理,基于此,我们统一建设了配置中心。
引入Zookeeper后,我们把数据存放在Zookeeper节点Znode上,可以选择主动轮询查询或者等待Zookeeper通知。当数据发生变化时,我们可以直接通过通知去执行某些业务操作。
一般为了数据准确性,我们会主动轮询查询或者通知+轮询的方式。
PS:当然使用数据库保存这些数据也是可以的,采用定期查询的方式,而且有的配置中心就与之类似,我们在这儿不做更广泛讨论。
分析
注册中心可以分为服务端和客户端两部分。
服务端一般用于存储配置数据,提供数据管理等等服务。客户端一般为业务端调用数据提供API服务等。
当然现在有一些开源的配置中心,如spring-cloud-config,diamond,disconf,apollo 等,我们以后有接触在具体介绍研究它们。
今天我们基于Zookeeper实现自己的一个简单的注册中心。
如下的配置中心流程图也就比较好理解了。
正文
先来实现基于Zookeeper的配置中心客户端吧。
PS:了解这篇文章之前可以先看看 Zookeeper Java客户端Curator
先来了解下curator-recipes 包下的一个类PathChildrenCache。
该类是从本地缓存ZK路径的所有子路径中保存所有数据的一个工具类,它将监视ZK路径、响应更新/创建/删除事件、下拉数据等等,此类不能保证事务处理时的强同步。
这个类有一个全参构造方法:
1
| public PathChildrenCache(CuratorFramework client, String path, boolean cacheData){...}
|
Client是我们的ZKClient需要创建,path指要监控的路径,cacheData指是否缓存数据。
同时我们可以为其添加Listener,当节点/子节点数据有变化时,可以进行通知等。
使用该方法:
1
| pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
|
我们想实现自己的配置中心客户端,与SpringBoot进行集成,其目录结构如下创建:
ConfigCenterAutoConfig:SpringBoot自动配置类,会提供相应的Bean。
ConfigCenterConfiguration:自动配置类,从properties文件中获取配置属性。
ConfigCenterException:异常处理类。
ConfigCenterListener:配置中心监听listener。
ConfigCenterListenerAdapter:考虑到监听可以有多个,这个类用来处理它们。
LocalCacheService:主要用来定时轮询Zookeeper的配置。
ZKConfigService:主要用来创建Zookeeper连接及添加监听等。
ConfigUtil:工具类。
CacheNodeVo:节点Vo。
ConfigCenterClient:配置中心客户端。
先从配置类说起吧,连接配置文件properties的类ConfigCenterConfiguration。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @ConfigurationProperties("spring.zookeeper.config-center") public class ConfigCenterConfiguration { private String zkAddress; private String sysName; private Integer connectTimeoutMs = 60000; private Integer sessionTimeoutMs = 60000; private Integer retryInterval = 1000; ...... }
|
这个类不过多介绍了,就是Zookeeper的配置信息,连接Zookeeper时使用。
我们引入之前封装的framework-zookeeper包,通过自动配置拿到client。
1 2 3 4 5
| <dependency> <groupId>com.zwt</groupId> <artifactId>framework-zookeeper</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency>
|
ConfigCenterAutoConfig部分代码如下,也比较好理解,就是Spring启动后自动配置CuratorClient。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Configuration @ConditionalOnClass(CuratorZKClient.class) @EnableConfigurationProperties(ConfigCenterConfiguration.class) public class ConfigCenterAutoConfig implements ApplicationContextAware { private static final Logger log= LoggerFactory.getLogger(ConfigCenterAutoConfig.class); @Autowired private ConfigCenterConfiguration properties; @Bean @ConditionalOnMissingBean @ConditionalOnProperty(prefix = "spring.zookeeper.config-center", name = "enabled", havingValue = "true") CuratorZKClient curatorZKClient (){ ZKConfig config = new ZKConfig(); config.setConnectString(properties.getZkAddress()); config.setNameSpace(properties.getSysName()); config.setSessionTimeoutMs(properties.getSessionTimeoutMs()); config.setConnectTimeoutMs(properties.getConnectTimeoutMs()); config.setRetryInterval(properties.getRetryInterval()); CuratorZKClient zkClient = new CuratorZKClient(config); zkClient.addConnectionListener((state) -> { log.debug("ZKConfigService connectionListener state:" + state); if (state == ZKConstants.State.CONNECTED || state == ZKConstants.State.RECONNECTED) { log.info("ZKConfigService zookeeper is connected..."); } }); zkClient.start(); return zkClient; } private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } public static <T> T getBean(Class<T> requireType){ return applicationContext.getBean(requireType); } }
|
有了CuratorClient,我们创建ZKConfigService,主要为指定节点添加希望的监听。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class ZKConfigService{ private static final Logger log= LoggerFactory.getLogger(ZKConfigService.class); private static CuratorZKClient zkClient = null; final static String configRootPath = ConfigUtil.getConfigCenterPath();
public static void init() { zkClient = ConfigCenterAutoConfig.getBean(CuratorZKClient.class); try { PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient.getCuratorFramework(), configRootPath, true); PathChildrenCacheListener pathChildrenCacheListener = (client,event) -> { log.debug("pathChildrenCacheListener eventType:" + event.getType()); ChildData data = event.getData(); if(data!=null){ String dataStr = new String(data.getData(), "UTF-8"); String key = StringUtils.substringAfterLast(data.getPath(), ConfigUtil.SEP_STRING); switch (event.getType()) { case CHILD_ADDED: LocalCacheService.put(key,dataStr); break; case CHILD_UPDATED: LocalCacheService.put(key,dataStr); ConfigCenterListenerAdapter.onChange(key,dataStr); break; case CHILD_REMOVED: LocalCacheService.remove(key); break; default: break; } } }; pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); pathChildrenCache.start(); } catch (Exception e) { e.printStackTrace(); } log.info("spring-boot-config ZKConfigService init success."); }
public static String getKey(String key) { return zkClient.getStringData(ConfigUtil.joinPath(configRootPath, key)); } }
|
可以看到用到了我们刚才说的PathChildrenCache类,启动后,添加一个Listener,监听节点变化,一方面,我们需要一个类,对节点变化进行通知;另一方面,我们应把数据缓存在本地,如果数据变化后ZK没有通知或者其它情况,我们可以轮询查询后与本地缓存比较,有变化后继续进行我们节点变化的通知。
这就是我们的ConfigCenterListenerAdapter监听处理类和LocalCacheService本地缓存服务。
先说ConfigCenterListenerAdapter吧,可以看到上面代码节点有变化时触发了onChange事件。
由于我们业务可能需要多个监听类,故,我们提供一个监听接口,相关业务类实现这个接口,在注册一下监听即可使用岂不美哉。
考虑到此,我们存储监听类的集合应是静态的。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class ConfigCenterListenerAdapter { private static Logger log= LoggerFactory.getLogger(ConfigCenterListenerAdapter.class); private static ConcurrentHashMap<String, List<ConfigCenterListener>> someKeyListenerMap = new ConcurrentHashMap<>(); private static List<ConfigCenterListener> allKeyListeners = new CopyOnWriteArrayList<>(); public static boolean addListener(String key, ConfigCenterListener configCenterListener) { if (configCenterListener == null) { return false; } if (key == null || key.trim().length() == 0) { allKeyListeners.add(configCenterListener); return true; } else { List<ConfigCenterListener> listeners = someKeyListenerMap.get(key); if (listeners == null) { listeners = new ArrayList<>(); someKeyListenerMap.put(key, listeners); } listeners.add(configCenterListener); return true; } } public static void onChange(String key, String value) { if (key == null || key.trim().length() == 0) { return; } List<ConfigCenterListener> keyListeners = someKeyListenerMap.get(key); if (keyListeners != null && keyListeners.size() > 0) { for(ConfigCenterListener listener : keyListeners) { try { listener.onChange(key, value); } catch (Exception e) { log.error(e.getMessage(), e); } } } if (allKeyListeners.size() > 0) { for(ConfigCenterListener confListener : allKeyListeners) { try { confListener.onChange(key, value); } catch (Exception e) { log.error(e.getMessage(), e); } } } } }
|
1 2 3 4 5 6 7 8
| public interface ConfigCenterListener {
void onChange(String key, String value) ; }
|
我们再来看看我们的主动轮询服务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| public class LocalCacheService { private static final Logger log= LoggerFactory.getLogger(LocalCacheService.class); private static final ConcurrentHashMap<String, CacheNodeVo> LOCAL_CONFIG_CACHE_MAP = new ConcurrentHashMap<>(); private static boolean refreshThreadStop = false; private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("spring-boot-config-%d").setDaemon(true).build(); private static ExecutorService singleThreadPool = Executors.newFixedThreadPool(1, namedThreadFactory); public static void init() { singleThreadPool.submit(()->{ while (!refreshThreadStop) { try { TimeUnit.SECONDS.sleep(60); reloadAll(); log.debug("spring-boot-config, refresh thread reloadAll success."); } catch (Exception e) { log.error("spring-boot-config, refresh thread error."); log.error(e.getMessage(), e); } } log.info("spring-boot-config, refresh thread stopped."); }); log.info("spring-boot-config LocalCacheService init success."); }
public static String get(String key) { CacheNodeVo cacheNodeVo = LOCAL_CONFIG_CACHE_MAP.get(key); if (cacheNodeVo != null) { return cacheNodeVo.getValue(); } return null; }
public static void put(String key, String value) { LOCAL_CONFIG_CACHE_MAP.put(key, new CacheNodeVo(key, value)); }
public static void remove(String key) { LOCAL_CONFIG_CACHE_MAP.remove(key); }
private static void reloadAll() { Set<String> keySet = LOCAL_CONFIG_CACHE_MAP.keySet(); if (keySet.size() > 1) { for(String key : keySet) { String zkValue = ZKConfigService.getKey(key); CacheNodeVo cacheNodeVo = LOCAL_CONFIG_CACHE_MAP.get(key); if (cacheNodeVo != null && cacheNodeVo.getValue() != null && cacheNodeVo.getValue().equals(zkValue)) { log.debug("refresh key:{} no changed ", key); } else { LOCAL_CONFIG_CACHE_MAP.put(key, new CacheNodeVo(key, zkValue)); ConfigCenterListenerAdapter.onChange(key, zkValue); } } } } }
|
可以看到借助了一个定长线程池,每隔60s重载一下数据,有变化会对监听者进行通知。它是通过一个静态的ConcurrentHashMap 来保存数据的。
这儿看到刚才的也会主动通知监听者,这儿也通知监听者,它们会通知两次吗?
我们可以看到主动通知的时候,也会先把ConcurrentHashMap的值先改变在进行通知,要是出现通知两次的情况,会是概率极低的。要是要求只能通知一次,且业务监听无法重复处理两次数据变化请求,可以在向ConcurrentHashMap里放值时,再检查一下它的当前值,或使用其它方法处理。
两个服务ZK通知和主动轮询处理完成后,提供一个配置中心Client,用于获取值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class ConfigCenterClient { private static final Logger log= LoggerFactory.getLogger(ConfigCenterClient.class); static { LocalCacheService.init(); ZKConfigService.init(); } public static String getString(String key, String defaultValue) { String value = LocalCacheService.get(key); if (value != null) { log.debug("get config {} from cache",key); return value; } value = ZKConfigService.getKey(key); if (value != null) { log.debug("get config {} from zookeeper",key); LocalCacheService.put(key,value); return value; } return defaultValue; } public static String getString(String key) { return getString(key, null); } private static void checkNull(String key,String value) { if (value == null) { throw new ConfigCenterException(String.format("config key [%s] does not exist",key)); } } public static long getLong(String key) { String value = getString(key, null); checkNull(key, value); return Long.valueOf(value); } public static int getInt(String key) { String value = getString(key, null); checkNull(key, value); return Integer.valueOf(value); } public static boolean getBoolean(String key) { String value = getString(key, null); checkNull(key, value); return Boolean.valueOf(value); } public static boolean addListener(String key, ConfigCenterListener configCenterListener){ return ConfigCenterListenerAdapter.addListener(key, configCenterListener); } }
|
首先应加载LocalCacheService和ZKConfigService,然后实现主要的方法getString,直接去缓存里取,拿不到去Zookeeper里取并放到缓存里,在提供一个addListener方法,可以让用户自己定义想监听的节点。
至此,我们一个简单的配置中心的客户端就完成了,我们把它打包引入一个demo项目测试一下。
创建一个demo项目,引入我们的jar包。
1 2 3 4 5
| <dependency> <groupId>com.zwt</groupId> <artifactId>config-spring-boot-starter</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency>
|
我们创建一个Listener实现。
1 2 3 4 5 6 7
| public class MyListener implements ConfigCenterListener { private static final Logger log= LoggerFactory.getLogger(MyListener.class); @Override public void onChange(String key, String value) { log.info(key+ " changed "+ value); } }
|
创建测试类,我们循环10次改变节点的值,测试一下我们的程序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RunWith(SpringRunner.class) @SpringBootTest public class DemoApplicationTests { @Autowired CuratorZKClient client; @Test public void contextLoads() throws Exception{ ConfigCenterClient.addListener("test",new MyListener()); for(int i = 0;i<10;i++){ Thread.sleep(5000); String str = ConfigCenterClient.getString("test"); System.out.println(str); client.setData("/config/test","Hello World"+i); } } }
|
可以看到执行结果。
我们还要实现一个配置中心的服务端。
服务端基本上是对Zookeeper数据节点的增删改查这几个逻辑,其核心是Zookeeper保存在节点上的数据。
为了方便对Zookeeper数据进行操作,我们一般创建一个可视化后台管理系统。如下:
这个系统是比较好实现的,引入我们的framework-zookeeper包,里面封装了Zookeeper的增删改查,当然需要创建一个web项目。
这一块就不再过多介绍了,当明白了Zookeeper的增删改查节点数据后,实现起来是比较容易的。
总结
今天我们通过Zookeeper实现了一个配置中心,简单了解了它的原理,也对Zookeeper有了一些更深刻的理解。
现在很多开源的配置中心也相当的不错,也是可以学习和理解的,我后面可能也会讲解一些关于这方面的知识。
framework-zookeeper 和 config-spring-boot-starter 的相关代码已上传GitHub,大家如果有兴趣在实践中遇到问题可以过去参考下代码,如有疑问也欢迎与我交流探讨。
配置中心服务端(web项目)由于个人原因和时间原因,只写了个大概,也没有提交Github,后续应该会补上。