其他分享
首页 > 其他分享> > 分布式框架之ZooKeeper系列(三) 实现分布式锁

分布式框架之ZooKeeper系列(三) 实现分布式锁

作者:互联网

  通过ZooKeeper的有序节点、节点路径不回重复、还有节点删除会触发Wathcer事件的这些特性,我们可以实现分布式锁。

一、思路

  1. zookeeper中创建一个根节点Locks,用于后续各个客户端的锁操作。
  2. 当要获取锁的时候,在Locks节点下创建“Lock_序号”的零时有序节点(临时节点为了客户端突发断开连接,则此节点消失)。
  3. 如果没有得到锁,就监控排在自己前面的序号节点,等待它的释放。
  4. 当前面的锁被释放后,触发Process方法,然后继续获取当前子节点,判断当前节点是不是第一个,是 返回锁,否 获取锁失败。

二、实现

  在实现是要了解一个类 AutoResetEvent。AutoResetEvent 常常被用来在两个线程之间进行信号发送。它有两个重要的方法:

  Set() :发送信号到等待线程以继续其工作。

  bool WaitOne():等待另一个线程发信号,只有收到信号,线程才继续往下执行 ,会一直等待下去,返回值表示是否收到信号。

  bool WaitOne(int millisecondsTimeout):等待指定时间,如果没有收到信号继续执行,返回值表示是否收到信号。

下面为具体实现方法:

public class ZooKeeperLock
    {
        private MyWatcher myWatcher;

        private string lockNode;

        private org.apache.zookeeper.ZooKeeper zooKeeper;

        public ZooKeeperLock()
        {
            myWatcher = new MyWatcher();
        }

        /// <summary>
        /// 获取锁
        /// </summary>
        /// <param name="millisecondsTimeout">等待时间</param>
        /// <returns></returns>
        public async Task<bool> TryLock(int millisecondsTimeout = 0)
        {
            try
            {
                zooKeeper = new org.apache.zookeeper.ZooKeeper("127.0.0.1", 50000, new MyWatcher());

                //创建锁节点
                if (await zooKeeper.existsAsync("/Locks") == null)
                    await zooKeeper.createAsync("/Locks", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

                //新建一个临时锁节点
                lockNode = await zooKeeper.createAsync("/Locks/Lock_", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

                //获取锁下所有节点
                var lockNodes = await zooKeeper.getChildrenAsync("/Locks");

                lockNodes.Children.Sort();

                //判断如果创建的节点就是最小节点 返回锁
                if (lockNode.Split("/").Last() == lockNodes.Children[0])
                    return true;
                else
                {
                    //当前节点的位置
                    var location = lockNodes.Children.FindIndex(n => n == lockNode.Split("/").Last());
                    //获取当前节点 前面一个节点的路径
                    var frontNodePath = lockNodes.Children[location - 1];
                    //在前面一个节点上加上Watcher ,当前面那个节点删除时,会触发Process方法
                    await zooKeeper.getDataAsync("/Locks/" + frontNodePath, myWatcher);

                    //如果时间为0 一直等待下去
                    if (millisecondsTimeout == 0)
                        myWatcher.AutoResetEvent.WaitOne();
                    else //如果时间不为0 等待指定时间后,返回结果
                    {
                        var result = myWatcher.AutoResetEvent.WaitOne(millisecondsTimeout);

                        if (result)//如果返回True,说明在指定时间内,前面的节点释放了锁(但是可能是中间节点主机宕机 导致,所以不一定触发了Process方法就是得到了锁。需要重新判断是不是第一个节点)
                        {
                            //获取锁下所有节点
                            lockNodes = await zooKeeper.getChildrenAsync("/Locks");
                            //判断如果创建的节点就是最小节点 返回锁
                            if (lockNode.Split("/").Last() == lockNodes.Children[0])
                                return true;
                            else
                                return false;
                        }
                        else
                            return false;

                    }
                }
            }
            catch (KeeperException e)
            {
                await UnLock();
                throw e;
            }
            return false;
        }

        /// <summary>
        /// 释放锁
        /// </summary>
        /// <returns></returns>
        public async Task UnLock()
        {
            try
            {
                myWatcher.AutoResetEvent.Dispose();await zooKeeper.deleteAsync(lockNode);
            }
            catch (KeeperException e)
            {
                throw e;
            }
        }

    }

 Process方法实现:

 public class MyWatcher : Watcher
    {
        public AutoResetEvent AutoResetEvent;

        public MyWatcher()
        {
            this.AutoResetEvent = new AutoResetEvent(false);
        }

        public override Task process(WatchedEvent @event)
        {
            if (@event.get_Type() == EventType.NodeDeleted)
            {
                AutoResetEvent.Set();
            }
            return null;
        }
    }

本文源代码在:分布式实现代码

如果你认为文章写的不错,就点个【推荐】吧

标签:zooKeeper,框架,AutoResetEvent,ZooKeeper,public,Locks,节点,await,分布式
来源: https://www.cnblogs.com/MicroHeart/p/10453855.html