数据库
首页 > 数据库> > redis分布式锁

redis分布式锁

作者:互联网

前言

分布式锁的选择,目前市面上使用最广的就是redis和zookeeper,这两种实现都有各自的有点。

zookeeper:可用性高,性能低,并发低。

redis:性能高,可用性中等,并发高

根据业务场景选择合适的技术实现,下面通过应用redis的特性,实现分布式锁。

业务开发相关要求

  1. 与业务代码低耦合,侵入性低。

  2. 使用方便,简单

  3. 组件开发

实现原理

利用redis的串行操作特点,将相关的锁设置key,给key设置过期时间。编写为一个luna脚本,实现一体操作。

redislock相关开源

redisson是开源的redis客户端,提供了丰富的数据操作。也提供了分布式锁的实现。已经有相关开源项目,就不要重复造轮子。

代码相关实现

根据相关的业务开发需求,基于spring的aop的特性,采用注解的方式进行实现。组件的开发参考了相关开源项目实现,也融入了自身的的实现

参考开源的git地址:https://github.com/kekingcn/spring-boot-klock-starter

锁组件涉及到功能

  1. 锁类型实现:重入锁,公平锁
  2. 加解锁超时,异常处理
  3. 自定义错误处理
  4. watchdog功能开启
  5. 自定义本地参数传入

@RedisLock切面类

package com.xxx.component.redislock.annotation;

import com.xxx.component.redislock.model.LockRange;
import com.xxx.component.redislock.model.LockType;
import com.xxx.component.redislock.strategy.LockExceptionStrategy;
import com.xxx.component.redislock.strategy.LockTimeOutStrategy;
import com.xxx.component.redislock.strategy.ReleaseExceptionStrategy;
import com.xxx.component.redislock.strategy.ReleaseTimeOutStrategy;
import com.xxx.component.redislock.util.Constant;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


@Target(value = {ElementType.METHOD})
@Retention(value = RetentionPolicy.RUNTIME)
public @interface RedisLock {
    /**
     * 锁前缀
     * @return
     */
    String preFix() default "";


    /**
     * 锁的范围
     *
     * @return
     */
    LockRange lockRange() default LockRange.normal;

    /**
     * 锁后缀
     *
     * @return
     */
    String postFix() default "";

    /**
     * 参数key值拼接符
     * @return
     */
    String separator() default ".";
    /**
     * 锁的名称
     * @return name
     */
    String name() default "";
    /**
     * 锁类型,默认可重入锁
     * @return lockType
     */
    LockType lockType() default LockType.Reentrant;
    /**
     * 尝试加锁,最多等待时间
     * @return waitTime
     */
    long waitTime() default Constant.waitTime;
    /**
     *上锁以后xxx秒自动解锁
     * @return leaseTime
     */
    long leaseTime() default Constant.leaseTime;

    /**
     * 自定义业务key
     * @return keys
     */
     String[] keys() default {};

     /**
      * 加锁异常处理策略
     * @return lockTimeoutStrategy
     */
     LockExceptionStrategy lockExceptionStrategy() default LockExceptionStrategy.FAIL_FAST;

     /**
      * 释放锁异常处理策略
     * @return releaseTimeoutStrategy
     */
     ReleaseExceptionStrategy releaseExceptionStrategy() default ReleaseExceptionStrategy.FAIL_FAST;


    /**
     * 加锁超时的处理策略
     *
     * @return lockTimeoutStrategy
     */
    LockTimeOutStrategy lockTimeoutStrategy() default LockTimeOutStrategy.FAIL_FAST;


    /**
     * 释放锁时已超时的处理策略
     *
     * @return releaseTimeoutStrategy
     */
    ReleaseTimeOutStrategy releaseTimeoutStrategy() default ReleaseTimeOutStrategy.NO_OPERATION;

    /**
     * 本地参数变量
     *
     * @return
     */
    String[] localArgs() default {};

}

自定义错误处理@CustomLockExceptionStrategy

加解锁,发生超时或其它异常时,需要自定义错误处理。eg:记录加锁出错数据等,个人建议这里采用异步处理。

package com.xxx.component.redislock.annotation;

import java.lang.annotation.*;

/**
 * @Author Buck Wang
 * @Description 自定义redissson 加锁异常处理类
 * @Date 2021/4/7 15:06
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface CustomLockExceptionStrategy {
    //实现 CustomLockExceptionStrategyInter的处理类
    Class name();
    //自定义的业务类型
    String busType() default "";

}

RedisLockAspect的aop切面类

@Aspect
@Component
@Order(0)
public class RedisLockAspect {

    private static final Logger logger = LoggerFactory.getLogger(RedisLockAspect.class);

    @Autowired
    LockFactory lockFactory;

    @Autowired
    private LockInfoProvider lockInfoProvider;


    @Around(value = "@annotation(redisLock)")
    public Object around(ProceedingJoinPoint joinPoint, RedisLock redisLock) throws Throwable {

        LockInfo lockInfo = lockInfoProvider.get(joinPoint, redisLock);
        LockRes lockResCurrent = new LockRes(lockInfo, LockResEnum.LockDealStatusEnum.EXCEPTION.getKey());
        Lock lock = lockFactory.getLock(lockInfo);
        lockResCurrent.setLock(lock);
        //加锁
        lock(lockResCurrent);

        try {
            return joinPoint.proceed();
        } catch (Throwable e) {
            e.printStackTrace();
            throw e;
        } finally {
            try {
                //释放锁
                releaseLock(lockResCurrent);
            } catch (Throwable e) {
                throw e;

            } finally {
                LockThreadLocalUtil.removeLockRes(lockResCurrent);
            }


        }


    }

}

注意点

aop的Order执行级别需要调高优先级,值越小,越先执行。注解是在方法上,有些会涉及到事务,事务会涉及到回滚,事务的实现也是aop,所以redislock的aop需要比事务的aop的越先执行,也就是锁的order值比事务的aop值小,默认的@Order的value是Ordered.LOWEST_PRECEDENCE=2147483647

实例化Redssion客户端

  1. 代码实现

  2. xml配置文件

目前项目中基于springmvc搭建的框架,版本是3.2.5。考虑到redis异常,自动下线错误节点,redis采用的是哨兵模式。

spring的xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans default-lazy-init="true"
	   xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
	   http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
	   http://www.springframework.org/schema/context
	   http://www.springframework.org/schema/context/spring-context.xsd
	   http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
	   http://redisson.org/schema/redisson
	   http://redisson.org/schema/redisson/redisson.xsd"
	   xmlns:context="http://www.springframework.org/schema/context"
	   xmlns:aop="http://www.springframework.org/schema/aop"
	   xmlns:redisson="http://redisson.org/schema/redisson">


	<aop:aspectj-autoproxy expose-proxy="true"/>
	<context:component-scan base-package="com.xxx.component.redislock"></context:component-scan>
	<redisson:client id="redisLockComponent" >
		<redisson:sentinel-servers master-name="${redislock.sentinelmaster}"
								   slave-connection-pool-size="500"
								   master-connection-pool-size="500"
								   idle-connection-timeout="60000"
								   slave-connection-minimum-idle-size="32"
								   master-connection-minimum-idle-size="32"
								   connect-timeout="10000"
								   timeout="3000"
								   ping-timeout="1000">
			<redisson:sentinel-address value="${redislock.addr1}" />
			<redisson:sentinel-address value="${redislock.addr2}" />
			<redisson:sentinel-address value="${redislock.addr3}" />
		</redisson:sentinel-servers>
	</redisson:client>
</beans>

springboot

针对springboot,redisson有对应的jar,进行代码实现。当然你也可以自己代码实现,下面是代码。

Config config = Config.fromYAML("yaml的内容");
或
Resource resource = ctx.getResource(redisLockConfig.getFile());
InputStream is = resource.getInputStream();   
Config config = Config.fromYAML(is);
Redisson.create(config);

redisson的yaml的配置

sentinelServersConfig:
  idleConnectionTimeout: 10000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  subscriptionsPerConnection: 5
  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
  slaveSubscriptionConnectionMinimumIdleSize: 1
  slaveSubscriptionConnectionPoolSize: 50
  slaveConnectionMinimumIdleSize: 32
  slaveConnectionPoolSize: 64
  masterConnectionMinimumIdleSize: 32
  masterConnectionPoolSize: 64
  readMode: "SLAVE"
  sentinelAddresses:
    - "redis://10.xxx.32.94:26379"
    - "redis://10.xxx.32.95:26379"
    - "redis://10.xxx.32.106:26379"
  masterName: "mymaster"
  database: 0
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
transportMode: "NIO"

注意点

#redislock,哨兵模式配置
redislock.sentinelmaster=mymaster
redislock.addr1=redis://10.xx.32.xx:26379
26379这个是哨兵的端口

开启watchdog

org.redisson.RedissonLock#scheduleExpirationRenewal

private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }

        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
            task.cancel();
        }
    }

leaseTime的值设置为-1,定时任务,internalLockLeaseTime / 3,执行一次自动续约。续约的时间为internalLockLeaseTime。如internalLockLeaseTime 没做设置,默认是30000ms,如果想看到自动延时,注意不要使用debug断点执行方法,这样定时任务续约的执行也会卡住,可以采用线程sleep的方法,在观察key的剩余过期时间的变化,就知道是否自动延时。

redisson设置锁的key是hashmap类型。

#查看key是否存在,不存在返回值为0
HGETALL key
#秒的方式,查看key的剩余时间,返回-1,代表没有设置过期时间,-2代表key已经过期了。
ttl key
#毫秒的方式,查看key的剩余时间
Pttl key

开发中遇到问题

  1. 注解不生效

    主要的原因是没开启aop功能,导致不生效。

    <aop:aspectj-autoproxy expose-proxy=“true”/>

  2. 嵌套调用,aop不生效

    expose-proxy=“true”,属性必须是true。eg:同一个类中非aop注解的方法,调用带有aop注解的方法,需要先获取到此类的代理,在进行调用。

    OrderService有方法method1,method2带有注解aop的方法。method1->method2

    OrderService orderService=(OrderService) AopContext.currentProxy();
    orderService.method2()
    

标签:return,default,redis,redislock,key,aop,import,分布式
来源: https://blog.csdn.net/wliang578/article/details/116399887