前言
我们今天来讲讲Zookeeper Java客户端的一些使用吧。
之前我们说到Zookeeper的安装及简单使用,要介绍它的一些应用场景,要明白它的应用场景,要先理解它客户端的一些操作方法。
Zookeeper的Java客户端,最常使用的便是Apache Curator了,它是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端ZooKeeper相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量,而且Curator的功能更加强大。
正文
要使用Curator客户端,需要下面的两个依赖。
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
|
curator-recipes包一般能满足我们的需要,要是封装更简便的底层功能的话,curator-framework包必不可少。
创建并启动客户端
使用程序创建一个客户端client并启动(连接到Zookeeper)。
Builder模式创建一个客户端。
1 2 3 4 5 6 7
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(zkConfig.getConnectString()) .namespace(zkConfig.getNameSpace()) .retryPolicy(new ExponentialBackoffRetry(zkConfig.getRetryInterval(), Integer.MAX_VALUE)) .connectionTimeoutMs(zkConfig.getConnectTimeoutMs()) .sessionTimeoutMs(zkConfig.getSessionTimeoutMs()); client = builder.build();
|
客户端启动。
简单说下连接时的参数:
- connectString:服务器列表,逗号隔开(host1:port1,host2:port2……)
- namespace:命名空间,可以用来进行业务区分。
- retryPolicy:重试策略,有以下4种重试策略,也可以自己实现重试策略(实现RetryPolicy接口)。
RetryOneTime:重试一次。
RetryNTimes:重试N次(需要传入重试间隔参数sleepMsBetweenRetries,及尝试次数n),它继承了抽象类SleepingRetry(每休眠一段时间重试一次)。
RetryForever:一直重试(需要传入重试间隔retryIntervalMs参数)。
BoundedExponentialBackoffRetry:重试次数固定,但每次重试的时间间隔会不断变大(如果一直连不上),需要传入初始等待重试时间baseSleepTimeMs,重试次数maxRetries,及最大等待重试时间maxSleepTimeMs 参数,这个类继承ExponentialBackoffRetry(它又继承SleepingRetry)抽象类。
- sessionTimeoutMs:会话超时时间,单位毫秒,默认60000ms。
- connectionTimeoutMs:连接创建超时时间,单位毫秒,默认60000ms。
创建数据节点。
Zookeeper节点有4种,上篇文章已介绍。
创建持久化节点
1
| client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data);
|
使用create方法,creatingParentsIfNeeded这个方法保证如果有父节点也会一起创建,这在原生客户端是无法实现的。
CreateMode 有4种,跟Zookeeper的节点类型对应。
forPath方法可以认为最终操作,path表示节点路径,data表示节点数据。
data是byte数组,其它类型的数据应转换为byte数组。
注:如果不设置withMode方法,默认创建持久化节点,不设置data,节点默认内容为空。
如下:
1
| client.create().forPath(path);
|
创建顺序节点
1
| client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, data);
|
创建临时节点
1
| client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, data);
|
创建临时顺序节点
1
| client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, data);
|
设置节点数据
1
| client.setData().forPath(path, bytes);
|
直接调用setData方法即可,返回一个Stat(节点信息类)。
获取节点数据
1
| client.getData().forPath(path);
|
使用getData方法,返回byte数组。
获取子节点
1
| client.getChildren().forPath(path);
|
使用getChildren方法,返回一个子节点List <String>
列表,数据为各个子节点名称。
删除节点
1
| client.delete().guaranteed().forPath(path);
|
使用delete方法,guaranteed方法可以保证一定删除。如果某个节点删除失败,会抛出异常,但是如果使用了guaranteed,它会在后台继续进行删除直到删除成功。
删除节点(包括子节点)
1
| client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
|
deletingChildrenIfNeeded方法可以保证如果有子节点的话一并删除,原生client是无法实现此功能的(需要我们写方法处理)。
判断节点是否存在
1 2 3 4 5 6 7 8 9 10 11
| public boolean checkNodeExist(String path) { boolean exits = false; try { Stat stat = client.checkExists().forPath(path); if (stat != null) { exits = true; } } catch (Exception e) { } return exits; }
|
使用checkExists方法,最终返回一个Stat,如果Stat为空就说明不存在。
PS:由此我们可以创建一个 createOrUpdate方法,无节点时创建,有节点时更新内容。
1 2 3 4 5 6 7 8 9 10 11
| public void createOrUpdateNode(String path, byte[] data) { try { if(checkNodeExist(path)){ setData(path,data); }else{ createNode(path, data); } } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
|
异步处理
上面的操作方法,都可以使用异步进行处理的,主要使用了inBackground方法。
如下:
1
| client.create().inBackground().forPath(path, data);
|
该方法全参函数如下,且重载了多个方法。
1
| public T inBackground(BackgroundCallback callback, Object context, Executor executor);
|
BackgroundCallback callback:异步回调函数,处理完成后会回调此函数进行某些逻辑。
Object context:上下文对象。
Executor executor:异步处理的线程,不指定的话将使用内部默认线程处理。
我们可以看下BackgroundCallback 方法 会有两个参数。
1 2 3
| public interface BackgroundCallback { void processResult(CuratorFramework var1, CuratorEvent var2) throws Exception; }
|
第二个参数CuratorEvent里面包含了此次处理结果的所有信息,包括节点信息等。
1 2 3 4 5 6 7 8 9 10 11 12
| public interface CuratorEvent { CuratorEventType getType(); int getResultCode(); String getPath(); Object getContext(); Stat getStat(); byte[] getData(); String getName(); List<String> getChildren(); List<ACL> getACLList(); WatchedEvent getWatchedEvent(); }
|
CuratorEventType表示事件类型,表示此次操作的事件类型。可以看到它与CuratorFramework里的方法是一一对应的。
getResultCode返回处理结果码。可以在这个枚举里查看各个状态码。
添加watcher
1 2
| client.getData().usingWatcher(watcher).forPath(path); client.getChildren().usingWatcher(watcher).forPath(path);
|
使用usingWatcher结合getData或者getChildren方法可以为指定节点或者子节点添加watcher。
Watcher可以为CuratorWatcher或者Zookeeper自带的Watcher。它们有一个event参数。
可以拿到Zookeeper的状态 KeeperState和 事件类型 EventType,从而进行某些必要的操作。
KeeperState枚举和EventType枚举如下图。
事务支持
Zookeeper一些操作是支持事务的。
主要用到的方法有inTransaction、and、commit等方法。举例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
public CuratorTransaction startTransaction() { return client.inTransaction(); }
public CuratorTransactionFinal addCreateToTransaction(CuratorTransaction transaction, String path) throws Exception { return transaction.create().forPath(path, new byte[0]).and(); }
public CuratorTransactionFinal addDeleteToTransaction(CuratorTransaction transaction, String path) throws Exception { return transaction.delete().forPath(path).and(); }
public CuratorTransactionFinal addSetDataToTransaction(CuratorTransaction transaction, String path, byte[] data) throws Exception { return transaction.setData().forPath(path, data).and(); }
public Collection<CuratorTransactionResult> commitTransaction(CuratorTransactionFinal transaction) throws Exception { return transaction.commit(); }
|
检查连接情况
1
| client.getZookeeperClient().isConnected();
|
关闭连接
提升
对Curator客户端有简单理解后,我们把它进行简单功能的封装。
PS:Curator的强大之处在于其增强功能部分,我们会在后面结合Zookeeper应用讨论。
创建项目framework-zookeeper,搭建如下结构:
接口ZKClient,里面有一些Zookeeper客户端的协议,大致如下:
1 2 3 4 5 6 7 8 9 10
| public interface ZKClient { void start(); boolean isConnected(); void close(); void createNode(String path, byte[] data); void createOrUpdateNode(String path, byte[] data); void createEphemeralNode(String path, byte[] data); String createSequenceNode(String path) ; ...... }
|
CuratorZKClient是Curator对接口ZKClient的实现,BaseZKClient是原生客户端对接口ZKClient的实现。
我们来写下CuratorZKClient的一些关键代码。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class CuratorZKClient implements ZKClient { private CuratorFramework client; private volatile boolean closed = false; private ZKConfig zkConfig; private Set<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<>(); private final ConcurrentMap<String, ConcurrentMap<NodeListener, CuratorWatcher>> nodeListeners = new ConcurrentHashMap<>(); ...... }
|
提供一个连接监听接口,以便我们可以监听Zookeeper的连接状态并且执行某些操作。
1 2 3
| public interface ConnectionListener { void stateChanged(ZKConstants.ConnectState state); }
|
及节点变化接口,监测节点变化进行某些操作。
1 2 3 4 5
| public interface NodeListener { void nodeChanged(String path, List<String> nodes); void nodeDelete(String path); void dataChanged(String path, byte[] data); }
|
ZKConfig是Zookeeper客户端连接的配置,属性值可以配置在properties等配置文件里。
1 2 3 4 5 6 7 8
| public class ZKConfig { private String connectString; private String nameSpace; private int retryInterval = 1000; private int connectTimeoutMs = 60000; private int sessionTimeoutMs = 60000; ...... }
|
CuratorZKClient 里实现接口的start方法是,部分代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| ...... public void start() { ....... CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(zkConfig.getConnectString()) .namespace(zkConfig.getNameSpace()) .retryPolicy(new ExponentialBackoffRetry(zkConfig.getRetryInterval(), Integer.MAX_VALUE)) .connectionTimeoutMs(zkConfig.getConnectTimeoutMs()) .sessionTimeoutMs(zkConfig.getSessionTimeoutMs()); client = builder.build(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState zkConnectionState) { ZKConstants.ConnectState state = toConnectionListenerState(zkConnectionState); if (state != null) { for(ConnectionListener connectionListener : connectionListeners) { connectionListener.stateChanged(state); } } } private ZKConstants.ConnectState toConnectionListenerState(ConnectionState zkConnectionState) { switch (zkConnectionState) { case LOST: return ZKConstants.ConnectState.DISCONNECTED; case SUSPENDED: return ZKConstants.ConnectState.DISCONNECTED; case CONNECTED: return ZKConstants.ConnectState.CONNECTED; case RECONNECTED: return ZKConstants.ConnectState.RECONNECTED; default: return null; } } }); client.start(); } .....
|
这样我们暴露了Zookeeper的连接状态监听接口,以后想监听它的连接状态进行某些操作,直接实现接口,并通过addConnectionListener添加进来即可。
1 2 3 4 5 6 7 8
| @Override public void addConnectionListener(ConnectionListener listener) { connectionListeners.add(listener); } @Override public void removeConnectionListener(ConnectionListener listener) { connectionListeners.remove(listener); }
|
其它的方法,比如createNode、deleteNode等,我们拿到client后,按照上面讲述的各个操作便可以写出代码,这里不再赘述。以下是createNode的例子。
1 2 3 4 5 6 7 8 9 10
| @Override public void createNode(String path, byte[] data) { try { client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data); } catch (KeeperException.NodeExistsException e) { log.warn(String.format("create node is exist:%s", path)); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
|
再说一下需要实现的NodeListener方法,节点发生变化,主要通过watcher通知。
实现一个watcher。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| private class CuratorWatcherImpl implements CuratorWatcher { private volatile NodeListener listener; public CuratorWatcherImpl(NodeListener listener) { this.listener = listener; } public void unWatch() { this.listener = null; } @Override public void process(WatchedEvent event) throws Exception { if (listener != null) { log.debug(event.getPath() + " with event " + event.getType()); switch (event.getType()) { case NodeDataChanged: try { byte[] data = client.getData().usingWatcher(this).forPath(event.getPath()); log.debug(event.getPath() + " data after change: " + new String(data)); listener.dataChanged(event.getPath(), data); } catch (Exception e) { log.warn(e.getMessage(), e); } break; case NodeDeleted: case NodeCreated: log.error(event.getPath()); case NodeChildrenChanged: try { if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) { listener.nodeDelete(event.getPath()); } else { List<String> nodes = getNodes(event.getPath()); if (nodes != null) { client.getChildren().usingWatcher(this).forPath(event.getPath()); } log.debug(event.getPath() + " nodes after change: " + nodes); listener.nodeChanged(event.getPath(), nodes); } } catch (KeeperException.NoNodeException e) { log.warn(e.getMessage()); } catch (Exception e) { log.warn(e.getMessage(), e); } break; case None: default: break; } } } }
|
然后实现NodeListener的添加移除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Override public void addNodeListener(String path, NodeListener listener) { ConcurrentMap<NodeListener, CuratorWatcher> listeners = nodeListeners.get(path); if (listeners == null) { nodeListeners.putIfAbsent(path, new ConcurrentHashMap<NodeListener, CuratorWatcher>()); listeners = nodeListeners.get(path); } CuratorWatcher watcher = listeners.get(listener); if (watcher == null) { listeners.putIfAbsent(listener, new CuratorWatcherImpl(listener)); watcher = listeners.get(listener); } addChildrenCuratorWatcher(path, watcher); }
@Override public void removeNodeListener(String path, NodeListener listener) { ConcurrentMap<NodeListener, CuratorWatcher> listeners = nodeListeners.get(path); if (listeners != null) { CuratorWatcher watcher = listeners.remove(listener); if (watcher != null) { ((CuratorWatcherImpl) watcher).unWatch(); } } } private void addChildrenCuratorWatcher(final String path, CuratorWatcher watcher) { try { client.getData().usingWatcher(watcher).forPath(path); client.getChildren().usingWatcher(watcher).forPath(path); } catch (KeeperException.NoNodeException e) { log.warn(String.format("add watcher node not exist:%s", path)); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
|
可以看到主要是是维护一个ConcurrentHashMap,listener为key,watcher为value,节点有变化,通知到listener。
好。到这里基本上一个Zookeeper工具客户端就OK了,BaseZKClient的实现与CuratorZKClient类似,有兴趣的可以自己看看。
测试
我们测试下我们的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static void main(String[] args) throws Exception{ ZKConfig config = new ZKConfig(); config.setConnectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"); config.setConnectTimeoutMs(60000); config.setNameSpace("zwt"); config.setRetryInterval(1000); config.setSessionTimeoutMs(60000); ZKClient client = new CuratorZKClient(config); client.addConnectionListener(new ConnectionListener() { @Override public void stateChanged(ZKConstants.ConnectState state) { System.out.println("ZKState state "+ state.name()); } }); client.start(); client.createNode("/mytest","Hello World"); System.out.println(new String(client.getData("/mytest"),"UTF-8")); client.close(); }
|
运行可以看到输出:
我们使用命令行也可以看到我们新增的test节点及其属性。
PS:可以看到nameSpace 业务命名空间相当于新增一个根节点以区分不同业务,避免节点冲突等作用。
我们在client启动后添加watcher。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| ...... client.start(); client.addNodeListener("/mytest", new NodeListener() { @Override public void nodeChanged(String path, List<String> nodes) { System.out.println(path+" node changed"); } @Override public void nodeDelete(String path) { System.out.println(path+" node delete"); } @Override public void dataChanged(String path, byte[] data) { System.out.println(path+" data changed "+ data); } }); System.out.println(new String(client.getData("/mytest"),"UTF-8")); client.setData("/mytest","World"); System.out.println(new String(client.getData("/mytest"),"UTF-8")); client.createNode("/mytest/test"); client.deleteNodeWithChildren("/mytest"); ......
|
继续测试,结果如下。
到这里,我们基本把客户端操作的基本说完了。关于其它一些Zookeeper客户端,这里就不在过多介绍了,有兴趣的可以继续实现ZKClient接口去完成。
PS:BaseZKClient类是我写的一个原生Zookeeper客户端的集成工具,但有些小问题未处理。
总结
通过使用Zookeeper客户端的一些例子,更对Zookeeper有了更深入的了解。
下面的文章我们将结合Curator的一些高级功能及Zookeeper的一些应用来了解Zookeeper的强大之处。