ZooKeeper Curator Framework

Author: Internet

1. Introduction

1.1 Shortcomings of the native ZooKeeper API

1.2 Features of Curator

1.3 Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    <!-- Packages higher-level recipes such as Cache event listening, leader election, distributed locks, and distributed barriers -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
        <type>jar</type>
    </dependency>
</dependencies>

2. Connecting and closing

2.1 Test template
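
As a minimal sketch, a test template that supplies the client and log fields used by the cases in the later sections could look like the class below. The connect string 127.0.0.1:2181, the timeout values, the retry settings, and the class name CuratorTemplateTest are assumptions, not values from the original article. JUnit 4 and log4j 1.x are also assumed to be on the classpath; log4j is chosen because its info(Object) overload matches calls such as log.info(curatorEvent.getResultCode()).

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CuratorTemplateTest {
    private static final Logger log = Logger.getLogger(CuratorTemplateTest.class);

    // Assumption: a ZooKeeper server is reachable at this address.
    private static final String CONNECT_STRING = "127.0.0.1:2181";

    private CuratorFramework client;

    @Before
    public void connect() {
        client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                // Retry up to 3 times, starting with a 1 s pause that grows on each attempt.
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        log.info("client started");
    }

    @After
    public void close() {
        client.close();
        log.info("client closed");
    }

    @Test
    public void testConnected() throws Exception {
        // The root node always exists, so a non-null Stat shows the connection works.
        Assert.assertNotNull(client.checkExists().forPath("/"));
    }
}
```

Each @Test method then runs against a freshly started client, and the connection is released even when a test fails.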

3. Creating nodes

3.1 Case 1: simple creation

  @Test
  public void testCreate() throws Exception {
    client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .forPath("/node", "data".getBytes());
    log.info("create finished");
  }

3.2 Case 2: creation with custom permissions (ACL)

  @Test
  public void testCreate2() throws Exception {
    // Grant access only to clients connecting from this IP address
    Id ip = new Id("ip", "192.168.233.133");
    List<ACL> acl = Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip));
    client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(acl)
            .forPath("/node1", "data".getBytes());
    log.info("create finished");
  }
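
ZooDefs.Perms.ALL in the case above is a bitmask, so narrower permissions can be built by OR-ing individual bits. The sketch below uses only the JDK: the constants copy the values of ZooDefs.Perms (READ=1, WRITE=2, CREATE=4, DELETE=8, ADMIN=16), while the class PermBits and its helper methods are hypothetical names introduced for illustration. The describe method mimics the letter form (c, d, r, w, a) that zkCli.sh prints for getAcl.

```java
final class PermBits {
    // Values mirror org.apache.zookeeper.ZooDefs.Perms.
    static final int READ = 1 << 0;
    static final int WRITE = 1 << 1;
    static final int CREATE = 1 << 2;
    static final int DELETE = 1 << 3;
    static final int ADMIN = 1 << 4;
    static final int ALL = READ | WRITE | CREATE | DELETE | ADMIN;

    /** True when every bit in wanted is also set in granted. */
    static boolean allows(int granted, int wanted) {
        return (granted & wanted) == wanted;
    }

    /** Renders a permission mask as letters, for example "rw" or "cdrwa". */
    static String describe(int perms) {
        StringBuilder sb = new StringBuilder();
        if ((perms & CREATE) != 0) sb.append('c');
        if ((perms & DELETE) != 0) sb.append('d');
        if ((perms & READ) != 0) sb.append('r');
        if ((perms & WRITE) != 0) sb.append('w');
        if ((perms & ADMIN) != 0) sb.append('a');
        return sb.toString();
    }

    public static void main(String[] args) {
        int readWrite = READ | WRITE;
        System.out.println(describe(readWrite));      // rw
        System.out.println(describe(ALL));            // cdrwa
        System.out.println(allows(readWrite, DELETE)); // false
    }
}
```

With the real classes, a read-write ACL would be built as new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.WRITE, ip) in place of ZooDefs.Perms.ALL.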

3.3 Case 3: creating nodes recursively
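
A minimal sketch of recursive creation, assuming a ZooKeeper server at 127.0.0.1:2181 and the hypothetical path /node3/child: creatingParentsIfNeeded() asks Curator to create any missing parent nodes before creating the target node. It is written as a standalone main method rather than a test so that it carries its own imports.

```java
import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class RecursiveCreateExample {
    public static void main(String[] args) throws Exception {
        // Assumption: a ZooKeeper server is listening on this address.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        try {
            // Creates /node3 first if it is missing, then /node3/child.
            String created = client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath("/node3/child", "data".getBytes(StandardCharsets.UTF_8));
            System.out.println("created " + created);
        } finally {
            client.close();
        }
    }
}
```

Running it a second time fails with a NodeExistsException, because only the missing parents are optional, not the target node itself.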

3.4 Case 4: creating a node asynchronously
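
A minimal sketch of asynchronous creation, assuming a ZooKeeper server at 127.0.0.1:2181 and the hypothetical path /node4. With inBackground(...), forPath returns immediately and the outcome arrives later in the callback, so the sketch waits on a CountDownLatch before closing the client.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class AsyncCreateExample {
    public static void main(String[] args) throws Exception {
        // Assumption: a ZooKeeper server is listening on this address.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        try {
            final CountDownLatch done = new CountDownLatch(1);
            BackgroundCallback callback = new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework c, CuratorEvent event) {
                    // Result code 0 means OK; other values are KeeperException codes.
                    System.out.println(event.getType() + " " + event.getPath()
                            + " rc=" + event.getResultCode());
                    done.countDown();
                }
            };
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .inBackground(callback)
                    .forPath("/node4", "data".getBytes(StandardCharsets.UTF_8));
            // forPath has already returned; wait for the callback before closing.
            if (!done.await(10, TimeUnit.SECONDS)) {
                System.out.println("timed out waiting for the callback");
            }
        } finally {
            client.close();
        }
    }
}
```

Errors such as an already existing node are reported through the result code in the callback instead of being thrown from forPath.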

4. Updating nodes

4.1 Case 1: update a node

  @Test
  public void testSet() throws Exception {
    client.setData()
            .forPath("/node", "set".getBytes());
    log.info("set finished");
  }

4.2 Case 2: update a node with a version check

  @Test
  public void testSet2() throws Exception {
    client.setData()
            .withVersion(1)  // expected data version; a mismatch raises BadVersionException
            .forPath("/node", "12".getBytes());
    log.info("set finished");
  }
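
The version passed to withVersion is an optimistic-concurrency check: ZooKeeper applies the update only when the node's current data version equals the expected one, and every successful update increments the version by one. Passing -1 skips the check. The in-memory model below illustrates just that rule; VersionedNode is a hypothetical class for illustration, not part of ZooKeeper or Curator, and it throws IllegalStateException where ZooKeeper would raise BadVersionException.

```java
import java.nio.charset.StandardCharsets;

final class VersionedNode {
    private byte[] data;
    private int version; // a newly created node starts at data version 0

    VersionedNode(byte[] initial) {
        this.data = initial.clone();
    }

    /**
     * Replaces the data if expectedVersion matches the current version
     * (or is -1, meaning "any version") and returns the new version.
     */
    synchronized int setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new IllegalStateException(
                    "bad version: expected " + expectedVersion + " but node is at " + version);
        }
        data = newData.clone();
        version++;
        return version;
    }

    synchronized int version() {
        return version;
    }

    public static void main(String[] args) {
        VersionedNode node = new VersionedNode("data".getBytes(StandardCharsets.UTF_8));
        System.out.println(node.setData("set".getBytes(StandardCharsets.UTF_8), -1)); // 1
        System.out.println(node.setData("12".getBytes(StandardCharsets.UTF_8), 1));   // 2
        try {
            node.setData("stale".getBytes(StandardCharsets.UTF_8), 1); // version is now 2
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This mirrors the sequence of cases 3.1, 4.1, and 4.2: after one create and one unconditional update the node is at version 1, so withVersion(1) succeeds once and would fail if the same test were run again.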

4.3 Case 3: update a node with a callback

  @Test
  public void testSet3() throws Exception {
    client.setData()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getResultCode());  // 0
                log.info(curatorEvent.getType());  // SET_DATA
                log.info(curatorEvent.getPath());  // /node
                log.info(curatorEvent.getStat().toString());  // 21474836489,21474836542,1620040487612,1620042328488,4,0,0,0,3,0,21474836489
              }
            })
            .forPath("/node", "432".getBytes());
    log.info("set request submitted");  // the callback reports the actual result
  }
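
The comma-separated text logged by getStat().toString() lists the Stat fields in their serialized order: czxid, mzxid, ctime, mtime, version, cversion, aversion, ephemeralOwner, dataLength, numChildren, pzxid. The sketch below, which needs only the JDK, turns such a line into named fields and splits a zxid into its epoch (high 32 bits) and counter (low 32 bits). StatFields is a hypothetical helper name; in real code the getters on Stat are the better choice.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StatFields {
    // Field order of org.apache.zookeeper.data.Stat as printed by toString().
    private static final String[] NAMES = {
        "czxid", "mzxid", "ctime", "mtime", "version", "cversion",
        "aversion", "ephemeralOwner", "dataLength", "numChildren", "pzxid"
    };

    /** Maps each field name to its value, keeping the original order. */
    static Map<String, Long> parse(String statText) {
        String[] parts = statText.trim().split(",");
        if (parts.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "expected " + NAMES.length + " fields, got " + parts.length);
        }
        Map<String, Long> fields = new LinkedHashMap<String, Long>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], Long.parseLong(parts[i].trim()));
        }
        return fields;
    }

    /** High 32 bits of a zxid: the leader epoch. */
    static long epoch(long zxid) {
        return zxid >>> 32;
    }

    /** Low 32 bits of a zxid: the transaction counter within the epoch. */
    static long counter(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        Map<String, Long> stat = parse(
                "21474836489,21474836542,1620040487612,1620042328488,4,0,0,0,3,0,21474836489");
        System.out.println(stat.get("version"));    // 4
        System.out.println(stat.get("dataLength")); // 3, the length of "432"
        long czxid = stat.get("czxid");
        System.out.println("epoch=" + epoch(czxid) + " counter=" + counter(czxid)); // epoch=5 counter=9
    }
}
```

Read this way, the logged line says that /node was created in transaction 9 of epoch 5, last modified in transaction 62 of the same epoch, and is at data version 4.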

5. Deleting nodes

5.1 Case 1: delete a node

  @Test
  public void testDelete() throws Exception {
    client.delete()
            .forPath("/node");
    log.info("delete finished");
  }

5.2 Case 2: delete a node recursively

  @Test
  public void testDelete1() throws Exception {
    client.delete()
            .deletingChildrenIfNeeded()
            .forPath("/node2");
    log.info("delete finished");
  }

5.3 Case 3: delete a node with a callback

  @Test
  public void testDelete3() throws Exception {
    client.delete()
            .deletingChildrenIfNeeded()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getType());  // DELETE
                log.info(curatorEvent.getPath());  // /node1
              }
            })
            .forPath("/node1");
    log.info("delete request submitted");  // the callback reports the actual result
  }
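
Every inBackground(...) call returns before its callback runs, so a test method may finish, and the client may be closed, before anything is logged. One common remedy is to block on a CountDownLatch that the callback counts down. The sketch below shows the pattern with the JDK only: Callback and deleteInBackground are hypothetical stand-ins for Curator's BackgroundCallback and a delete().inBackground(...).forPath(...) call, not Curator APIs, and a single-thread executor plays the role of the background thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class AwaitCallback {
    /** Hypothetical stand-in for BackgroundCallback. */
    interface Callback {
        void processResult(String path);
    }

    /** Hypothetical stand-in for an inBackground call: runs the callback on another thread. */
    static void deleteInBackground(ExecutorService background, String path, Callback callback) {
        background.execute(() -> callback.processResult(path));
    }

    /** Returns the path reported by the callback, or null if it did not arrive in time. */
    static String deleteAndWait(String path, long timeoutMs) {
        ExecutorService background = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> seenPath = new AtomicReference<String>();
        try {
            deleteInBackground(background, path, p -> {
                seenPath.set(p);
                done.countDown(); // releases the waiting caller
            });
            // Block until the callback has run or the timeout expires.
            if (!done.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                return null;
            }
            return seenPath.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            background.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(deleteAndWait("/node1", 5000)); // /node1
    }
}
```

In a Curator test the same idea means creating the latch before the call, calling countDown() at the end of processResult, and calling await(...) before the test method returns.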

6. Reading nodes

6.1 Case 1: read a node

  @Test
  public void testGet() throws Exception {
    byte[] data = client.getData()
            .forPath("/node2");
    log.info(new String(data));
  }
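
The cases here call getBytes() and new String(bytes) without naming a charset, so the JVM's default charset is used on both sides. That is harmless for ASCII values such as "data", but non-ASCII text can be garbled when the writer and reader run on JVMs with different defaults. A small sketch of explicit UTF-8 conversion follows; NodeText is a hypothetical helper name, and the null check covers nodes that hold no data.

```java
import java.nio.charset.StandardCharsets;

final class NodeText {
    /** Encodes text for storage in a node, always as UTF-8. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes node data as UTF-8; returns null when the node holds no data. */
    static String decode(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = encode("data");
        System.out.println(stored.length);  // 4
        System.out.println(decode(stored)); // data
    }
}
```

With such a helper, the read in case 6.1 would log NodeText.decode(data) and the writes would pass NodeText.encode("data") to forPath.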

6.2 Case 2: read a node's value and status

  @Test
  public void testGet2() throws Exception {
    Stat stat = new Stat();
    byte[] data = client.getData()
            .storingStatIn(stat)
            .forPath("/node2");
    log.info(new String(data));
    log.info(stat.getVersion());
  }

6.3 Case 3: read a node with a callback

  @Test
  public void testGet3() throws Exception {
    client.getData()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(new String(curatorEvent.getData()));  // 4134134
                log.info(curatorEvent.getStat().toString());  // 21474836566,21474836566,1620042863998,1620042863998,0,0,0,0,7,0,21474836566
                log.info(curatorEvent.getType().toString());  // GET_DATA
              }
            })
            .forPath("/node2");
  }

7. Listing child nodes

7.1 Case 1: list all children of a node

  @Test
  public void testChildren() throws Exception {
    List<String> children = client.getChildren()
            .forPath("/");
    log.info(children.toString());
  }

7.2 Case 2: list all children of a node with a callback

  @Test
  public void testChildren2() throws Exception {
    client.getChildren()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getPath()); // /
                log.info(curatorEvent.getType().toString());  // CHILDREN
                log.info(curatorEvent.getChildren().toString());  // [node, node2, node3]
              }
            })
            .forPath("/");
  }

8. Checking whether a node exists

8.1 Case 1: check whether a node exists

  @Test
  public void testExists() throws Exception {
    Stat stat = client.checkExists()
            .forPath("/node");
    if (stat != null)
      log.info(stat.toString());
    else
      log.info("node does not exist");
  }

8.2 Case 2: check whether a node exists with a callback

  @Test
  public void testExists1() throws Exception {
    client.checkExists()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getType().toString());  // EXISTS
                Stat stat = curatorEvent.getStat();
                if (stat != null)
                  log.info(stat.toString());  // 21474836548,21474836548,1620042534164,1620042534164,0,0,0,0,0,0,21474836548
                else
                  log.info("node does not exist");
              }
            })
            .forPath("/node");
  }

9. Watcher

9.1 Case 1: NodeCache

  @Test
  public void testWatch() throws Exception {
    // Watch a single node for changes
    NodeCache nodeCache = new NodeCache(client, "/node22");
    nodeCache.start();
    nodeCache.getListenable()
            .addListener(new NodeCacheListener() {
              @Override
              public void nodeChanged() throws Exception {
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null) {
                  log.info(currentData.getPath());
                  log.info(new String(currentData.getData()));
                } else {
                  // No current data means the watched node no longer exists
                  log.info("the node was deleted");
                }
              }
            });
    Thread.sleep(60000); // sleep 60 s so that changes can be observed
    nodeCache.close();
  }

9.2 Case 2: PathChildrenCache

  @Test
  public void testWatch2() throws Exception {
    // Watch the children of a node for changes; true means each child's data is cached too
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node22", true);
    pathChildrenCache.start();
    pathChildrenCache.getListenable()
            .addListener(new PathChildrenCacheListener() {
              @Override
              public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                log.info(pathChildrenCacheEvent.getType());  // CHILD_ADDED, CHILD_REMOVED, CHILD_UPDATED
                if (pathChildrenCacheEvent.getData() == null) {
                  return;  // connection-state events carry no child data
                }
                log.info(pathChildrenCacheEvent.getData().toString());  // ChildData{path='/node22/child', stat=21474836630,21474836630,1620044984259,1620044984259,0,0,0,0,2,0,21474836630, data=[50, 50]}
                log.info(new String(pathChildrenCacheEvent.getData().getData()));
                log.info(pathChildrenCacheEvent.getData().getPath());  // /node22/child
                log.info(pathChildrenCacheEvent.getData().getStat().toString());  // 21474836630,21474836630,1620044984259,1620044984259,0,0,0,0,2,0,21474836630
              }
            });
    Thread.sleep(60000); // sleep 60 s so that changes can be observed
    pathChildrenCache.close();
  }

10. Transactions

10.1 Case 1: create two nodes in a transaction

  @Test
  public void testTransaction() throws Exception {
    client.inTransaction()
            .create().forPath("/node100", "100".getBytes())
            .and()  // bridge to the next operation
            .create().forPath("/node101", "101".getBytes())
            .and()  // bridge to the commit
            .commit();  // apply both creates atomically
    log.info("transaction committed");
  }

11. Distributed locks

11.1 Using a distributed reentrant exclusive lock
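
A minimal sketch of a reentrant exclusive lock using the InterProcessMutex recipe from curator-recipes. The server address 127.0.0.1:2181, the lock path /locks/demo, and the 10 second timeout are assumptions chosen for illustration.

```java
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class MutexExample {
    public static void main(String[] args) throws Exception {
        // Assumption: a ZooKeeper server is listening on this address.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        try {
            // Every process that uses the same path competes for the same lock.
            InterProcessMutex lock = new InterProcessMutex(client, "/locks/demo");
            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                System.out.println("could not get the lock within 10 s");
                return;
            }
            try {
                // Reentrant: the owning thread may acquire again without blocking.
                lock.acquire();
                try {
                    System.out.println("in the critical section");
                } finally {
                    lock.release(); // matches the inner acquire
                }
            } finally {
                lock.release(); // matches the outer acquire
            }
        } finally {
            client.close();
        }
    }
}
```

Each acquire must be balanced by a release from the same thread, which is why both releases sit in finally blocks.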

11.2 Using a read-write lock
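
A minimal sketch of the InterProcessReadWriteLock recipe from curator-recipes: any number of readers may hold the read lock at once, while the write lock is exclusive. The server address 127.0.0.1:2181 and the lock path /locks/rw are assumptions chosen for illustration.

```java
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ReadWriteLockExample {
    public static void main(String[] args) throws Exception {
        // Assumption: a ZooKeeper server is listening on this address.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        try {
            InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/locks/rw");
            InterProcessMutex readLock = rwLock.readLock();
            InterProcessMutex writeLock = rwLock.writeLock();

            // Shared access: other readers can hold the read lock at the same time.
            if (readLock.acquire(10, TimeUnit.SECONDS)) {
                try {
                    System.out.println("reading");
                } finally {
                    readLock.release();
                }
            }

            // Exclusive access: waits until no reader or writer holds the lock.
            if (writeLock.acquire(10, TimeUnit.SECONDS)) {
                try {
                    System.out.println("writing");
                } finally {
                    writeLock.release();
                }
            }
        } finally {
            client.close();
        }
    }
}
```

The read lock is released before the write lock is requested because a thread that holds only the read lock cannot upgrade it to the write lock.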

Source: https://blog.csdn.net/weixin_42524843/article/details/116380642