Redisson 源码初探 (六)公平锁
作者:互联网
因为Redisson 默认是非公平锁,client 端互相一起争抢,现在我们继续研究公平锁,为什么要研究?研究分布式锁 不仅仅要研究最基础的锁对吧,我们要把一系列的非公平锁 公平锁 读写锁 RedLock锁,Semaphore CountDownLatch 一系列的研究完,才算真正的研究了分布式锁,对吧
那么公平锁呢,我们知道公平锁需要维持一个有序的获取锁的顺序,可以使用队列也可以使用一些其他的机制,那么我们慢慢来看Redisson的公平锁是如何实现的?
先找到入口
public static void main(String[] args) throws Exception {
//构建一个配置信息对象
Config config = new Config();
config.useClusterServers()
//定时扫描连接信息 默认1000ms
.setScanInterval(2000)
.addNodeAddress("redis://127.0.0.1:7001");
//因为Redisson 是基于redis封装的一套便于复杂操作的框架
//所以这里构建对象肯定是创建一些与redis的连接
RedissonClient redisson = Redisson.create(config);
//这里是重点 获取锁,这也是重点分析的地方
//这里获取公平锁
RLock lock = redisson.getFairLock("lock");
//尝试获取锁,这里其实和之前的非公平锁 基本一样
//那么公平锁最主要的逻辑在于有序 应该会有一个队列或者其他机制来维持有序
//那么我们知道了类似点,同时我们去看看有哪些地方不一样
lock.lock();
//释放锁
lock.unlock();
}
我们可以看到RedissonFairLock 其实是我们RedissonLock 的一个子类,前面的逻辑都是一样的
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//尝试上锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//真正的尝试获取锁,执行lua脚本,这里是RedissonFairLock 不同的地方,子类进行了重写
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
其实区别主要在执行lua脚本这里,这里RedissonFairLock 重写了父类的 tryLockInnerAsync()方法,那么重写了什么内容?
内容有点多,重点还是看lua脚本 调试源码 if (command == RedisCommands.EVAL_LONG) 才是真正的执行逻辑
KEYS : Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
AVGS : internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime
KEYS[1] = getName() 其实就是锁的名称 lockName
KEYS[2] = thredsQueueName = redisson_lock_queue:{lockName} 基于redis list 结构实现的一个队列
KEYS[3] = timeoutSetName = redisson_lock_timeout:{lockName} 基于redis 数据结构实现的一个set集合,有序的集合,可以自动按照每个数据指定一个分数(score)来进行排序
AVGS[1] = internalLockLeaseTime 就是lock 的契约时间,默认是30S
AVGS[2] = getLockName(threadId) 其实就是UUID + ":" + ThreadId
AVGS[3] = currentTime + threadWaitTime,当前时间 + 线程等待时间 默认 5000ms,最后得到一个时间 代表什么意义?
AVGS[4] = currentTime 当前时间
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
long currentTime = System.currentTimeMillis();
if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads
"while true do "
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[3]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
+
"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
"redis.call('lpop', KEYS[2]); " +
"redis.call('zrem', KEYS[3], ARGV[2]); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return 1;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime);
}
//如果你去debug 源码,你会知道其实走的是这块逻辑,而不是上面那一块
if (command == RedisCommands.EVAL_LONG) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads
//一进来 就是一个死循环
"while true do "
//KEY[2] 是什么threadsQueueName 就是队列的名称
// lindex htreadsQueueName 0 什么意思?就是从队列中拿出第一个元素
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
//如果不存在 就直接会break掉 直接跳出循环
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
//从有序set 集合中拿去第一个元素 看是否过期
//其实就是去把一些过期的key清除掉
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[4]) then "
//从set 集合中 以及 queue中移除代表客户端的元素
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
//如果没有人加锁 锁key 和 队列都不存在 或者 队列的第一个元素是当前线程标志 满足其中一个条件
+ "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//弹出第一个元素
"redis.call('lpop', KEYS[2]); " +
//从set集合中删除当前线程元素
"redis.call('zrem', KEYS[3], ARGV[2]); " +
//进行加锁 ,就是一个map数据结构
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//设置过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果是当前线程持有锁,那么重入的时候 就会将对应的value + 1,白哦名重入的次数
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//重写设置过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//就会进入排队逻辑
"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
"local ttl; " +
//如果第一个元素不为空 同时当前线程不是第一个线程 就会进入排队 或者更新score
"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " +
//根据第一个线程的过期时间 - 当前时间 比如 10:00:25 - 10:00:05 = 20S
"ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +
"else "
//获取锁剩余存活时间
+ "ttl = redis.call('pttl', KEYS[1]);" +
"end; " +
//timeout = ttl + 当前时间 + 5000ms
"local timeout = ttl + tonumber(ARGV[3]);" +
//放入zset 有序集合中(过期时间作为分数)这里会存在两种情况对吧
//(1)如果不存在zset这个数据集合中 那么就会返回1,就会同时放入到 队列中
//(2)如果存在了元素,那么这个zadd key score value 那么就是更新分数,返回0,这时就不会再放入到队列中
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end; " +
"return ttl;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);
}
throw new IllegalArgumentException();
}
这里我们要注意,我们的公平锁,如果某个线程持有线程过长,可能会导致队列被重写排序,为什么?
我们注意两个关键存储数据的结构 queue 和 zset,
在开始的时候,就会从zset中拿去score 判断是否timeout,如果timeout,对吧,会将队列中的元素 pop 以及 set中的对应的元素移除,但是这并不代表对应的线程不进行争夺锁了哦,
那些对应的线程会根据激活时间重写争夺锁,这个时候就会重写将自己放入到queue中 以及 zset中,因为GC 或者其他原因 有可能会导致延迟,从而导致 被顺序被重排,所有严格意义上来说,公平锁,并不是完全公平
标签:Redisson,leaseTime,lock,初探,redis,long,源码,threadId,unit 来源: https://blog.csdn.net/wangxuelei036/article/details/114038840