数据库
首页 > 数据库> > Redisson原理

Redisson原理

作者:互联网

1、场景

假设场景,多个线程并发(模拟并发)对库存数量进行扣减,现将库存预置在redis中,然后开启多线程对库存进行扣减

private static final String PRODUCT = "MoonCake";
private static final String PRODUCT_STOCK = PRODUCT + "Stock";

@Autowired
private RedissonClient redissonClient;

@Autowired
private RedisTemplate redisTemplate;

@GetMapping("/lockAdd")
@ApiOperation("加分布式锁")
public void lockAdd() throws Exception {
    redisTemplate.opsForValue().set(PRODUCT_STOCK, 90);// 预置库存为90
    for (int i = 0; i < 100; i++) {
        new Thread(() -> {
            try {
                decreaseStock();
            } catch (InterruptedException e) {
                log.error("error:", e);
            }
        }).start();
    }
}

private void decreaseStock() throws InterruptedException {
    //对数据进行加锁
    RLock lock = redissonClient.getLock(PRODUCT);
    if (lock.tryLock(5, TimeUnit.SECONDS)) {
        // 获取最新的库存
        Integer curStock = (Integer) redisTemplate.opsForValue().get(PRODUCT_STOCK);
        log.info("Get from redis, curStock=" + curStock);
        if (curStock > 0) {
            curStock -= 1;
            // 更新库存值
            redisTemplate.opsForValue().set(PRODUCT_STOCK, curStock);
            log.info("扣减成功,库存stock:" + curStock);
        } else {
            //没库存
            log.info("扣减失败,库存不足");
        }
        //解锁
        lock.unlock();
    }
}

2、新建锁对象

获取锁时,一般需要锁的key,可以根据实际业务情况进行设置,这里设置为“MoonCake”

RLock lock = redissonClient.getLock(PRODUCT);

上面新建锁时,主要将name注入,生成一个锁对象RedissonLock

@Override
public RLock getLock(String name) {
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        // 命令行执行器
        this.commandExecutor = commandExecutor;
        // id
        this.id = commandExecutor.getConnectionManager().getId();
        // 内部锁释放时间
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        // entry名,存储在redis中的形式
        this.entryName = id + ":" + name;
        // 锁状态的发布和订阅
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

3、获取锁

使用lock.tryLock可以获取锁,该方法的定义如下

/**
**@param waitTime: 获取锁的等待时间
**@param leaseTime:获取到锁之后的释放时间
**@param unit:时间单位
**/
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // 等待时间转化为毫秒
     long time = unit.toMillis(waitTime);
    // 当前时间
     long current = System.currentTimeMillis();
    // 当前线程ID
     long threadId = Thread.currentThread().getId();
    // 尝试获取锁,返回ttl时间
     Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
     // 获取到锁
     if (ttl == null) {
         return true;
     }
     // 没有获取到锁,更新等待时间=waitTime-刚才操作花费时间
     time -= System.currentTimeMillis() - current;
    // 等待时间到,没有获取到锁,获取失败
     if (time <= 0) { 
         acquireFailed(waitTime, unit, threadId);
         return false;
     }
     
    // 等待时间还没有用完
     current = System.currentTimeMillis();
    /**
    * 订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
    * 基于信号量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争.
    * 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
    * 当 this.await 返回 true,进入循环尝试获取锁.
    */
     RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    // await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future)
     if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
         if (!subscribeFuture.cancel(false)) {
             subscribeFuture.onComplete((res, e) -> {
                 if (e == null) {
                     unsubscribe(subscribeFuture, threadId);
                 }
             });
         }
         acquireFailed(waitTime, unit, threadId);
         return false;
     }

     try {
         // 计算获取锁的总耗时,总耗时大于获取锁的等待时间,返回获取锁失败
         time -= System.currentTimeMillis() - current;
         if (time <= 0) {
             acquireFailed(waitTime, unit, threadId);
             return false;
         }
         /**
           * 收到锁释放的信号后,在最大等待时间之内,循环尝试获取锁
           * 获取锁成功,则返回 true,
           * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环
          */
         while (true) {
             long currentTime = System.currentTimeMillis();
             ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
             // 获取锁成功
             if (ttl == null) {
                 return true;
             }
             // 计算获取锁的总耗时,总耗时大于获取锁的等待时间,返回获取锁失败
             time -= System.currentTimeMillis() - currentTime;
             if (time <= 0) {
                 acquireFailed(waitTime, unit, threadId);
                 return false;
             }

             /**
              * 阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):
              */
             currentTime = System.currentTimeMillis();
             // 当前锁的ttl>0并且ttl<剩余的等待时间
             if (ttl >= 0 && ttl < time) {
                 // 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
                 subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
             } else {
                 // 在wait time 时间范围内等待可以通过信号量
                 subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
             }

              // 计算获取锁的总耗时,总耗时大于获取锁的等待时间,返回获取锁失败
             time -= System.currentTimeMillis() - currentTime;
             if (time <= 0) {
                 acquireFailed(waitTime, unit, threadId);
                 return false;
             }
         }
     } finally {
         // 取消订阅
         unsubscribe(subscribeFuture, threadId);
     }
 }

加锁之后在redis中的数据结构是hash类型

在这里插入图片描述

获取锁的Lua脚本

public class RedissonLock extends RedissonExpirable implements RLock {
 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        // 脚本执行参数: 都是从下标1开始
        // KEYS[1]=锁的key
        // ARGV[1]=锁释放时间,ARGV[2]=当前线程Id
        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +  // 当前锁key存在
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 线程持有的锁计数+1
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 以毫秒为单位,将锁的TTL过期时间刷新成最新值
                        "return nil; " + // 返回nil
                "end; " +
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 当前锁的key不存在
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 线程持有的锁计数+1
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " + //以毫秒为单位,将锁的TTL过期时间刷新成最新值
                        "return nil; " +
                 "end; " +
                 "return redis.call('pttl', KEYS[1]);", // 以毫秒为单位,返回锁的当前ttl过期时间
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
}

4、释放锁

解锁时的参数类似于:

getName() -> MoonCake
getChannelName() -> redisson_lock__channel:{MoonCake}
LockPubSub.UNLOCK_MESSAGE -> 0
internalLockLeaseTime -> 90000
getLockName(threadId) -> 1a2cef15-9816-4bea-89dc-1a8d9ad096ca:644

主要通过RedissonLock#unlockInnerAsync方法进行解锁,先判断锁是否被当前线程加的,如果不是,直接返回解锁失败;如果是,将当前线程持有的锁的计数器减1,获得减1之后的计数器值A;如果A大于0,代表锁被当前线程加了多次,将锁的过期时间刷新成传入的internalLockLeaseTime;如果A==0,代表锁要被释放了,删除锁,然后发送解锁消息给等待队列。

 protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        // 脚本执行参数: 都是从下标1开始
        // KEYS[1]=锁的key, KEYS[2]=消息通道名称
        // ARGV[1]=解锁信息,ARGV[2]=锁释放时间,ARGV[2]=当前线程Id
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 当前线程持有锁?
                        "return nil;" + // 不是当前线程持有的锁
                  "end; " +
                  "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 当前线程持有锁计数-1
                  "if (counter > 0) then " + // 锁重入
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " + // 以毫秒为单位,将锁的TTL过期时间刷新成释放时间
                        "return 0; " + // 返回0
                   "else " + // 只锁定了一次,没有锁重入
                        "redis.call('del', KEYS[1]); " + // 删除锁
                        "redis.call('publish', KEYS[2], ARGV[1]); " + // 发布锁释放消息,通知等待队列
                        "return 1; " + // 返回1
                   "end; " +
                   "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

标签:Redisson,return,KEYS,redis,获取,线程,threadId,原理
来源: https://blog.csdn.net/shuoyueqishilove/article/details/122453559