zk客户端的三种操作(akAPI,ZkClient,Curator)
作者:互联网
1. zk原生态API
新建Maven项目导入依赖
<!-- zk 原生api 操作 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
zk客户端是异步的哦!需要引入CountDownLatch 来确保连接好了再做下面操作。zk 原生api是不支持迭代式的创建跟删除路径的。
public class CreateSessionDemo
{
private final static String CONNECTSTRING = "localhost:2181";
private static CountDownLatch countDownLatch = new CountDownLatch(1);
// CountDownLatch 的引入是因为 zk 连接是异步的
public static void main(String[] args) throws IOException, InterruptedException
{
ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new Watcher()
{
public void process(WatchedEvent watchedEvent)
{
//如果当前的连接状态是连接成功的,那么通过计数器去控制
if (watchedEvent.getState() == Event.KeeperState.SyncConnected)
{
countDownLatch.countDown();
System.out.println(watchedEvent.getState());
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState());
}
}
节点的增删改查 操作:
public class ApiOperatorDemo implements Watcher
{
private final static String CONNECTSTRING = "localhost:2181";
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zookeeper;
private static Stat stat = new Stat();
public static void main(String[] args) throws Exception
{
zookeeper = new ZooKeeper(CONNECTSTRING, 5000, new ApiOperatorDemo());
countDownLatch.await();
// 确保连接ok
//创建节点 设置为IDs权限为anyone 还是一个 永久节点
String result = zookeeper.create("/node1", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zookeeper.getData("/node1", new ApiOperatorDemo(), stat);
//增加一个
System.out.println("创建成功:" + result);
//修改数据
zookeeper.getData("/node1", new ApiOperatorDemo(), stat);
zookeeper.setData("/node1", "deer2".getBytes(), -1);
Thread.sleep(2000);
//删除节点
zookeeper.getData("/node1", new ApiOperatorDemo(), stat);
zookeeper.delete("/node1", -1);
Thread.sleep(2000);
// 创建节点和子节点
String path = "/node11";
zookeeper.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
TimeUnit.SECONDS.sleep(1);
Stat stat = zookeeper.exists(path + "/node1", true);
if (stat == null)
{
//表示节点不存在
zookeeper.create(path + "/node1", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
TimeUnit.SECONDS.sleep(1);
}
//修改子路径
zookeeper.setData(path + "/node1", "deer".getBytes(), -1);
TimeUnit.SECONDS.sleep(1);
//获取指定节点下的子节点
List<String> childrens = zookeeper.getChildren("/node11", true);
System.out.println(childrens);
}
public void process(WatchedEvent watchedEvent)
{
//如果当前的连接状态是连接成功的,那么通过计数器去控制
if (watchedEvent.getState() == Event.KeeperState.SyncConnected)
{
if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath())
{
countDownLatch.countDown();
System.out.println(watchedEvent.getState() + "-->" + watchedEvent.getType());
} else if (watchedEvent.getType() == Event.EventType.NodeDataChanged)
{
try
{
System.out.println("数据变更触发路径:" + watchedEvent.getPath() + "->改变后的值:" +
new String(zookeeper.getData(watchedEvent.getPath(), true, stat)));
} catch (KeeperException e)
{
e.printStackTrace();
} catch (InterruptedException e)
{
e.printStackTrace();
}
} else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged)
{//子节点的数据变化会触发
try
{
System.out.println("子节点数据变更路径:" + watchedEvent.getPath() + "->节点的值:" +
zookeeper.getData(watchedEvent.getPath(), true, stat));
} catch (KeeperException e)
{
e.printStackTrace();
} catch (InterruptedException e)
{
e.printStackTrace();
}
} else if (watchedEvent.getType() == Event.EventType.NodeCreated)
{//创建子节点的时候会触发
try
{
System.out.println("节点创建路径:" + watchedEvent.getPath() + "->节点的值:" +
zookeeper.getData(watchedEvent.getPath(), true, stat));
} catch (KeeperException e)
{
e.printStackTrace();
} catch (InterruptedException e)
{
e.printStackTrace();
}
} else if (watchedEvent.getType() == Event.EventType.NodeDeleted)
{//子节点删除会触发
System.out.println("节点删除路径:" + watchedEvent.getPath());
}
}
}
}
zk权限操作demo
public class AuthControlDemo implements Watcher
{
private final static String CONNECTSTRING = "localhost:2181";
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static CountDownLatch countDownLatch2 = new CountDownLatch(1);
private static ZooKeeper zookeeper;
private static Stat stat = new Stat();
public static void main(String[] args) throws Exception
{
zookeeper = new ZooKeeper(CONNECTSTRING, 5000, new AuthControlDemo());
countDownLatch.await();
ACL acl = new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("root:root")));
ACL acl2 = new ACL(ZooDefs.Perms.CREATE, new Id("ip", "192.168.1.1"));
List<ACL> acls = new ArrayList<>();
acls.add(acl);
acls.add(acl2);
zookeeper.create("/auth1", "123".getBytes(), acls, CreateMode.PERSISTENT);
zookeeper.addAuthInfo("digest", "root:root".getBytes());
zookeeper.create("/auth1/auth1-1", "123".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
ZooKeeper zooKeeper1 = new ZooKeeper(CONNECTSTRING, 5000, new AuthControlDemo());
countDownLatch.await();
zooKeeper1.addAuthInfo("digest", "root:root".getBytes());
zooKeeper1.delete("/auth1/auth1-1", -1);
// acl (create /delete /admin /read/write)
//权限模式: ip/Digest(username:password)/world/super
}
public void process(WatchedEvent watchedEvent)
{
//如果当前的连接状态是连接成功的,那么通过计数器去控制
if (watchedEvent.getState() == Event.KeeperState.SyncConnected)
{
if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath())
{
countDownLatch.countDown();
System.out.println(watchedEvent.getState() + "-->" + watchedEvent.getType());
}
}
}
}
zk原生态APi缺点
1. 会话的连接是异步的;必须用到回调函数
2. Watch需要重复注册: 看一次watch注册一次
3. Session重连机制:有时session断开还需要重连接。
4. 开发复杂性较高:开发相对来说比较琐碎
2. zkClient
<!-- zkclient 操作 -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
创建会话(同步,重试)
public ZkClient(final String zkServers, final int sessionTimeout,
final int connectionTimeout, final ZkSerializer zkSerializer,
final long operationRetryTimeout)
创建节点(同步,递归创建)
public String create(String path,Object data,final List<ACL> acl,CreateMode mode)
public void createPersistent(String path,boolean createParents,List<ACL> acl)
public void createPersistent(String path, Object data, List<ACL> acl)
public String createPersistentSequential(String path,Object data,List<ACL> acl)
public void createEphemeral(String path, Object data, List<ACL> acl)
public String createEphemeralSequential(String path,Object data,List<ACL> acl)
删除节点(同步,递归删除)
public boolean delete(String path,int version)
public boolean deleteRecursive(String path)
获取节点(同步,避免不存在异常)
public List<String> getChildren(String path)
public <T> T readData(String path, boolean returnNullIfPathNotExists)
public <T> T readData(String path, Stat stat)
更新节点(同步,实现CAS,状态返回)
public void writeData(String path, Object datat, int expectedVersion)
public Stat writeDataReturnStat(String path,Object datat,int expectedVersion)
检测节点存在(同步)
public boolean exists(String path)
权限控制(同步)
public void addAuthInfo(String scheme, final byte[] auth);
public void setAcl(final String path, final List<ACL> acl);
监听器
序号 |
监听器 |
注册API |
1 |
IZkStateListener |
(un)subscribeStateChanges(IZkStateListener listener) |
2 |
IZkDataListener |
(un)subscribeDataChanges(IZkStateListener listener) |
3 |
IZkChildListener |
(un)subscribeChildChanges(IZkStateListener listener) |
public class SessionDemo {
private final static String CONNECTSTRING="localhost:2181";
public static void main(String[] args) {
ZkClient zkClient=new ZkClient(CONNECTSTRING,4000);
System.out.println(zkClient+" - > success");
}
}
public class ZkClientApiOperatorDemo
{
private final static String CONNECTSTRING = "localhost:2181";
private static ZkClient getInstance()
{
return new ZkClient(CONNECTSTRING, 10000);
}
public static void main(String[] args) throws InterruptedException
{
ZkClient zkClient = getInstance();
//zkclient 提供递归创建父节点的功能
zkClient.createPersistent("/zkclient/zkclient1/zkclient1-1/zkclient1-1-1", true);
System.out.println("success");
//删除节点
zkClient.deleteRecursive("/zkclient");
//获取子节点
List<String> list = zkClient.getChildren("/node11");
System.out.println(list);
//watcher
zkClient.subscribeDataChanges("/node11", new IZkDataListener()
{
@Override
public void handleDataChange(String s, Object o) throws Exception
{
System.out.println("节点名称:" + s + "->节点修改后的值" + o);
}
@Override
public void handleDataDeleted(String s) throws Exception
{
}
});
zkClient.writeData("/node11", "node");
TimeUnit.SECONDS.sleep(2);
zkClient.subscribeChildChanges("/node11", new IZkChildListener()
{
@Override
public void handleChildChange(String s, List<String> list) throws Exception
{
System.out.println("节点名称:" + s + "->" + "当前的节点列表:" + list);
}
});
zkClient.delete("/node11/node1");
;
TimeUnit.SECONDS.sleep(2);
}
}
3. curator
<!-- curator 操作 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.11.0</version>
</dependency>
Java客户端连接集群
ZkClient client = new ZkClient("host1,host2,host3,host4,host5");
Zk客户端处理过程:解析→打散→形成环形地址列表队列
参考
标签:String,zk,zookeeper,Curator,节点,ZkClient,new,public,watchedEvent 来源: https://blog.51cto.com/u_14582976/2829346