Zookeeper curator框架
作者:互联网
1. 简介
- curator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。
- 提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、 共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。
1.1 原生zookeeperAPI的不足
- 连接对象异步创建,需要开发人员自行编码等待
- 连接没有自动重连超时机制
- watcher一次注册生效一次
- 不支持递归创建树形节点
1.2 curator特点
- 解决session会话超时重连
- watcher反复注册
- 简化开发api
- 遵循Fluent风格的API
- 提供了分布式锁服务、共享计数器、缓存机制等机制
1.3 依赖
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<type>jar</type>
</dependency>
</dependencies>
2. 连接与关闭
- 采用了工厂设计模式和建造者设计模式。通过输入一些连接信息,可以获取到一个连接Zookeeper服务器的客户端。
public static void main(String[] args) { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); // 表示间隔1s,最多尝试重连3次 CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181") .sessionTimeoutMs(5000) .retryPolicy(retryPolicy) .namespace("create") .build(); client.start(); // 开启客户端 log.info(client.isStarted()); client.close(); // 关闭客户端 }
connectString
:用于设置地址及端口号;
sessionTimeoutMs
:用于设置超时时间;
retryPolicy
:用于设置重连策略
namespace
:表示根节点路径,可以没有
2.1 测试模版
- 因此,可以写一个测试模板,在开始之前打开客户端,在结束之后关闭客户端。
public class CreateTest { private final static Logger log = Logger.getLogger(ConnectTest.class); private String connectString = "192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181"; CuratorFramework client; Integer sessionTimeoutMs = 5000; Integer baseSleepTimeMs = 1000; Integer maxRetries = 3; String namespace = "create"; @Before public void before() { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory .builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .retryPolicy(retryPolicy) .namespace(namespace) .build(); client.start(); log.info("客户端已开启"); } @After public void after() { client.close(); log.info("客户端已关闭"); } }
3. 新增节点
3.1 案例一:简单创建
@Test
public void testCreate() throws Exception {
client.create()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/node", "data".getBytes());
log.info("create结束");
}
3.2 案例二:自定义权限创建
@Test
public void testCreate2() throws Exception {
Id ip = new Id("ip", "192.168.233.133");
List<ACL> acl = Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip));
client.create()
.withMode(CreateMode.PERSISTENT)
.withACL(acl)
.forPath("/node1", "data".getBytes());
log.info("create结束");
}
3.3 案例三:递归创建节点
.creatingParentsIfNeeded()
实现,可以递归创建节点@Test public void testCreate3() throws Exception { // 递归创建节点 client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/node2/node33", "data".getBytes()); log.info("create结束"); }
3.4 案例四:异步方法创建节点
- 在此说明一下,方法接收到的第一个参数
curatorFramework
实际上就是客户端;curatorFramework
保存了一些查询的结果。@Test public void testCreate4() throws Exception { // 异步方式创建节点 client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { log.info(curatorFramework == client); // true log.info("getResultCode(): " + curatorEvent.getResultCode()); // 0表示创建成功 log.info("getType(): " + curatorEvent.getType().toString()); // 获取操作类型 CREATE log.info("getPath(): " + curatorEvent.getPath()); // 获取节点路径 } }) .forPath("/node2/node38", "data".getBytes()); log.info("create结束"); }
4. 更新节点
4.1 案例一:更新一个节点
@Test
public void testSet() throws Exception {
client.setData()
.forPath("/node", "set".getBytes());
log.info("设置完成");
}
4.2 案例二:带版本更新一个节点
@Test
public void testSet2() throws Exception {
client.setData()
.withVersion(1) // 带有版本号
.forPath("/node", "12".getBytes());
log.info("设置完成");
}
4.3 案例三:带回调方法更新一个节点
@Test
public void testSet3() throws Exception {
client.setData()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
log.info(curatorEvent.getResultCode()); // 0
log.info(curatorEvent.getType()); // SET_DATA
log.info(curatorEvent.getPath()); // /node
log.info(curatorEvent.getStat().toString()); // 21474836489,21474836542,1620040487612,1620042328488,4,0,0,0,3,0,21474836489
}
})
.forPath("/node", "432".getBytes());
log.info("设置完成");
}
5. 删除节点
5.1 案例一:删除一个节点
@Test
public void testDelete() throws Exception {
client.delete()
.forPath("/node");
log.info("删除结束");
}
5.2 案例二:递归删除节点
@Test
public void testDelete1() throws Exception {
client.delete()
.deletingChildrenIfNeeded()
.forPath("/node2");
log.info("删除结束");
}
5.3 案例三:带回调方法删除一个节点
@Test
public void testDelete3() throws Exception {
client.delete()
.deletingChildrenIfNeeded()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
log.info(curatorEvent.getType()); // DELETE
log.info(curatorEvent.getPath()); // /node1
}
})
.forPath("/node1");
log.info("删除结束");
}
6. 查看节点
6.1 案例一:查看一个节点
@Test
public void testGet() throws Exception {
byte[] data = client.getData()
.forPath("/node2");
log.info(new String(data));
}
6.2 案例二:查看节点的值和状态
@Test
public void testGet2() throws Exception {
Stat stat = new Stat();
byte[] data = client.getData()
.storingStatIn(stat)
.forPath("/node2");
log.info(new String(data));
log.info(stat.getVersion());
}
6.3 案例三:带回调方法查看一个节点
@Test
public void testGet3() throws Exception {
client.getData()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
log.info(new String(curatorEvent.getData())); // 4134134
log.info(curatorEvent.getStat().toString()); // 21474836566,21474836566,1620042863998,1620042863998,0,0,0,0,7,0,21474836566
log.info(curatorEvent.getType().toString()); // GET_DATA
}
})
.forPath("/node2");
}
7. 查看子节点
7.1 案例一:查看一个节点的所有子节点
@Test
public void testChildren() throws Exception {
List<String> children = client.getChildren()
.forPath("/");
log.info(children.toString());
}
7.2 案例二:带回调方法查看一个节点的所有子节点
@Test
public void testChildren2() throws Exception {
client.getChildren()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
log.info(curatorEvent.getPath()); // /
log.info(curatorEvent.getType().toString()); // CHILDREN
log.info(curatorEvent.getChildren().toString()); // [node, node2, node3]
}
})
.forPath("/");
}
8. 检查节点是否存在
8.1 案例一:检查一个节点是否存在
@Test
public void testExists() throws Exception {
Stat stat = client.checkExists()
.forPath("/node");
if (stat != null)
log.info(stat.toString());
else
log.info("节点不存在");
}
8.2 案例二:带回调方法检查一个节点是否存在
@Test
public void testExists1() throws Exception {
client.checkExists()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
log.info(curatorEvent.getType().toString()); // EXISTS
Stat stat = curatorEvent.getStat();
if (stat != null)
log.info(stat.toString()); // 21474836548,21474836548,1620042534164,1620042534164,0,0,0,0,0,0,21474836548
else
log.info("节点不存在");
}
})
.forPath("/node");
}
9. Watcher
- curator提供了两种Watcher(Cache)来监听结点的变化
NodeCache
: 只是监听某一个特定的节点,监听节点的新增、修改数据、删除。(子节点的新增、删除、修改均不会管)PathChildrenCache
: 监控一个ZNode的子节点. 当一个子节点增加、修改数据、删除时, PathCache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态- 这个监视器可以多次使用
9.1 案例一:NodeCache
@Test
public void testWatch() throws Exception {
// 观察节点的变化
NodeCache nodeCache = new NodeCache(client, "/node22");
nodeCache.start();
nodeCache.getListenable()
.addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData currentData = nodeCache.getCurrentData();
if (currentData != null) {
log.info(currentData.getPath());
log.info(new String(currentData.getData()));
} else {
log.info("删除了某个节点");
}
}
});
Thread.sleep(60000); //睡30s
nodeCache.close();
}
9.2 案例二:PathChildrenCache
@Test
public void testWatch2() throws Exception {
// 观察节点的变化
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node22", true);
pathChildrenCache.start();
pathChildrenCache.getListenable()
.addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
log.info(pathChildrenCacheEvent.getType()); // CHILD_ADDED, CHILD_REMOVED, CHILD_UPDATED
log.info(pathChildrenCacheEvent.getData().toString()); // ChildData{path='/node22/child', stat=21474836630,21474836630,1620044984259,1620044984259,0,0,0,0,2,0,21474836630, data=[50, 50]}
log.info(new String(pathChildrenCacheEvent.getData().getData()));
log.info(pathChildrenCacheEvent.getData().getPath()); // ChildData{path='/node22/child'
log.info(pathChildrenCacheEvent.getData().getStat().toString()); // 21474836630,21474836630,1620044984259,1620044984259,0,0,0,0,2,0,21474836630
}
});
Thread.sleep(60000); //睡30s
pathChildrenCache.close();
}
10. 事务
10.1 案例一:使用事务创建两个节点
@Test
public void testTransaction() throws Exception {
client.inTransaction()
.create().forPath("/node100", "100".getBytes())
.and() // 桥
.create().forPath("/node101", "101".getBytes())
.and() // 桥
.commit(); // 提交
log.info("提交成功");
}
11. 分布式锁
11.1 使用分布式可重入排它锁
- 排它锁,就是所有人都争抢同一个锁节点
/lock
,请求的时候,会在/lock
内部添加一个顺序节点,当轮到自己的时候,就可以继续执行;否则就阻塞。释放锁的时候,会删除自己增加的顺序节点。(基本实现原理与分布式锁基本一致)@Test public void testMutex() throws Exception { // 排他锁 InterProcessLock lock = new InterProcessMutex(client, "/lock"); log.info("等待获取锁对象"); lock.acquire(); for (int i = 0; i < 3; i++) { Thread.sleep(3000); System.out.println(i); } lock.release(); log.info("释放锁"); }
11.2 使用读写锁
- 读锁和写锁是两种类型的锁,但是如果两者争抢同一个锁节点的时候,也会发生一些有趣的事情。
- 当读锁进入之后,其他的读锁也可以进入;但是写锁只能在外面等;
- 当写锁进入之后,读写锁都不能进入。
/** * 读锁在运行的时候,写锁不允许工作,在阻塞。 * 读锁运行的时候,允许另一个读锁也进入读数据 * 写锁运行时,其他读写锁都不能进入 * @throws Exception */ @Test public void testReadLock() throws Exception { InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock"); InterProcessLock readLock = interProcessReadWriteLock.readLock(); log.info("等待获取读锁对象"); readLock.acquire(); for (int i = 0; i < 10; i++) { Thread.sleep(3000); System.out.println(i); } readLock.release(); log.info("释放锁"); }
@Test public void testWriteLock() throws Exception { InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock"); InterProcessLock writeLock = interProcessReadWriteLock.writeLock(); log.info("等待获取写锁对象"); writeLock.acquire(); for (int i = 0; i < 10; i++) { Thread.sleep(3000); System.out.println(i); } writeLock.release(); log.info("释放锁"); }
标签:info,throws,log,框架,void,Zookeeper,curator,public,节点 来源: https://blog.csdn.net/weixin_42524843/article/details/116380642