其他分享
首页 > 其他分享> > 分布式协调-Zookeeper使用(Watcher、Curator、Session、Acl)

分布式协调-Zookeeper使用(Watcher、Curator、Session、Acl)

作者:互联网

分布式协调-Zookeeper使用(Watcher、Session、Curator、Acl)

前面说到zk可以为shardingSphere当做动态配置的一个中间件,然后聊了一下zk的大体介绍,本篇咱们聊聊他的一些常见的特性,并且对其进行相关阐释,同时使用Curator作为Demo。本篇会聊到:

  • 【State】:zk上每个节点除了存储了节点数据,同时也存储了一些节点的状态信息。我们会分析一下。
  • 【Watcher(发布订阅)】:在shardSphere使用zk作为分布式配置的那一篇,我们在zk上手动修改了配置,然后ShardingSphere就可以感知到,其底层就是基于这个特性来做的。
  • 【Session】:客户端和session建立连接的流程。
  • 【Acl】(权限控制):因为不是谁都有权限对zk上的node进行操作的,一旦操作不当,系统就可能宕机。
  • 【Curator】:对zk的api封装的一个客户端框架。

Stat(每个节点的状态和信息)

 
czxid 表示该数据节点被创建时的事务ID
mzxid 表示该节点最后一次被更新时的事务ID
ctime 表示节点被创建的时间
mtime 表示该节点最后一次被更新的时间
version 数据节点的版本号,这里其实是一种乐观锁。我们每次修改一个节点数据的时候,节点的version就会增加。那每个客户端在修改节点的时候,带一个version,当他们传递的version和当前version不一致的时候,就修改失败。
cversion 子结点的版本号
aversion 节点的ACL版本号
ephemeralOwner 创建该临时节点的会话的sessionID。如果该节点是持久节点,那么这个属性值为0
dataLength 数据内容的长度
numChildren 当前节点的子节点个数
pzxid 表示该节点的子节点里最后一次被修改时的事务ID。

version 】:像上面解释的一样,现在我们的版本seq的版本是2,我们修改时带上版本是1就无法修改。

Watcher

我们的shardingSphere使用zk作为配置中心,当在zk上修改了配置后,shardingSphere就能感知到,就是通过watcher做的,zk实际上是和我们的客户端建立连接,并且主动通知客户端有数据修改了。我们这里举个例子。

对某个节点建立一个监听:所有命令带w的都是可以进行监听的

 比如:我们在get的时候对某个节点进行监听,那么当其他客户端对我们get的这个数据进行操作的时候,我们对这个节点监听的节点就会收到消息。

现在我们使用客户端1对seq这个节点get的时候进行监听

然后使用客户端2对这个节点的值进行修改

 这个时候节点一就能收到被监听节点的修改信息

 问题是当我们再次这个数据进行修改的时候,修改的信息并没有被监听到,这也就是说,这种方式只是一次性监听。那如何进行每次修改都被监听到呢?

  • 循环监听:我们看到,当收到信息时候,我们是可以知道哪个节点被修改了,那我们就可以拿到这个节点再次进行监听。
  • addwatch:我们发现它命令中有一个addwatch,这个就是实现持续监听的方法。

 

持久化监听里面提供了两种方式:

【PERSISTENT】:持久化订阅,针对当前节点的修改和删除事件,以及当前节点的子节点的删除和新增事件   【PERSISTENT_RECURSIVE】:持久化递归订阅,在PERSISTENT的基础上,增加了子节点修改的事件触发,以及子节点的子节点的数据变化都会触发相关事件(满足递归订阅特性)   那了解了这个特性就知道,我们的shardingSphere指定是对rules(存储他的配置文件的及节点)进行了addwather的监听,这样当我们修改了rules的数据,监听这个节点的那些个shardingSphere就收到了信息啦。

Session(当客户端连接zkServer的时候,是一个异步的状态.)

  • 客户端向Zookeeper Server发起连接请求,此时状态为CONNECTING
  • 当连接建立好之后,Session状态转化为CONNECTED,此时可以进行数据的IO操作
  • 如果Client和Server的连接出现丢失,则Client又会变成CONNECTING状态
  • 如果会话过期或者主动关闭连接时,此时连接状态为CLOSE
  • 如果是身份验证失败,直接结束 

 Curator(使用java操作zk)

pom

    <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
View Code

以下的代码,就是对zk进行操作的一个案例,包含了增删改查。curator提供了两种方式,同步和异步的操作。

【增删改查同步操作】

   // 启动以及连接zk
    private CuratorOperationExample(){
        curatorFramework= CuratorFrameworkFactory
                .builder()
                .connectionTimeoutMs(20000)
                .connectString("192.168.43.3:2181") //读写分离(zookeeper-server)
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .sessionTimeoutMs(15000)
                .build();
        curatorFramework.start(); //启动
    }

    // 对节点进行操作
    private void nodeCRUD() throws Exception {
        System.out.println("开始针对节点的CRUD操作");
        //这个要给节点中存储的数据
        String value="Hello World";
        //创建一个节点
        String node=curatorFramework.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/node",value.getBytes());
        System.out.println("节点创建成功:"+node);
        //存储状态信息的对象
        Stat stat=new Stat();
        //获取节点的value
        byte[] data=curatorFramework.getData().storingStatIn(stat).forPath(node);
        System.out.println("节点value值:"+new String(data));
        //这里使用先查询,然后修改,因为可能有多个客户端同时连接。可能存在锁的问题。这里查询出来version然后再进行操作
        stat=curatorFramework.setData()
                .withVersion(stat.getVersion())
                .forPath(node,"Update Date Result".getBytes());
        String result=new String(curatorFramework.getData().forPath(node));
        System.out.println("修改节点之后的数据:"+result);
        System.out.println("开始删除节点");
        curatorFramework.delete().forPath(node);
        Stat existStat=curatorFramework.checkExists().forPath(node);
        if(existStat==null){
            System.out.println("节点删除成功");
        }
    }
View Code

【异步增操作】 

    // 创建一个节点的异步方式,其他的操作都有对应的api,
    // 这里的CountDownLatch是为了不让线程直接跑下去,
    // 要是直接跑下去的话,就看不到创建的节点了,因为是异步的,而当执行到获取节点的时候,可能还没有创建好,只是为了看见这个节点内容而已,没有任何作用。
    public void asyncCRUD() throws Exception {
        CountDownLatch countDownLatch=new CountDownLatch(1);
        // ZK会回调BackgroundCallback里面的方法进行回调
        String node=curatorFramework.create().withMode(CreateMode.PERSISTENT)
                .inBackground((session,event)->{
            System.out.println(Thread.currentThread().getName()+":执行创建节点:"+event.getPath());
            countDownLatch.countDown(); //触发回调,递减计数器
        }).forPath("/async-node");
        countDownLatch.await();
    }
View Code

【Acl操作:digest】digest为例 因为他是对于每次会话的授权,那我们在建立连接的时候就要加上,【.authorization("digest","glen:glen".getBytes())】

    //创酱一个acl节点
    //scheme 是digest id是glen:glen 权限是ala
    private void aclOperation() throws Exception {
        Id id=new Id("digest", DigestAuthenticationProvider.generateDigest("glen:glen"));
        List<ACL> acls=new ArrayList<>();
        acls.add(new ACL(ZooDefs.Perms.ALL,id));
        String node=curatorFramework.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(acls,false).forPath("/curator-auth","Auth".getBytes());
        System.out.println("创建带有权限节点:"+node);
        System.out.println("数据查询结果:"+new String(curatorFramework.getData().forPath(node)));
    }
View Code

一次性监听

    // 一次性监听
    public void normalWatcher() throws Exception {
        CuratorWatcher curatorWatcher=new CuratorWatcher() {
            @Override
            public void process(WatchedEvent watchedEvent) throws Exception {
                System.out.println("监听到的事件"+watchedEvent.toString());
                //循环设置监听
                curatorFramework.checkExists().usingWatcher(this).forPath(watchedEvent.getPath());
            }
        };
        // 创酱一个节点
        String node=curatorFramework.create().forPath("/watcher","Watcher String".getBytes());
        System.out.println("节点创建成功:"+node);
        //设置一次普通的watcher监听
        String data=new String(curatorFramework.getData().usingWatcher(curatorWatcher).forPath(node));
        System.out.println("设置监听并获取节点数据:"+data);
        //第一次操作才会触发监听,而第二次不会。所以上面在回调方法中我们设置了一个循环监听。
        curatorFramework.setData().forPath(node,"change data 0".getBytes());
        Thread.sleep(1000);
        curatorFramework.setData().forPath(node,"change data 1".getBytes());
    }
View Code

持续化监听

    //持久化监听
    //node :要监听的节点名称
    private void persisWatcher(String node){
        CuratorCache curatorCache=CuratorCache.
                //实例;去监听的节点;操作类型(单节点缓存,对数据进行压缩,关闭或不清理缓存)
                build(curatorFramework,node, CuratorCache.Options.SINGLE_NODE_CACHE);
        //这里可以设置对于事件的监听类型
        CuratorCacheListener listener=CuratorCacheListener
                .builder()
                //这里是我们自己写的一个类,他会调用我们类的方法,这个实现了他的CuratorCacheListener接口
                .forAll(new ZookeeperWatcherListener())
                .build();
        //把事件监听添加进去
        curatorCache.listenable().addListener(listener);
        curatorCache.start();
    }
View Code

测试

//这里对节点进行操作,看看是不是会别检测到事件
    private void operation(String node) throws Exception {
        curatorFramework.create().forPath(node);
        curatorFramework.setData().forPath(node,"hello".getBytes());
        curatorFramework.delete().forPath(node);
    }

    public static void main(String[] args) throws Exception {
        ZookeeperWatchExample zookeeperWatchExample=new ZookeeperWatchExample();
        String node="/persis-node";
        zookeeperWatchExample.persisWatcher(node);
        zookeeperWatchExample.operation(node);
        //让main方法等待
        System.in.read();
    }
View Code

ACL权限控制

zk针对节点提供了权限的控制,这是因为要规避有人不小心删除了某个节点,而导致整个系统出现问题的情况。他和linux的权限控制相似。

他的权限标志符是这样的:【scheme:id:perm

比如我们获取seq节点,他的scheme就是world(全部都能访问),id就是所有人,permission就是增、删、改、查、管理

  • Scheme(权限模式),标识授权策略,即表示通过什么样子的方式去控制权限。
    • 【world】:默认方式,相当于全部都能访问。
    • 【auth】:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
    • 【digest】:即用户名:密码这种方式认证
    • 【ip】:通过ip地址来做权限控制。
  • ID(授权对象):比如说我们的scheme是ip,那这里就填写ip,如果是digest,那就填写用户名和密码
  • Permission:授予的权限 (c) create . (d)delete (r)read (w)write  (a)admin 

【world】通过get和set【acl】命令去修改一个节点的权限,一个world的例子。

auth】 对登录用户进行授权,授权后可以操作授权的节点,而退出客户端后,再次进入就需要再次授权了。前面的glen是用户名 后面的glen是密码  

 当退出后,我们就无法操作atuh这个节点,必须再次进行授权了。

 

标签:node,String,Zookeeper,Curator,curatorFramework,监听,forPath,Watcher,节点
来源: https://www.cnblogs.com/UpGx/p/15569174.html