分布式锁Redis实现方式
作者:互联网
1、SingleSubmitAspectJ
package com.sxc.workflow.aspect;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import javassist.NotFoundException;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* @author qiurunjing
*/
@Aspect
@Component
public class SingleSubmitAspectJ {
private static final Logger logger = LoggerFactory.getLogger(SingleSubmitAspectJ.class);
/**
* 分布式锁,需要注入分布式锁实现类,分布式锁介质可以使用redis、zookeeper、mysql等等。
*/
@Autowired
@Qualifier("redisLocker")
private IDistributeLocker distributeLocker;
/**
* 分布式锁前缀,如系统名,用于在多个系统共享同一个分布式介质时,区分其他系统使用
*/
private static final String lockKeyPrefix = "sxc-trade";
// public SingleSubmitAspectJ(IDistributeLocker distributeLocker, String lockKeyPrefix) {
// this.distributeLocker = distributeLocker;
// this.lockKeyPrefix = lockKeyPrefix;
// }
@Pointcut("@annotation(com.sxc.component.common.annotation.SingleSubmission)")
public void pointCut() {
}
@Around("pointCut()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
MethodSignature method = ((MethodSignature) pjp.getSignature());
SingleSubmission singleSubmit = AopUtils.getMostSpecificMethod(method.getMethod(), pjp.getTarget().getClass())
.getAnnotation(SingleSubmission.class);
if (singleSubmit == null) {
return pjp.proceed();
}
Map<String, Object> argMap = getArgMap(pjp, method);
if (!shouldIntercept(argMap, singleSubmit)) {
logger.info("不需要进行防重复提交拦截 whenExp == false");
return pjp.proceed();
}
if (singleSubmit.expire() <= 0) {
logger.info("不需要进行防重复提交拦截 expire <= 0");
return pjp.proceed();
}
return doArroundLock(pjp, singleSubmit, argMap);
}
private Object doArroundLock(ProceedingJoinPoint pjp, SingleSubmission singleSubmit, Map<String, Object> argMap) throws Throwable {
Expression expression = AviatorEvaluator.compile(singleSubmit.seedExp(), true);
Object seedObj = expression.execute(argMap);
if (seedObj instanceof List) {
return lockListKeyAndExec(pjp, singleSubmit, (List) seedObj);
}
if (seedObj instanceof Object[]) {
return lockListKeyAndExec(pjp, singleSubmit, Lists.newArrayList(seedObj));
}
return lockSingleKeyAndExec(pjp, singleSubmit, seedObj);
}
private Object lockSingleKeyAndExec(ProceedingJoinPoint pjp, SingleSubmission singleSubmit, Object seedObj) throws Throwable {
String key = makeKey(singleSubmit, String.valueOf(seedObj));
lock(singleSubmit, key);
return proceedAndUnlock(pjp, key);
}
private Object lockListKeyAndExec(ProceedingJoinPoint pjp, SingleSubmission singleSubmit, List seedList) throws Throwable {
List<String> lockedKeys = Lists.newArrayList();
seedList.forEach(seed -> {
String key = makeKey(singleSubmit, String.valueOf(seed));
boolean lockSucc = false;
try {
lock(singleSubmit, key);
lockedKeys.add(key);
lockSucc = true;
} finally {
if (!lockSucc) {
lockedKeys.forEach(one -> unlock(one));
}
}
});
return proceedAndUnlockList(pjp, lockedKeys);
}
private void lock(SingleSubmission singleSubmit, String key) throws RepeatSubmitException {
logger.info("尝试获取全局锁, key: {}", key);
boolean isSucc = distributeLocker.tryLock(key, singleSubmit.expire());
if (!isSucc) {
if (!singleSubmit.needRetry()) {
repeatCommitException(key, singleSubmit);
return;
}
wait(singleSubmit, key);
}
}
private void repeatCommitException(String key, SingleSubmission singleSubmit) throws RepeatSubmitException {
logger.info("任务处理中, 请不要重复提交. key:{} errMsg:{}", key, singleSubmit.errMsg());
throw new RepeatSubmitException(singleSubmit.errMsg());
}
private void unlock(String key) {
try {
distributeLocker.unLock(key);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
logger.info("退出全局锁, key: {}", key);
}
private Object proceedAndUnlockList(ProceedingJoinPoint pjp, List<String> keys) throws Throwable {
try {
Object obj = pjp.proceed();
return obj;
} finally {
keys.forEach(key -> unlock(key));
logger.info("退出全局锁, keys: {}", keys);
}
}
private Object proceedAndUnlock(ProceedingJoinPoint pjp, String key) throws Throwable {
try {
Object obj = pjp.proceed();
return obj;
} finally {
try {
distributeLocker.unLock(key);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
logger.info("退出全局锁, key: {}", key);
}
}
private void wait(SingleSubmission singleSubmit, String key) throws RepeatSubmitException {
long waitTime = 0;
boolean isSucc = false;
while (waitTime < singleSubmit.expire()) {
SleepUtil.sleep(1 * 1000);
waitTime += 1;
logger.info("再次尝试获取全局锁, key: {}, 等待时间: {}/{} s", key, waitTime, singleSubmit.expire());
isSucc = distributeLocker.tryLock(key, singleSubmit.expire());
if (isSucc) {
break;
}
}
if (!isSucc) {
repeatCommitException(key, singleSubmit);
}
}
private boolean shouldIntercept(Map<String, Object> argMap, SingleSubmission submission) {
if (StringUtils.isNotBlank(submission.whenExp())) {
Expression expression = AviatorEvaluator.compile(submission.whenExp(), true);
return (boolean) expression.execute(argMap);
}
return true;
}
private String makeKey(SingleSubmission singleSubmit, String seedVal) {
return lockKeyPrefix + "_" + singleSubmit.group() + "_" + seedVal;
}
private Map<String, Object> getArgMap(ProceedingJoinPoint pjp, MethodSignature method) throws NotFoundException {
String[] parameterNames;
if (AopUtils.isJdkDynamicProxy(pjp.getThis())) {
parameterNames = ReflectionUtil.getParameterNames(pjp.getTarget().getClass(), method.getMethod().getName());
} else {
parameterNames = method.getParameterNames();
}
Object[] pjpArgs = pjp.getArgs();
Map<String, Object> argMap = Maps.newHashMap();
if (parameterNames != null) {
for (int i = 0; i < parameterNames.length; i++) {
argMap.put(parameterNames[i], pjpArgs[i]);
}
}
return argMap;
}
}
2、IDistributeLocker
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.sxc.component.common.lock;
public interface IDistributeLocker {
boolean tryLock(String var1, int var2);
void unLock(String var1);
}
3、IDistributeLocker的实现类RedisLocker
package com.biz.common.lock;
import com.common.lock.IDistributeLocker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* @ClassName RedisLocker
* @Description TODO
* @Author zengry
* @Date 2021/1/11 14:54
*/
@Slf4j
@Service
public class RedisLocker implements IDistributeLocker {
@Resource
RedisTemplate<String, Object> redisTemplate;
@Override
public boolean tryLock(String key, int expireSeconds) {
log.info("资源key:{}开始获取分布式锁", key);
// todo 获取线程ID
long requestTime = System.currentTimeMillis();
Boolean lock = redisTemplate.opsForValue().setIfAbsent(key, requestTime, expireSeconds, TimeUnit.SECONDS);
if (null == lock || !lock) {
log.info("资源key:{}正在加锁....", key);
// 防止多线程抢锁
Object currentValue = redisTemplate.opsForValue().get(key);
if (null != currentValue && (Long) currentValue < System.currentTimeMillis()) {
Object target = redisTemplate.opsForValue().getAndSet(key, requestTime);
if (null != target && target.equals(requestTime)) {
log.info("资源key:{}重新分布式锁成功!!!", key);
return true;
}
}
log.info("资源key:{}获取分布式锁失败", key);
return false;
}
log.info("资源key:{}获取分布式锁成功!!!", key);
return true;
}
@Override
public void unLock(String key) {
try{
redisTemplate.opsForValue().getOperations().delete(key);
}catch (Exception e){
log.error("资源key:{}分布式锁释放异常, {}", key, e.getMessage() );
e.printStackTrace();
}
}
}
4、aspect包下SingleSubmission自定义注解
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.common.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Repeatable(SingleSubmissions.class)
public @interface SingleSubmission {
String group();
String seedExp() default "";
String whenExp() default "";
int expire() default 30;
boolean needRetry() default false;
String errMsg() default "任务处理中,请不要重复提交";
}
5、SingleSubmissions自定义注解
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.common.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SingleSubmissions {
SingleSubmission[] value();
}
6、Redis的yml配置
spring:
redis:
database: 5
host: 127.0.0.1
port: 6379
password: "123456"
# 连接超时时间(毫秒)默认是2000ms
timeout: 8000ms
jedis:
pool:
# 连接池最大连接数(使用负值表示没有限制)
max-active: 16
# 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: 30000
# 连接池中的最大空闲连接
max-idle: 8
# 连接池中的最小空闲连接
min-idle: 2
7、启动类配置注解@EnableAspectJAutoProxy(proxyTargetClass = true)
8、pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
9、工具类
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.sxc.component.util;
public class SleepUtil {
public SleepUtil() {
}
public static void sleep(long mills) {
try {
Thread.sleep(mills);
} catch (InterruptedException var3) {
}
}
}
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.sxc.component.util;
import com.google.common.collect.Maps;
import java.util.Map;
import javassist.ClassClassPath;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;
import javassist.Modifier;
import javassist.NotFoundException;
import javassist.bytecode.CodeAttribute;
import javassist.bytecode.LocalVariableAttribute;
import javassist.bytecode.MethodInfo;
public class ReflectionUtil {
private static Map<String, String[]> parameterNamesCache = Maps.newConcurrentMap();
public ReflectionUtil() {
}
public static String[] getParameterNames(Class cls, String methodName, boolean cache) throws NotFoundException {
if (cache) {
String key = makeParameterCacheKey(cls, methodName);
if (parameterNamesCache.containsKey(key)) {
return (String[])parameterNamesCache.get(key);
}
}
String[] paramNames = getParameterNamesNoCache(cls, methodName);
if (cache) {
parameterNamesCache.put(makeParameterCacheKey(cls, methodName), paramNames);
}
return paramNames;
}
private static String[] getParameterNamesNoCache(Class cls, String methodName) throws NotFoundException {
String[] paramNames = null;
ClassPool pool = ClassPool.getDefault();
ClassClassPath classPath = new ClassClassPath(cls);
pool.insertClassPath(classPath);
CtClass cc = pool.get(cls.getName());
CtMethod cm = cc.getDeclaredMethod(methodName);
MethodInfo methodInfo = cm.getMethodInfo();
CodeAttribute codeAttribute = methodInfo.getCodeAttribute();
LocalVariableAttribute attr = (LocalVariableAttribute)codeAttribute.getAttribute("LocalVariableTable");
if (attr != null) {
paramNames = new String[cm.getParameterTypes().length];
int pos = Modifier.isStatic(cm.getModifiers()) ? 0 : 1;
for(int i = 0; i < paramNames.length; ++i) {
paramNames[i] = attr.variableName(i + pos);
}
}
return paramNames;
}
private static String makeParameterCacheKey(Class cls, String methodName) {
return cls.getName() + "_" + methodName;
}
public static String[] getParameterNames(Class cls, String methodName) throws NotFoundException {
return getParameterNames(cls, methodName, true);
}
}
12、ProcessSubmissionGroup
package com.sxc.workflow.common.lock;
import lombok.Data;
/**
* 防重复提交锁标记,用于定义某种业务场景下的锁粒度
*/
public class ProcessSubmissionGroup {
/**
* 受理人受理任务
*/
public static final String ASSIGNEE_USER_HANDLE_TASK = "ASSIGNEE_USER_HANDLE_TASK";
/**
* 申请人撤回流程
*/
public static final String APPLY_WITHDRAW_PROCESS = "APPLY_WITHDRAW_PROCESS";
}
11、使用,在业务层BizService方法上加注解即可
@SingleSubmission(
group = ProcessSubmissionGroup.APPLY_START_PROCESS,
seedExp = "xxxDTO.id",
errMsg = "流程正在启动,不可以再次启动")
标签:return,String,方式,singleSubmit,Redis,key,import,pjp,分布式 来源: https://blog.csdn.net/weixin_44019026/article/details/113546842