分布式幂等 2 (基于请求参数幂等)- 自定义接口幂等(注解) @AvoidResubmit(isLoc = false,interval=3000)
作者:互联网
场景:用于接口请求参数幂等,基于请求参数判断在3s(interval)时间内是否重复提交,重复提交,则直接返回 {"code":2500,"message":"重复提交"}
EVN : springboot 2.3.12 + jdk8
使用: 1.(在需要做类似幂等的接口加上注解)@AvoidResubmit interval: 两次相同请求的最小时间间隔(ms),小于这个时间,认为是重复提交
isLoc=true : 表示单机版本,使用本地缓存,不需要配置redis , isLoc = flase , 用于分布式服务,需要配置redis
2.请求时必须带上认证请求头 Authorization,如果接口不需要认证 Authorization 的值, 可以是当前sessionId or userId , 多次请求唯一即可
@PostMapping("add") @AvoidResubmit(isLoc = false,interval = 2000) public JSONObject add(@RequestBody @Validated AgreeDO agree) { // doingreturn ResponseUtil.getResult(ResponseCode.SUCCESS.getCode(),ResponseCode.SUCCESS.getMessage(),null); }
eg:
curl -X POST "http://localhost:7213/agree/add" -H "accept: */*" -H "Content-Type: application/json" -H "Authorization:a369361156674bb496fc94ee49c84bd6_1659490855779"
-d "{ \"avatar\": \"string\", \"remark\": \"string\", \"ts\": 0, \"type\": \"string\", \"typeId\": \"string\", \"userId\": \"string\", \"userName\": \"string\"}"
{"code":2500,"message":"数据重复提交"}
依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>22.0</version> </dependency>
添加如下三个类:
AvoidResubmit AvoidResubmitHandler, SeaRequestBodyHolder(解决获取body问题)
/** * 自定义注解,用于是否做方重复提交数据检测 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface AvoidResubmit { boolean isLoc() default true; //* 间隔时间(ms),小于此时间视为重复提交 int interval() default 3000; }
AvoidResubmitHandler
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.icil.bx.common.config.SeaRequestBodyHolder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.Order; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.stereotype.Component; import org.springframework.util.DigestUtils; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import javax.servlet.http.HttpServletRequest; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Base64; import java.util.Date; import java.util.List; import java.util.concurrent.TimeUnit; /** * 依赖: * <dependency> * <groupId>org.springframework.boot</groupId> * <artifactId>spring-boot-starter-data-redis</artifactId> * </dependency> * <dependency> * <groupId>com.google.guava</groupId> * <artifactId>guava</artifactId> * <version>22.0</version> * </dependency> */ /*************************** *<pre> * @Project Name : bx-blog-service * @Package : com.icil.bx.handler * @File Name : ResubmitLock * @Author : Sea * @Date : 8/2/22 3:25 PM * @Purpose : 接口幂等 * @History : *</pre> ************* 基于请求参数校验的幂等操作 => **************/ /** * 目的: 自定义切片,防止表单重复提交,网络抖动, 尤其是服务间调用,retry,防止重复提交数据 * 如果在单位时间内(自定义2s内), 同一个用户请求的数据一样,则认为是重复提交 * 1. 提交数据,需要携带认证 AUTHORIZATION_TOKEN_HEADER = "Authorization"; * 2. 请求时,生成唯一 key uri+token+ body.md5 * 3. 验证 唯一 key 是否存在, 并且时间间隔是否小于设定 * 4. 如果存在,并且小于设定时间间隔, 则说明重复提交,响应: 重复提交 {"code":2500,message:"重复提交"} */ @Slf4j @Aspect @Component public class AvoidResubmitHandler{ private final static int RESUBMIT_CODE = 2500; private final static int RESUBMIT_NO_TOKEN = 2501; private final static String CODE = "code"; private final static String OK = "200"; /** * @param joinPoint * @param avoidResubmit * @return * @throws Throwable */ @Order(-1) @Around("@annotation(avoidResubmit)") public Object handlerAvoidResubmit(ProceedingJoinPoint joinPoint, AvoidResubmit avoidResubmit) throws Throwable { List<Object> cacheKeyList = new ArrayList<>(); try { HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); //把body 放入list, 后面取出, 因为@RequestBody 在接口处已经获取了一次,后面通过HttpServletRequest,无法获取了 //另外 用于解决非 post方法(put del), 请求参数防抖 Object[] bodyArgs = joinPoint.getArgs(); cacheKeyList.add((bodyArgs!=null&&bodyArgs.length>0)?JSON.toJSONString(bodyArgs):null); String resubmitToken = request.getHeader(AUTHORIZATION_TOKEN_HEADER);// Authorization if(!this.checkTokenExist(resubmitToken,avoidResubmit,request,cacheKeyList)) { if(StringUtils.isBlank(resubmitToken)){ //2501 return new JSONObject(){{put(CODE,RESUBMIT_NO_TOKEN);put("message","没有认证信息,bad token");}};} //controller 我的接口层默认返回的对象是 JSONObject;//2500 return new JSONObject(){{put(CODE,RESUBMIT_CODE);put("message","数据重复提交");}}; } }catch (Exception e) { //操作过程出错,回滚 e.printStackTrace(); ifNotOkClearLockAndCache(null,avoidResubmit.isLoc(), cacheKeyList.isEmpty()?"":cacheKeyList.get(0)+""); } finally { //释放资源,避免内存泄露 SeaRequestBodyHolder.resetRequestBody(); } Object result = joinPoint.proceed();// // 可能存在网络问题,响应不 ok,需要立马再次提交,所以如果code 不是 200 , 可以再次立马提交 ifNotOkClearLockAndCache( result, avoidResubmit.isLoc(), cacheKeyList.isEmpty()?"":cacheKeyList.get(0)+""); return result; } /** * 如果code不是 200, 回滚,放行 * @param result : 此次result : JSONObject 我的ResponseUtils : 封装 new JSONObject(){{put("code",2500);put("message","xx","data":"");}} * @param isLoc * @param cacheKey */ private void ifNotOkClearLockAndCache(Object result,Boolean isLoc, String cacheKey){ try { if(StringUtils.isBlank(cacheKey)){return;} JSONObject myResult =result==null ? new JSONObject(): (JSONObject) result ; String code = myResult.get(CODE)==null?null:myResult.get(CODE)+"";//code code = StringUtils.isBlank(code)?myResult.get(CODE.toUpperCase())+"":code;//CODE //code:2500 数据重复提交 if(!OK.equalsIgnoreCase(code)&&!(RESUBMIT_CODE+"").equalsIgnoreCase(code)){ System.err.println("roll back"); if(isLoc){ cacheLoc.invalidate(cacheKey); MyNxLockUtils.unlock("lc"+cacheKey,"1"); }else { redisTemplate.delete(cacheKey); redisTemplate.delete("lc"+cacheKey); } } }catch (Exception e) { e.printStackTrace(); log.error(e+""); } } // ################### base logic handler ################### //对于用户唯一 private static String AUTHORIZATION_TOKEN_HEADER = "Authorization"; private static String RESUBMIT_PREFIX = "resubmit_"; private static long cacheTimeOutSec = 10;//3min public static Cache<String, Long> cacheLoc = CacheBuilder.newBuilder().expireAfterWrite(cacheTimeOutSec, TimeUnit.SECONDS).build(); private static RedisTemplate<String,Object> redisTemplate ; @Autowired public void setRedisTemplate(RedisTemplate redisTemplate) { StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringRedisSerializer); redisTemplate.setValueSerializer(stringRedisSerializer); this.redisTemplate = redisTemplate; } /** * if true . psss * @param token * @param avoidResubmit * @param request * @return */ public Boolean checkTokenExist(String token, AvoidResubmit avoidResubmit,HttpServletRequest request,List<Object> cacheKeyList) { //token is Authorization if(StringUtils.isBlank(token)){return false;} if(avoidResubmit.isLoc()) { return checkExistInLoc(token,avoidResubmit,request,cacheKeyList); }else { return checkExistInRedis(token,avoidResubmit,request,cacheKeyList); } } /** * 检测请求body是否之前发送过,解决重复点击问题(抖动) * if exist, return false * if not exist , add key:uri , value: body MD5 * remark : 如果接口使用了@RequestBody ,就无法再次获取body, 并且出现:getInputStream() has already been called for this request * @param request */ private String getBodyMD5(HttpServletRequest request) { try { /*BufferedReader reader = request.getReader(); if(reader==null){return null;} String body = IOUtils.toString(reader);*/ String body = SeaRequestBodyHolder.getRequestBody(); if(body==null){return null;} System.err.println("body---"+body); //MD5 MessageDigest md5 = MessageDigest.getInstance("MD5"); return Base64.getEncoder().encodeToString(md5.digest(body.getBytes("utf-8"))); } catch (Exception e) { log.warn("get body from HttpServletRequest error , I will get it from request Args later "+e); return null; } } /** * if exist return true * @param token * @param avoidResubmit * @param request * @return */ private Boolean checkExistInLoc(String token,AvoidResubmit avoidResubmit,HttpServletRequest request,List<Object> cacheKeyList){ Boolean result = true; //检测body String bodyMD5 = getBodyMD5(request); if(bodyMD5==null&&cacheKeyList.get(0)!=null){ bodyMD5 = DigestUtils.md5DigestAsHex(cacheKeyList.remove(0).toString().getBytes()); } cacheKeyList.clear(); if(bodyMD5!=null) { //类似分布式锁 String key = DigestUtils.md5DigestAsHex((token + request.getRequestURI()+bodyMD5).getBytes()); String lcKey="lc"+key; Boolean lock = MyNxLockUtils.getLock(lcKey, "1"); if(!lock){return false;} //检测请求body是否之前发送过,解决重复点击问题 Long reqBeforeTs = cacheLoc.getIfPresent(key); System.err.println("reqBeforeTs"+reqBeforeTs); if(reqBeforeTs!=null)//之前请求过 { System.err.println("diff time :" +(new Date().getTime() - reqBeforeTs)); //2s 以内,认为是重复提交 if(new Date().getTime() - reqBeforeTs < avoidResubmit.interval()) { result = false; }else{ //超过规定的时间,删除 key ,放置新的值 cacheLoc.put(key,new Date().getTime()); } }else { cacheKeyList.add(key); //添加到List,后面异常情况补偿回滚处理,当前请求不ok的情况 cacheLoc.put(key,new Date().getTime()); } MyNxLockUtils.unlock(lcKey,"1"); } return result; } /** * if exist return true * @param token * @param avoidResubmit * @param request * @return */ private Boolean checkExistInRedis(String token,AvoidResubmit avoidResubmit,HttpServletRequest request,List<Object> cacheKeyList){ Boolean result = true; //检测body String bodyMD5 = getBodyMD5(request); if(bodyMD5==null&&cacheKeyList.get(0)!=null){ bodyMD5 = DigestUtils.md5DigestAsHex(cacheKeyList.remove(0).toString().getBytes()); } cacheKeyList.clear(); if(bodyMD5!=null) { //类似分布式锁 String key = RESUBMIT_PREFIX+DigestUtils.md5DigestAsHex((token + request.getRequestURI()+bodyMD5).getBytes()); String lcKey="lc"+key; Boolean getLock = redisTemplate.opsForValue().setIfAbsent( lcKey , "1", 5, TimeUnit.SECONDS); if(!getLock){return false;} //检测请求body是否之前发送过,解决重复点击问题 Object reqBeforeTs = redisTemplate.opsForValue().get(key); System.err.println("reqBeforeTs"+reqBeforeTs); if(reqBeforeTs!=null)//之前请求过 { System.err.println("diff time :" +(new Date().getTime() - Long.valueOf(reqBeforeTs+""))); //2s 以内,认为是重复提交 if(new Date().getTime() - Long.valueOf(reqBeforeTs+"") < avoidResubmit.interval()) { result = false; }else{ //超过规定的时间,删除 key ,放置新的值 redisTemplate.opsForValue().set(key,new Date().getTime()+""); } }else { cacheKeyList.add(key); //添加到List,后面异常情况补偿回滚处理,当前请求不ok的情况 redisTemplate.opsForValue().set(key,new Date().getTime()+""); } redisTemplate.delete(lcKey); } return result; } /** * 自定义loc类分布式锁 */ public static class MyNxLockUtils { private static volatile String lcPoint0="xx0", lcPoint1="xx1", lcPoint2="xx2" ,lcPoint3="xx3", lcPoint4="xx4", lcPoint5="xx5", lcPoint6="xx6", lcPoint7="xx7", lcPoint8="xx8", lcPoint9="xx9"; /** * 分段枷锁,提升效率 * @param k * @return */ private static String getLcPoint(String k){ switch (k.hashCode()%10){ case 1: return lcPoint1; case 2: return lcPoint2; case 3: return lcPoint3; case 4: return lcPoint4; case 5: return lcPoint5; case 6: return lcPoint6; case 7: return lcPoint7; case 8: return lcPoint8; case 9: return lcPoint9; default: return lcPoint0; } } private static Cache<String, String> lockCache = null; static { lockCache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build(); } /** * @param k * @param v * @return */ public static Boolean getLock(String k, String v){ if(lockCache.getIfPresent(k)==null) { synchronized (getLcPoint(k)){ if(lockCache.getIfPresent(k)==null){ lockCache.put(k,v); return true; } } } return false; } public static void unlock(String k, String v) { String ifPresent = lockCache.getIfPresent(k); if(ifPresent!=null&&ifPresent.equals(v)){ lockCache.invalidate(k); } } } }
SeaRequestBodyHolder
import com.sea.handler.AvoidResubmit; import org.apache.commons.io.IOUtils; import org.springframework.core.MethodParameter; import org.springframework.core.NamedThreadLocal; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpInputMessage; import org.springframework.http.converter.HttpMessageConverter; import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import org.springframework.web.servlet.mvc.method.annotation.RequestBodyAdviceAdapter; import javax.servlet.http.HttpServletRequest; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; /*************************** *<pre> * @Project Name : Sea-blog-service * @Package : com.sea.common.config * @File Name : SeaRequestBodyHolder * @Author : Sea * @Date : 8/4/22 2:11 PM * @Purpose : body 默认只能获取一次: 如果接口使用了@RequestBody , HttpServletRequest在接口会获取一次body,后面就无法再次获取body, * 并且出现:getInputStream() has already been called for this request * 此处在 获取body后,优选缓存一份body * @History : *</pre> ***************************/ @ControllerAdvice public class SeaRequestBodyHolder extends RequestBodyAdviceAdapter { private static String requestBody = "body"; private static final ThreadLocal<Map<String,String>> requestBodyHolder = new NamedThreadLocal("Sea Request body holder"); public static String getRequestBody(){ try{ String body = requestBodyHolder.get().get(requestBody); resetRequestBody(); return body; }catch (Exception e){ return null; } } public static void resetRequestBody(){ requestBodyHolder.remove(); } @Override public boolean supports(MethodParameter methodParameter, Type type, Class<? extends HttpMessageConverter<?>> aClass) { return true; } //如果仅仅是post 方法防抖,可以在此处操作 @Override public HttpInputMessage beforeBodyRead(HttpInputMessage httpInputMessage, MethodParameter methodParameter, Type type, Class<? extends HttpMessageConverter<?>> aClass) throws IOException { HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); // System.err.println(request.getMethod()); // 不包含注解 @CheckResubmit 不需要读body 直接返回 if(!methodParameter.hasMethodAnnotation(AvoidResubmit.class)){ return super.beforeBodyRead(httpInputMessage, methodParameter, type, aClass); } // 第一次 读取body String bodyStr = IOUtils.toString(httpInputMessage.getBody(), StandardCharsets.UTF_8); // 重新new 一个 HttpInputMessage return new HttpInputMessage() { @Override public InputStream getBody() throws IOException { String body = bodyStr; //放入requestBodyHolder,后面获取 requestBodyHolder.set(new HashMap<String, String>(){{put(requestBody,body);}}); // 重新写入 body return new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)); } @Override public HttpHeaders getHeaders() { return httpInputMessage.getHeaders(); } }; } }
标签:body,return,String,自定义,interval,request,import,false,null 来源: https://www.cnblogs.com/lshan/p/16553596.html