其他分享
首页 > 其他分享> > Zookeeper:五、实现分布式锁

Zookeeper:五、实现分布式锁

作者:互联网

前言

Zookeeper实现分布式锁。


文章目录

一、实现思路

在这里插入图片描述

二、代码实现

/**
 * 分布式锁的实现
 */
public class DistributeClock {
    // Zookeeper客户端
    private ZooKeeper zk = null;
    // 等待连接处理
    private CountDownLatch connectlatch = new CountDownLatch(1);
    // 等待前一个节点删除执行完毕
    private CountDownLatch preNodelatch = new CountDownLatch(1);
    // 远程服务器地址
    private String hostName = "192.168.1.6:2181,192.168.1.7:2181,192.168.1.8:2181";
    // 连接建立时间
    private int connectTimeout = 60000;
    // 前一个被监听的节点名称
    private String waitPath;
    // 当前节点的名称
    private String currentPath;

    public DistributeClock() throws Exception{
        //1.获取连接
        zk = new ZooKeeper(hostName, connectTimeout, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                // 状态为连接建立完毕
                if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
                    connectlatch.countDown();
                }
                // 监听前一个节点的处理完毕
                if(watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
                    preNodelatch.countDown();
                }
            }
        });
        //2.等待连接建立完毕
        connectlatch.await();
        //3.连接建立完毕,判断根节点 /locks是否存在,不存在则创建该根节点
        Stat stat = zk.exists("/locks", false);
        if(stat == null){
            zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    /**
     * 请求加锁
     */
    public void zkLock(){
        try {
            // 创建带序号的临时节点,同时获取到该根路径下的子节点
            currentPath = zk.create("/locks" + "/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            List<String> childNodes = zk.getChildren("/locks", true);
            // 只有1个节点直接返回
            if(childNodes.size() == 1){
                return;
            }
            // 多个节点时,当前节点监听前一个节点
            else{
                // 节点名称:seq-00000000
                String thisNode = currentPath.substring("/locks/".length());
                Collections.sort(childNodes);
                int index = childNodes.indexOf(thisNode);
                if(index == -1){
                    throw new RuntimeException("数据异常");
                }
                else if(index == 0){
                    return;
                }
                else{
                    waitPath = "/locks/"+childNodes.get(index-1);
                    zk.getData(waitPath,true,new Stat());
                    preNodelatch.await();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 释放锁
     */
    public void releaseLock(){
        try {
            zk.delete(currentPath,-1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注意以下几点:

三、成熟的框架——Curator

自定义的分布式锁存在很多问题,比如需要通过CountDownLatch实现异步连接控制,还需要解决反复重连等问题,因此可以直接使用封装成熟的框架Curator来实现分布式锁的调用。
详细学习Curator

public class CuratorTest {
    private static String hostAddress = "192.168.1.6:2181,192.168.1.7:2181,192.168.1.8:2181";

    public static void main(String[] args) {
        // 创建分布式锁1
        final InterProcessMutex clock1 = new InterProcessMutex(connectClient(), "/clocks");
        // 创建分布式锁2
        final InterProcessMutex clock2 = new InterProcessMutex(connectClient(), "/clocks");

        // 尝试获取锁
        new Thread(new Runnable() {
            public void run() {
                try {
                    clock1.acquire();
                    System.out.println("-----------客户端1成功获取到分布式锁------------");
                    clock1.release();
                    System.out.println("-----------客户端1成功释放分布式锁------------");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            public void run() {
                try {
                    clock2.acquire();
                    System.out.println("-----------客户端2成功获取到分布式锁------------");
                    clock2.release();
                    System.out.println("-----------客户端2成功释放分布式锁------------");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    // 建立客户端连接
    private static CuratorFramework connectClient(){
        /**
         * 建立重试策略
         * 间隔时间、最大重试次数
         */
        RetryPolicy policy = new ExponentialBackoffRetry(3000,3);

        /**
         * 建立连接
         */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(hostAddress)
                .connectionTimeoutMs(60000)
                .sessionTimeoutMs(60000)
                .retryPolicy(policy).build();
        // 启动Client实例
        client.start();
        return client;
    }
}

标签:实现,Zookeeper,private,public,locks,new,节点,分布式
来源: https://blog.csdn.net/qq_43684985/article/details/118878596