数据库
首页 > 数据库> > 分布式锁:Redisson源码解析-FairLock

分布式锁:Redisson源码解析-FairLock

作者:互联网

分布式锁:Redisson源码解析-FairLock

一、FairLock是什么

前面的篇章中,我们输出过Redisson分布式锁核心代码的类图,可以观察到FairLock是基于RedissonLock的子类,也就是基于RedissonLock来实现了一些其他的特性

核心概念

相比与之前的ReentrantLock和现在的FairLock,顾名思义,就很明显的可以发现是表示的公平锁,从之前的案例中可以发现并没有对加锁失败后竞争锁的线程进行顺序上的控制,而是任由线程竞争,也就是非公平的,而FairLock会通过一系列的机制来控制,获取到锁的顺序会和请求获取锁的顺序是一致的

代码实现

// 获取一个FairLock
RLock fairLock = redissonClient.getFairLock("lockName");
// 加锁
fairLock.lock();
// 释放锁
fairLock.unlock();

提出问题

加锁

既然FairLock是可以实现加锁的顺序的,那么整个加锁的流程就会让人好奇了,到底是如何加锁的,才能保证加锁的一致性

释放锁

释放锁有两种情况,一个是主动释放,只有锁已被持有任务完成,会主动执行unlock指令来释放锁;还有一种就是被动释放,超时锁自动释放,也就是设置了leaseTime,倒是watchdog没有再次启动的情况

二、源码解析

初始化

// 命令执行器
this.commandExecutor = commandExecutor;
// threadWaitTime:60000*5 是直接在代码中初始化的这个时间 这里都5分钟了
this.threadWaitTime = threadWaitTime;
// 队列名称:redisson_lock_queue:{lockName}
threadsQueueName = prefixName("redisson_lock_queue", name);
// set集合名称:redisson_lock_timeout:{lockName}
timeoutSetName = prefixName("redisson_lock_timeout", name);

路啊
可以观察到其实整个初始化的过程,就是初始化了一些成员变量,比较重点关注的有下面的:

同样,我们也可以猜测threadWaitTime是获取锁的等待时间,然后还在redis中维护了一个队列和一个set集合


通过阅读源码,可以发现FairLock在整个lock的过程中,几乎都是走的RedissonLock,发现RedissonFairLock只是重载了RedissonLock的方法

下面的时间,我们也就是把这几个核心的源码流程给看一下,引入了什么不一样的机制来完成排队加锁

加锁

Lua脚本剖析

加锁的核心代码是 tryLockInnerAsync ,主要核心是一段lua脚本

下面是针对自己的理解,给这段lua脚本增加了一些注释

// KEY[1] lockName
// KEY[2] threadsQueueName -> redisson_lock_queue:{lockName}
// KEY[3] timeoutSetName -> redisson_lock_timeout:{lockName}
// ARGV[1] unit.toMillis(leaseTime)
// ARGV[2] uuid:threadId 
// ARGV[3] waitTime
// ARGV[4] currentTime当前时间

-----------------------------------循环--------------------------------------------

//  (1)要不就是没有元素,不处理了
//  (2)要不就是把那些过期掉的元素给remove掉
while true do 
  // 从队列中获取第一个元素
  // lindex redisson_lock_queue:{lockName} 0
  local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
  // 如果没有获取到就直接跳出去
  if firstThreadId2 == false then
    break;
  end;
  // 去set里面获取这个队头的元素的score,也就是timeout
  local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
  // 如果这个timeout比当前时间还要小就从队列和set集合中给他挪出来
  if timeout <= tonumber(ARGV[4]) then 
    redis.call('zrem', KEYS[3], firstThreadId2);
    redis.call('lpop', KEYS[2]);
  else
    break;
  end;
end;

-------------------------------------加锁------------------------------------------

// 当前的锁lockName没有人获取,且lockName队列也不存在或者从队列中对头元素是同一个线程加锁-->代表能加锁
// 能加锁的话,就这一段代码
// (exists lockName == 0) and 
// ((exists redisson_lock_queue:{lockName} == 0) 
//  or (lindex redisson_lock_queue:{lockName} 0 ==  uuid:threadId))
if (redis.call('exists', KEYS[1]) == 0) 
  and ((redis.call('exists', KEYS[2]) == 0) 
    or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then 
  // 把当前加锁的线程给从队列和set集合中remove掉
  // lpop redisson_lock_queue:{lockName}
  redis.call('lpop', KEYS[2]);
  // zrem redisson_lock_timeout:{lockName} uuid:threadId 
  redis.call('zrem', KEYS[3], ARGV[2]);
  // 就获取set集合中的所有元素,赋值给keys
  // zrange redisson_lock_timeout:{lockName} 0 -1
  local keys = redis.call('zrange', KEYS[3], 0, -1);
  // 遍历keys
  for i = 1, #keys, 1 do 
    // 而zscore的设置是: 上一个锁的score+waitTime+currentTime
    // 让整个set集合中的元素都减掉waitTime
    // zincrby redisson_lock_timeout:{lockName} -waittime keys[i]
    redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
  end;
  // 加锁的代码
  // hset lockName uuid:threadId 1
  redis.call('hset', KEYS[1], ARGV[2], 1);
  // 设置过期时间的代码
  // pexpire lockName leaseTime
  redis.call('pexpire', KEYS[1], ARGV[1]);
  // 返回nil,加锁成功,会启动一个watchdog调度任务
  return nil;
end;

-------------------------------------可重入锁---------------------------------------

// 当前持有锁的线程是自己,可重入加锁就会走到这里来加锁了
// hexists lockName uuid:threadId 
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then 
  redis.call('hincrby', KEYS[1], ARGV[2],1);
  redis.call('pexpire', KEYS[1], ARGV[1]);
  return nil;
end;

------------------------已经有线程阻塞在队列里的,while获取锁--------------------------

// 获取set集合中,uuid:threadId key的分数,然后再给他 -waitTime-currentTime
// zscore的设置是: 上一个锁的score+waitTime+currentTime
// 实际返回的就是ttl
// zscore redisson_lock_timeout:{lockName} uuid:threadId
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
if timeout ~= false then
  return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
end;


-----------------------------------第一次过来的互斥锁--------------------------------
// 计算并返回队列中最后一个线程的ttl,并添加到队列和set集合中

// 获取队列中的最后一个元素
// lindex threadsQueueName -1
local lastThreadId = redis.call('lindex', KEYS[2], -1);
local ttl;
// 判断队列中最后一个元素不为空,且不等于uuid:threadId
if lastThreadId ~= false and lastThreadId ~= ARGV[2] then
  // zscore redisson_lock_timeout:{lockName} lastThreadId - 当前时间
  ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
else 
  // 只有队列中的元素为空的
  // pttl lockName
  ttl = redis.call('pttl', KEYS[1]);
end;

// timeout = ttl + waitTime + currentTime
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
// 设置set集合中的元素,会带上一个timeout作为score
// zadd redisson_lock_timeout:{lockName} timeout uuid:threadId 
if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
  // 将等待的线程设置到队列中
  // rpush redisson_lock_queue:{lockName} uuid:threadId 
  redis.call('rpush', KEYS[2], ARGV[2]);
end;

// 返回ttl
return ttl;

加锁的流程图

从整个大致的流程图中我们大概可以枚举出几种情况:

假设中间过去了10ms,也就是currentTime2 - currentTime1 = 10ms

加锁的场景图

简单画了一下,加锁的几个场景时序图——哈哈哈 伪时序图,最开始没想好咋画

公平加锁中的特性

排队加锁

排队加锁也是公平锁与RLock的主要区别,可以根据请求获取锁的顺序来获取锁

核心是引入了一个队列,来存储获取锁的顺序,同时通过维护一个zset有序集合来控制锁的超时

获取锁超时自动释放锁

自动刷新超时时间

锁释放

从代码结构中可以看到实际上的释放锁是重构了org.redisson.RedissonFairLock#unlockInnerAsync方法

核心也是一段lua脚本

-----------------------------------循环--------------------------------------------

//  (1)要不就是没有元素,不处理了
//  (2)要不就是把那些过期掉的元素给remove掉
while true do 
  local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
  if firstThreadId2 == false then 
    break;
  end; 
  local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
  if timeout <= tonumber(ARGV[4]) then 
    redis.call('zrem', KEYS[3], firstThreadId2); 
    redis.call('lpop', KEYS[2]); 
  else 
    break;
  end; 
end;


----------------------------要释放的所没有被持有--------------------------------------

if (redis.call('exists', KEYS[1]) == 0) then 
  local nextThreadId = redis.call('lindex', KEYS[2], 0); 
  if nextThreadId ~= false then 
    redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); 
  end; 
  return 1; 
end;


----------------------------如果持有锁的人不是自己----------------------------------
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
  return nil;
end; 

---------------------------如果是可重入的锁,就count-1---------------------------------
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
  redis.call('pexpire', KEYS[1], ARGV[2]); 
  return 0;
end; 

---------------------------如果持有的锁被释放掉了,就发一个广播消息----------------------
redis.call('del', KEYS[1]); 
local nextThreadId = redis.call('lindex', KEYS[2], 0); 
if nextThreadId ~= false then 
  redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); 
  end; 
return 1; 
    

主要不返回null就代表释放锁成功了

核心公平锁知识大概就这些了,主要是详细的看了加锁和释放锁的lua脚本,以及释放锁的流程

标签:加锁,lockName,KEYS,redis,FairLock,call,timeout,Redisson,源码
来源: https://blog.csdn.net/weixin_47246944/article/details/121713053