基于Redis实现分布式锁(二)
作者:互联网
扑街前言:上篇文章简单的说了一下Redis实现分布式锁的基本原理,本次来分析下Redis提供的分布式锁的源码。(认识自己是菜鸟的第十一天)
RedissonClient:
我们先说结论,Redis本身提供的RedissonClient对象(客户端对象),这个对象的getlock()方法能获取一把锁对象,然后lock.lock()加锁,lock.unlock()解锁。最简单的分布式锁就完成了。简单的难以想象,我们说下源码。
/*
* 构建连接对象
*/
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000) // cluster state scan interval in milliseconds
.addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
.addNodeAddress("redis://127.0.0.1:7002");
// 连接Redis,或者服务器客户端对象
RedissonClient redisson = Redisson.create(config);
// 获取锁
RLock lock = redisson.getLock("anyLock");
// 加锁
lock.lock();
try {
...
} finally {
// 解锁
lock.unlock();
}
getLock:
从上面的代码可以看出,当连接到数据库获取到客户端对象之后,第一步是先拿到一把锁,然后用锁对象来完成加锁和解锁的动作。而锁对象是通过getLock(String name)方法根据锁的唯一名称来获取的,上篇文章有说不同线程去拿锁的过程是会判断是否有线程正在持有锁,进而为锁设置过期时间等操作,但getLock方法的源码中是没有判断这些操作的,这些判断放在了下面加锁的过程中,也就说拿锁这一步,只是单纯的获取一个锁对象,并没有连接Redis数据库,这样进而保证了锁的原子性,防止死锁。
public RLock getLock(String name) {
// 获取锁对象
return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}
/**
* 锁的一些属性
*/
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
// 这里是获取一个RedisObject
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
lock:
加锁的过程,这个不看源码的情况下,只根据上篇文章的结论大致猜测一下,应该是先获取线程的唯一ID,然后判断目前这个锁是否有线程正在持有,如果没有线程持有,则将锁添加的Redis数据库,然后设置过期时间,这个过程应该是lua脚本操作,为了保证操作的原子性,还有整个过程中如果出现异常则进行解锁,如果有线程持有,则为线程添加订阅与发布,进而等待持有线程释放锁,如果中间过程超时,则提前结束等待线程。
这面大致猜测了一下,下面看具体源码,加锁的这个接口有多个实现类进行了现实接口,目前先看redisson包下面的代码(下面会有一大段代码段),这里有用到一个Redis的订阅与发布,这个下次再说。
/**
* 入口
*/
public void lock() {
try {
this.lockInterruptibly();
} catch (InterruptedException var2) {
Thread.currentThread().interrupt();
}
}
/**
* 第二次调用,传入-1L和null
*/
public void lockInterruptibly() throws InterruptedException {
this.lockInterruptibly(-1L, (TimeUnit)null);
}
/**
* 加锁的实现方法
*/
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 获取线程的唯一ID
long threadId = Thread.currentThread().getId();
// 判断是否有线程支持有锁,并返回锁的生存时间,如果加锁成功则返回为null,反之为锁的生存时间(也就是说还有线程正在持有锁)
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl != null) {
// 阻塞,订阅消息
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
/*
* 循环访问锁是否已经释放
*/
while(true) {
ttl = this.tryAcquire(leaseTime, unit, threadId);
// 加锁成功
if (ttl == null) {
return;
}
if (ttl >= 0L) {
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 取消订阅
this.unsubscribe(future, threadId);
}
}
}
tryAcquire():
首先明确tryAcquire方法当前需要返回的是生存时间,其中代码调用的get方法是解析tryAcquireAsync方法返回的RFuture对象获取生存时间,而tryAcquireAsync方法用会调用存值锁或获取已有锁的生存时间的lua脚本(用lua脚本是为了保证操作的原子性),这里还有续期的一些逻辑,具体代码详情如下。
关于锁续期,这点我也没有弄得很明白,从锁的流程上推,猜测结论:当A线程获取到锁之后,会创建一个异步线程用于锁续期,也就是不让业务执行时间大于锁的过期时间,但是如何判断当前线程是否持有锁,我没有找到相关的结论(自己猜测,加锁之前,我们是先获取的redisson的锁对象,如果服务挂了,那么jvm的垃圾回收机制会将这个锁对象回收掉,那么异步线程就获取不到当前的锁对象了,就不再续期,从而锁会自然过期,然后释放),当B线程进来时,发现A线程正在使用锁,那么就会返回一个锁的生存时间,B线程就会循环等待,这个块具体的代码应该在上面的代码块中。
lua脚本翻译:
加锁lua脚本:
- 如果锁不存在的话(EXISTS命令),则存入锁的key和value线程的唯一ID(hset命令),并用pexpire命令设置过期时间(毫秒数)。
- 如果key和value的值都存在的话(hexists命令),也就说当前访问的线程是正在使用锁的线程(递归操作,重入,自己调自己),则增加一个增量1,并pexpire命令设置过期时间(毫秒数)。
- 上面条件均不满足是,也就是已经有线程在使用当前锁同时锁没有释放,并且当前线程不是正在使用的线程,则返回key的生存时间(pttl命令返key的生存时间毫秒数,相似命令TTL返回的是秒)即锁的生存时间。
续期lua脚本:
- 当前线程是否还存在(hexists命令),如果存在,则重新用pexpire命令设置过期时间(毫秒数),并返回1,反之返回0。
/**
* 入口
*/
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
/**
* 调用存值,续期
*/
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
// 传入时间不等于-1L
if (leaseTime != -1) {
// 调用存入锁
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 调用存入锁,返回监听对象
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 续期操作
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 调用续期
scheduleExpirationRenewal(threadId);
}
}
});
// 返回监听对象
return ttlRemainingFuture;
}
/**
* 存入锁
*/
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
// 转换时间
internalLockLeaseTime = unit.toMillis(leaseTime);
// lua脚本执行存入命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
/**
* 锁续期
*/
private void scheduleExpirationRenewal(final long threadId) {
// 这里其实就是一个幂等处理,思考 如果存在了当前线程的加锁调式调度,肯定就不需要再添加一次对把
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
// 开启一个异步线程 默认 30/3 = 10S 后调度任务
// 进行续约,如果线程没有持有锁了,那么就会停止调度
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 从名字可以看出这里是异步的更新过期时间
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
//0 false 1 true 意思就是 只有成功了才会再次调度续约
if (future.getNow()) {
// reschedule itself
// 这里续约完成之后,会再次调用 续约方法
scheduleExpirationRenewal(threadId);
}
}
});
}
// 过期时间/3 = 默认就是 30 / 3 = 10S
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
上述就是加锁的全过程,还是一句话一定要保证操作的原子性。
unlock:
加锁完成后,一定要记得解锁,因为线程结束后,虽然Redis会根据锁的过期时间释放锁,但是这个过程一定是>30秒的,因为锁会自动续期,而且只有当jvm回收完锁对象之后,续期才会结束,所以加上unlock方法来释放锁是有必要的。
相对于加锁,解锁就相当简单了,只要保证线程运行结束,并且解锁的线程一定是加锁的线程即可。看代码。
/**
* 解锁
*/
public void unlock() {
// 判断当前线程是否是加锁线程,并解锁(注意递归调用,要递归解锁)
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
// 解锁失败
if (opStatus == null) {
// 报错
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
// 解锁成功
if (opStatus) {
// 删除续期监听
cancelExpirationRenewal();
}
}
以上就是分布式锁的简单源码分析,后续再针对Redis的订阅与发布和锁需求详讲一次。
标签:基于,加锁,return,lock,commandExecutor,Redis,线程,threadId,分布式 来源: https://blog.csdn.net/qq_39339965/article/details/121657697