其他分享
首页 > 其他分享> > zookeeper分布式锁,解决了羊群效应,

zookeeper分布式锁,解决了羊群效应,

作者:互联网

zookeeper 实现分布式锁,监听前一个节点来避免羊群效应,
思路:很简单,但是实现起来要麻烦一些, 而且我也是看了很多帖子,发现很多帖子的代码,下载下来逐步调试之后发现,看起来是对的,但在并发情况下运行,就会出现问题. 有的在代码里其实并没有真正实现(监听前一个节点),

接下来分享一个: 真正的zookeeper 分布式锁: 这个也是别人的代码,自己只是搬运工,很遗憾,我自己写的分布式锁,看起来是对的,但是出现了并发问题,

    package tech.codestory.zookeeper.aalvcai.base_I0Itec_ZK_lock;
    
    import lombok.extern.slf4j.Slf4j;
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    
    /**
     * @author 邱润泽
     **/
    @Slf4j
    public class ZookeeperLock {
        private String server = "127.0.0.1:2181";
        private ZkClient zkClient;
        private static final String rootPath = "/qiurunze-lock1";
    
        public ZookeeperLock() {
            zkClient = new ZkClient(server, 5000, 20000);
            buildRoot();
        }
    
        // 构建根节点
        public void buildRoot() {
            if (!zkClient.exists(rootPath)) {
                zkClient.createPersistent(rootPath);
            }
        }
        // 获取锁
        public Lock lock(String lockId, long timeout) {
            // 创建临时节点
            Lock lockNode = createLockNode(lockId);
            lockNode = tryActiveLock(lockNode);// 尝试激活锁
            if (!lockNode.isActive()) {
                try {
                    synchronized (lockNode) {
                        lockNode.wait(timeout); // 线程锁住
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            if (!lockNode.isActive()) {
                throw new RuntimeException(" lock  timeout");
            }
            return lockNode;
        }
    
        // 释放锁
        public void unlock(Lock lock) {
            if (lock.isActive()) {
                zkClient.delete(lock.getPath());
            }
        }
    
        // 尝试激活锁
        private Lock tryActiveLock(Lock lockNode) {
    
            // 获取根节点下面所有的子节点
            List<String> list = zkClient.getChildren(rootPath)
                    .stream()
                    .sorted()
                    .map(p -> rootPath + "/" + p)
                    .collect(Collectors.toList());      // 判断当前是否为最小节点
            log.info("Thread: {}, list : {}",Thread.currentThread().getName(),list);
    
            String firstNodePath = list.get(0);
            log.info("Thread: {}, firstNodePath: {}",Thread.currentThread().getName(),firstNodePath);
            // 最小节点是不是当前节点
            if (firstNodePath.equals(lockNode.getPath())) {
                lockNode.setActive(true);
            } else {
                String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
                log.info("Thread: {},监听的节点是: {}",Thread.currentThread().getName(),upNodePath);
                zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
                    @Override
                    public void handleDataChange(String dataPath, Object data) throws Exception {
                    }
                    @Override
                    public void handleDataDeleted(String dataPath) throws Exception {
                        // 事件处理 与心跳 在同一个线程,如果Debug时占用太多时间,将导致本节点被删除,从而影响锁逻辑。
                        System.out.println("节点删除:" + dataPath);
                         Lock lock = tryActiveLock(lockNode);
                        synchronized (lockNode) {
                            if (lock.isActive()) {
                                lockNode.notify(); // 释放了
                            }
                        }
                        zkClient.unsubscribeDataChanges(upNodePath, this);
                    }
                });
            }
            return lockNode;
        }
    
    
        public Lock createLockNode(String lockId) {
            String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w");
            return new Lock(lockId, nodePath);
        }
    }
    
    
    class Test01{
        static volatile int num = 0;
        static ZookeeperLock zookeeperLock = new ZookeeperLock();
        public static void main(String[] args){
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    try {
                        Lock zkLock = zookeeperLock.lock("lvcai", 5000 );
                        TimeUnit.MILLISECONDS.sleep(100);
                        for (int j = 0; j < 10; j++) {
                            num++;
                        }
                        System.out.println( "num的值是 : "+ num );
                        zookeeperLock.unlock(zkLock);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                },"线程"+i).start();
            }
        }
    }
    
    
    class Lock {
        private String lockId;
        private String path;
        private boolean active;
        public Lock(String lockId, String path) {
            this.lockId = lockId;
            this.path = path;
        }
    
        public Lock() {
        }
    
        public String getLockId() {
            return lockId;
        }
    
        public void setLockId(String lockId) {
            this.lockId = lockId;
        }
    
        public String getPath() {
            return path;
        }
    
        public void setPath(String path) {
            this.path = path;
        }
    
        public boolean isActive() {
            return active;
        }
    
        public void setActive(boolean active) {
            this.active = active;
        }
    }

标签:lockNode,lockId,Lock,zookeeper,节点,羊群,分布式,public,String
来源: https://www.cnblogs.com/lvcai/p/13974803.html