数据库
首页 > 数据库> > 基于redis的分布式锁的分析与实践

基于redis的分布式锁的分析与实践

作者:互联网

前言:在分布式环境中,我们经常使用锁来进行并发控制,锁可分为乐观锁和悲观锁,基于数据库版本戳的实现是乐观锁,基于redis或zookeeper的实现可认为是悲观锁了。乐观锁和悲观锁最根本的区别在于线程之间是否相互阻塞。

那么,本文主要来讨论基于redis的分布式锁算法问题。

从2.6.12版本开始,redis为SET命令增加了一系列选项(set [key] NX/XX EX/PX [expiration]):

原文地址:https://redis.io/commands/set
中文地址:http://redis.cn/commands/set.html

注意: 由于SET命令加上选项已经可以完全取代SETNX, SETEX, PSETEX的功能,所以在将来的版本中,redis可能会不推荐使用并且最终抛弃这几个命令。

这里简单提一下,在旧版本的redis中(指2.6.12版本之前),使用redis实现分布式锁一般需要setNX、expire、getSet、del等命令。而且会发现这种实现有很多逻辑判断的原子操作以及本地时间等并没有控制好。

而在旧版本的redis中,redis的超时时间很难控制,用户迫切需要把setNX和expiration结合为一体的命令,把他们作为一个原子操作,这样新版本的多选项set命令诞生了。然而这并没有完全解决复杂的超时控制带来的问题。

接下来,我们的一切讨论都基于新版redis。

在这里,我先提出几个在实现redis分布式锁中需要考虑的关键问题

1、死锁问题;

1.1、为了防止死锁,redis至少需要设置一个超时时间;

1.2、由1.1引申出来,当锁自动释放了,但是程序并没有执行完毕,这时候其他线程又获取到锁执行同样的程序,可能会造成并发问题,这个问题我们需要考虑一下是否归属于分布式锁带来问题的范畴。

2、锁释放问题,这里会有两个问题;

2.1、每个获取redis锁的线程应该释放自己获取到的锁,而不是其他线程的,所以我们需要在每个线程获取锁的时候给锁做上不同的标记以示区分;

2.2、由2.1带来的问题是线程在释放锁的时候需要判断当前锁是否属于自己,如果属于自己才释放,这里涉及到逻辑判断语句,至少是两个操作在进行,那么我们需要考虑这两个操作要在一个原子内执行,否者在两个行为之间可能会有其他线程插入执行,导致程序紊乱。

3、更可靠的锁;

单实例的redis(这里指只有一个master节点)往往是不可靠的,虽然实现起来相对简单一些,但是会面临着宕机等不可用的场景,即使在主从复制的时候也显得并不可靠(因为redis的主从复制往往是异步的)。

关于Martin Kleppmann的Redlock的分析

原文地址:https://redis.io/topics/distlock
中文地址:http://redis.cn/topics/distlock.html

文章分析得出,这种算法只需具备3个特性就可以实现一个最低保障的分布式锁。

我们来分析一下:

第一点安全属性意味着悲观锁(互斥锁)是我们做redis分布式锁的前提,否者将可能造成并发;

第二点表明为了避免死锁,我们需要设置锁超时时间,保证在一定的时间过后,锁可以重新被利用;

第三点是说对于客户端来说,获取锁和手动释放锁可以有更高的可靠性。

更进一步分析,结合上文提到的关键问题,这里可以引申出另外的两个问题:

接下来再看,redis之父antirez对Redlock的评价

原文地址:http://antirez.com/news/101

文中主要提到了网络延迟和本地时钟的修改(不管是时间服务器或人为修改)对这种算法可能造成的影响。

最后,来点实践吧

I、传统的单实例redis分布式锁实现(关键步骤)

获取锁(含自动释放锁):

SET resource_name my_random_value NX PX 30000
 手动删除锁(Lua脚本):

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

II、分布式环境的redis(多master节点)的分布式锁实现

为了保证在尽可能短的时间内获取到(N/2)+1个节点的锁,可以并行去获取各个节点的锁(当然,并行可能需要消耗更多的资源,因为串行只需要count到足够数量的锁就可以停止获取了);

另外,怎么动态实时统一获取redis master nodes需要更进一步去思考了。

QA,补充一下说明(以下为我与朋友沟通的情况,以说明文中大家可能不够明白的地方):

1、在关键问题2.1中,删除就删除了,会造成什么问题?

线程A超时,准备删除锁;但此时的锁属于线程B;线程B还没执行完,线程A把锁删除了,这时线程C获取到锁,同时执行程序;所以不能乱删。

2、在关键问题2.2中,只要在key生成时,跟线程相关就不用考虑这个问题了吗?

不同的线程执行程序,线程之间肯虽然有差异呀,然后在redis锁的value设置有线程信息,比如线程id或线程名称,是分布式环境的话加个机器id前缀咯(类似于twitter的snowflake算法!),但是在del命令只会涉及到key,不会再次检查value,所以还是需要lua脚本控制if(condition){xxx}的原子性。

3、那要不要考虑锁的重入性?

不需要重入;try…finally 没得重入的场景;对于单个线程来说,执行是串行的,获取锁之后必定会释放,因为finally的代码必定会执行啊(只要进入了try块,finally必定会执行)。

4、为什么两个线程都会去删除锁?(貌似重复的问题。不管怎样,还是耐心解答吧)

每个线程只能管理自己的锁,不能管理别人线程的锁啊。这里可以联想一下ThreadLocal。

5、如果加锁的线程挂了怎么办?只能等待自动超时?

看你怎么写程序的了,一种是问题3的回答;另外,那就自动超时咯。这种情况也适用于网络over了。

6、时间太长,程序异常就会蛋疼,时间太短,就会出现程序还没有处理完就超时了,这岂不是很尴尬?

是呀,所以需要更好的衡量这个超时时间的设置。

实践部分主要代码:

RedisLock工具类:

package com.caiya.cms.web.component;

import com.caiya.cache.CacheException;
import com.caiya.cache.redis.JedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * redis实现分布式锁
 * 可实现特性:
 * 1、使多线程无序排队获取和释放锁;
 * 2、丢弃未成功获得锁的线程处理;
 * 3、只释放线程本身加持的锁;
 * 4、避免死锁
 *
 * @author wangnan
 * @since 1.0
 */
public final class RedisLock {

    private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);

    /**
     * 尝试加锁(仅一次)
     *
     * @param lockKey       锁key
     * @param lockValue     锁value
     * @param expireSeconds 锁超时时间(秒)
     * @return 是否加锁成功
     * @throws CacheException
     */
    public static boolean tryLock(String lockKey, String lockValue, long expireSeconds) throws CacheException {
        JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
        try {
            String response = jedisCache.set(lockKey, lockValue, "nx", "ex", expireSeconds);
            return Objects.equals(response, "OK");
        } finally {
            jedisCache.close();
        }
    }

    /**
     * 加锁(指定最大尝试次数范围内)
     *
     * @param lockKey       锁key
     * @param lockValue     锁value
     * @param expireSeconds 锁超时时间(秒)
     * @param tryTimes      最大尝试次数
     * @param sleepMillis   每两次尝试之间休眠时间(毫秒)
     * @return 是否加锁成功
     * @throws CacheException
     */
    public static boolean lock(String lockKey, String lockValue, long expireSeconds, int tryTimes, long sleepMillis) throws CacheException {
        boolean result;
        int count = 0;
        do {
            count++;
            result = tryLock(lockKey, lockValue, expireSeconds);
            try {
                TimeUnit.MILLISECONDS.sleep(sleepMillis);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        } while (!result && count <= tryTimes);
        return result;
    }

    /**
     * 释放锁
     *
     * @param lockKey   锁key
     * @param lockValue 锁value
     */
    public static void unlock(String lockKey, String lockValue) {
        JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
        try {
            String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
            Object result = jedisCache.eval(luaScript, 1, lockKey, lockValue);
//            Objects.equals(result, 1L);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            jedisCache.close();
        }
//        return false;
    }


    private RedisLock() {
    }

}

使用工具类的代码片段1:

        ...
        String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();// 跟业务相关的唯一拼接键
        String lockValue = Constant.DEFAULT_CACHE_NAME + ":" + System.getProperty("JvmId") + ":" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();// 生成集群环境中的唯一值
        boolean locked = RedisLock.tryLock(lockKey, lockValue, 100);// 只尝试一次,在本次处理过程中直接拒绝其他线程的请求
        if (!locked) {
            throw new IllegalAccessException("您的操作太频繁了,休息一下再来吧~");
        }
        try {
            // 开始处理核心业务逻辑
            Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
            ...
            ...
        } finally {
            RedisLock.unlock(lockKey, lockValue);// 在finally块中释放锁
        }

使用工具类的代码片段2:

        ...
        String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();
        String lockValue = Constant.DEFAULT_CACHE_NAME + ":机器编号:" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();
        boolean locked = RedisLock.lock(lockKey, lockValue, 100, 20, 100);// 非公平锁,无序竞争(这里需要合理根据业务处理情况设置最大尝试次数和每次休眠时间)
        if (!locked) {
            throw new IllegalAccessException("系统太忙,本次操作失败");// 一般来说,不会走到这一步;如果真的有这种情况,并且在合理设置锁尝试次数和等待响应时间之后仍然处理不过来,可能需要考虑优化程序响应时间或者用消息队列排队执行了
        }

        try {
            // 开始处理核心业务逻辑
            Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
            ...
            ...
        } finally {
            RedisLock.unlock(lockKey, lockValue);
        }
        ...

附加:

基于redis的分布式锁实现客户端Redisson:

https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers

基于zookeeper的分布式锁实现:

http://curator.apache.org/curator-recipes/shared-reentrant-lock.html

 

 

        ...
        String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();
        String lockValue = Constant.DEFAULT_CACHE_NAME + ":机器编号:" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();
        boolean locked = RedisLock.lock(lockKey, lockValue, 100, 20, 100);// 非公平锁,无序竞争(这里需要合理根据业务处理情况设置最大尝试次数和每次休眠时间)
        if (!locked) {
            throw new IllegalAccessException("系统太忙,本次操作失败");// 一般来说,不会走到这一步;如果真的有这种情况,并且在合理设置锁尝试次数和等待响应时间之后仍然处理不过来,可能需要考虑优化程序响应时间或者用消息队列排队执行了
        }

        try {
            // 开始处理核心业务逻辑
            Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
            ...
            ...
        } finally {
            RedisLock.unlock(lockKey, lockValue);
        }
        ...

标签:redis,实践,lockValue,线程,key,lockKey,分布式
来源: https://www.cnblogs.com/sxw123/p/14066851.html