其他分享
首页 > 其他分享> > 分布式幂等 2 (基于请求参数幂等)- 自定义接口幂等(注解) @AvoidResubmit(isLoc = false,interval=3000)

分布式幂等 2 (基于请求参数幂等)- 自定义接口幂等(注解) @AvoidResubmit(isLoc = false,interval=3000)

作者:互联网

场景:用于接口请求参数幂等,基于请求参数判断在3s(interval)时间内是否重复提交,重复提交,则直接返回 {"code":2500,"message":"重复提交"}

EVN :  springboot 2.3.12  + jdk8

使用: 1.(在需要做类似幂等的接口加上注解)@AvoidResubmit   interval: 两次相同请求的最小时间间隔(ms),小于这个时间,认为是重复提交 

      isLoc=true  : 表示单机版本,使用本地缓存,不需要配置redis ,  isLoc = flase , 用于分布式服务,需要配置redis 

     2.请求时必须带上认证请求头 Authorization,如果接口不需要认证 Authorization 的值, 可以是当前sessionId  or  userId  ,  多次请求唯一即可   

    @PostMapping("add") 
    @AvoidResubmit(isLoc = false,interval = 2000)
    public JSONObject add(@RequestBody @Validated AgreeDO agree)
    {
        // doingreturn ResponseUtil.getResult(ResponseCode.SUCCESS.getCode(),ResponseCode.SUCCESS.getMessage(),null);
    }

eg:   

curl -X POST "http://localhost:7213/agree/add" -H "accept: */*" -H "Content-Type: application/json" -H "Authorization:a369361156674bb496fc94ee49c84bd6_1659490855779" 
-d "{ \"avatar\": \"string\", \"remark\": \"string\", \"ts\": 0, \"type\": \"string\", \"typeId\": \"string\", \"userId\": \"string\", \"userName\": \"string\"}"

    {"code":2500,"message":"数据重复提交"}

依赖:

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
 <dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>22.0</version>
</dependency>

 

添加如下三个类:

AvoidResubmit AvoidResubmitHandler,  SeaRequestBodyHolder(解决获取body问题)
/**
 * 自定义注解,用于是否做方重复提交数据检测
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface AvoidResubmit {
    boolean isLoc() default true;
     //* 间隔时间(ms),小于此时间视为重复提交
     int interval() default 3000;
}

 

AvoidResubmitHandler
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.icil.bx.common.config.SeaRequestBodyHolder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * 依赖:
 *         <dependency>
 *               <groupId>org.springframework.boot</groupId>
 *               <artifactId>spring-boot-starter-data-redis</artifactId>
 *            </dependency>
 *             <dependency>
 *             <groupId>com.google.guava</groupId>
 *             <artifactId>guava</artifactId>
 *             <version>22.0</version>
 *         </dependency>
 */
/***************************
 *<pre>
 * @Project Name : bx-blog-service
 * @Package      : com.icil.bx.handler
 * @File Name    : ResubmitLock
 * @Author       :  Sea
 * @Date         : 8/2/22 3:25 PM
 * @Purpose      : 接口幂等
 * @History      :
 *</pre>
 ************* 基于请求参数校验的幂等操作 =>  **************/
/**
 * 目的:  自定义切片,防止表单重复提交,网络抖动, 尤其是服务间调用,retry,防止重复提交数据
 *     如果在单位时间内(自定义2s内), 同一个用户请求的数据一样,则认为是重复提交
 * 1. 提交数据,需要携带认证  AUTHORIZATION_TOKEN_HEADER = "Authorization";
 * 2. 请求时,生成唯一 key  uri+token+ body.md5
 * 3. 验证 唯一 key 是否存在, 并且时间间隔是否小于设定
 * 4. 如果存在,并且小于设定时间间隔, 则说明重复提交,响应: 重复提交  {"code":2500,message:"重复提交"}
 */
@Slf4j
@Aspect
@Component
public class AvoidResubmitHandler{
    private final static  int RESUBMIT_CODE = 2500;
    private final static  int RESUBMIT_NO_TOKEN = 2501;
    private final static  String CODE = "code";
    private final static  String OK = "200";
    /**
     * @param joinPoint
     * @param avoidResubmit
     * @return
     * @throws Throwable
     */
    @Order(-1)
    @Around("@annotation(avoidResubmit)")
    public Object handlerAvoidResubmit(ProceedingJoinPoint joinPoint, AvoidResubmit avoidResubmit) throws Throwable
    {
        List<Object> cacheKeyList = new ArrayList<>();
        try
        {
            HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
            //把body 放入list, 后面取出, 因为@RequestBody 在接口处已经获取了一次,后面通过HttpServletRequest,无法获取了
            //另外 用于解决非 post方法(put del), 请求参数防抖
            Object[] bodyArgs = joinPoint.getArgs();
            cacheKeyList.add((bodyArgs!=null&&bodyArgs.length>0)?JSON.toJSONString(bodyArgs):null);
            String resubmitToken = request.getHeader(AUTHORIZATION_TOKEN_HEADER);// Authorization
            if(!this.checkTokenExist(resubmitToken,avoidResubmit,request,cacheKeyList))
            {
                if(StringUtils.isBlank(resubmitToken)){ //2501
                    return new JSONObject(){{put(CODE,RESUBMIT_NO_TOKEN);put("message","没有认证信息,bad token");}};}
                    //controller 我的接口层默认返回的对象是 JSONObject;//2500
                    return new JSONObject(){{put(CODE,RESUBMIT_CODE);put("message","数据重复提交");}};
            }
        }catch (Exception e)
        {   //操作过程出错,回滚
            e.printStackTrace();
            ifNotOkClearLockAndCache(null,avoidResubmit.isLoc(),  cacheKeyList.isEmpty()?"":cacheKeyList.get(0)+"");
        }
        finally
        {       //释放资源,避免内存泄露
               SeaRequestBodyHolder.resetRequestBody();
        }

        Object result = joinPoint.proceed();//
        // 可能存在网络问题,响应不 ok,需要立马再次提交,所以如果code 不是 200 , 可以再次立马提交
        ifNotOkClearLockAndCache( result, avoidResubmit.isLoc(),  cacheKeyList.isEmpty()?"":cacheKeyList.get(0)+"");
        return result;
    }

        /**
         * 如果code不是 200, 回滚,放行
         * @param result : 此次result : JSONObject  我的ResponseUtils : 封装 new JSONObject(){{put("code",2500);put("message","xx","data":"");}}
         * @param isLoc
         * @param cacheKey
         */
        private void ifNotOkClearLockAndCache(Object result,Boolean isLoc, String cacheKey){
          try
          {
              if(StringUtils.isBlank(cacheKey)){return;}
              JSONObject myResult =result==null ? new JSONObject(): (JSONObject) result ;
              String code =  myResult.get(CODE)==null?null:myResult.get(CODE)+"";//code
              code = StringUtils.isBlank(code)?myResult.get(CODE.toUpperCase())+"":code;//CODE
                //code:2500 数据重复提交
              if(!OK.equalsIgnoreCase(code)&&!(RESUBMIT_CODE+"").equalsIgnoreCase(code)){
                  System.err.println("roll back");
                    if(isLoc){
                        cacheLoc.invalidate(cacheKey);
                        MyNxLockUtils.unlock("lc"+cacheKey,"1");
                    }else
                    {
                        redisTemplate.delete(cacheKey);
                        redisTemplate.delete("lc"+cacheKey);
                    }
              }
          }catch (Exception e)
          { e.printStackTrace();
            log.error(e+"");
          }
        }



    // ################### base logic handler  ###################
    //对于用户唯一
    private static String  AUTHORIZATION_TOKEN_HEADER = "Authorization";
    private static String  RESUBMIT_PREFIX =  "resubmit_";
    private static long    cacheTimeOutSec = 10;//3min
    public  static  Cache<String, Long> cacheLoc =  CacheBuilder.newBuilder().expireAfterWrite(cacheTimeOutSec, TimeUnit.SECONDS).build();

    private static RedisTemplate<String,Object> redisTemplate ;
    @Autowired
    public  void setRedisTemplate(RedisTemplate redisTemplate) {
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(stringRedisSerializer);
        this.redisTemplate = redisTemplate;
    }

    /**
     * if true .  psss
     * @param token
     * @param avoidResubmit
     * @param request
     * @return
     */
    public Boolean checkTokenExist(String token, AvoidResubmit avoidResubmit,HttpServletRequest request,List<Object> cacheKeyList)
    {   //token is Authorization
        if(StringUtils.isBlank(token)){return false;}
        if(avoidResubmit.isLoc())
        {
         return checkExistInLoc(token,avoidResubmit,request,cacheKeyList);
        }else
        {
         return checkExistInRedis(token,avoidResubmit,request,cacheKeyList);
        }
    }



    /**
     * 检测请求body是否之前发送过,解决重复点击问题(抖动)
     * if exist, return  false
     * if not exist , add  key:uri , value: body MD5
     * remark : 如果接口使用了@RequestBody ,就无法再次获取body,  并且出现:getInputStream() has already been called for this request
     * @param request
     */
    private String getBodyMD5(HttpServletRequest request) {
        try {
          /*BufferedReader reader = request.getReader();
            if(reader==null){return null;}
            String body = IOUtils.toString(reader);*/
            String body = SeaRequestBodyHolder.getRequestBody();
            if(body==null){return null;}
            System.err.println("body---"+body);
            //MD5
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return Base64.getEncoder().encodeToString(md5.digest(body.getBytes("utf-8")));
        } catch (Exception e) {
            log.warn("get body from HttpServletRequest error , I will get it from request Args later "+e);
            return null;
        }
    }

    /**
     * if exist return true
     * @param token
     * @param avoidResubmit
     * @param request
     * @return
     */
    private Boolean checkExistInLoc(String token,AvoidResubmit avoidResubmit,HttpServletRequest request,List<Object> cacheKeyList){
        Boolean result = true;
        //检测body
        String bodyMD5 = getBodyMD5(request);
        if(bodyMD5==null&&cacheKeyList.get(0)!=null){
           bodyMD5 =  DigestUtils.md5DigestAsHex(cacheKeyList.remove(0).toString().getBytes());
        }
        cacheKeyList.clear();
        if(bodyMD5!=null)
        {
            //类似分布式锁
            String key = DigestUtils.md5DigestAsHex((token + request.getRequestURI()+bodyMD5).getBytes());
            String lcKey="lc"+key;
            Boolean lock = MyNxLockUtils.getLock(lcKey, "1");
            if(!lock){return false;}
            //检测请求body是否之前发送过,解决重复点击问题
            Long reqBeforeTs = cacheLoc.getIfPresent(key);
            System.err.println("reqBeforeTs"+reqBeforeTs);
            if(reqBeforeTs!=null)//之前请求过
            {
                System.err.println("diff time :" +(new Date().getTime() - reqBeforeTs));
                //2s 以内,认为是重复提交
                if(new Date().getTime() - reqBeforeTs < avoidResubmit.interval())
                {
                    result = false;
                }else{ //超过规定的时间,删除 key ,放置新的值
                    cacheLoc.put(key,new Date().getTime());
                }
            }else
            {
                cacheKeyList.add(key); //添加到List,后面异常情况补偿回滚处理,当前请求不ok的情况
                cacheLoc.put(key,new Date().getTime());
            }
            MyNxLockUtils.unlock(lcKey,"1");
        }
        return  result;
    }

    /**
     * if exist return true
     * @param token
     * @param avoidResubmit
     * @param request
     * @return
     */
    private Boolean checkExistInRedis(String token,AvoidResubmit avoidResubmit,HttpServletRequest request,List<Object> cacheKeyList){
        Boolean result = true;
        //检测body
        String bodyMD5 = getBodyMD5(request);
        if(bodyMD5==null&&cacheKeyList.get(0)!=null){
            bodyMD5 =  DigestUtils.md5DigestAsHex(cacheKeyList.remove(0).toString().getBytes());
        }
        cacheKeyList.clear();
        if(bodyMD5!=null)
        {
            //类似分布式锁
            String key = RESUBMIT_PREFIX+DigestUtils.md5DigestAsHex((token + request.getRequestURI()+bodyMD5).getBytes());
            String lcKey="lc"+key;
            Boolean getLock = redisTemplate.opsForValue().setIfAbsent( lcKey , "1", 5, TimeUnit.SECONDS);
            if(!getLock){return false;}
            //检测请求body是否之前发送过,解决重复点击问题
            Object reqBeforeTs = redisTemplate.opsForValue().get(key);
            System.err.println("reqBeforeTs"+reqBeforeTs);
            if(reqBeforeTs!=null)//之前请求过
            {
                System.err.println("diff time :" +(new Date().getTime() - Long.valueOf(reqBeforeTs+"")));
                //2s 以内,认为是重复提交
                if(new Date().getTime() - Long.valueOf(reqBeforeTs+"") < avoidResubmit.interval())
                {
                    result = false;
                }else{ //超过规定的时间,删除 key ,放置新的值
                    redisTemplate.opsForValue().set(key,new Date().getTime()+"");
                }
            }else
            {
                cacheKeyList.add(key); //添加到List,后面异常情况补偿回滚处理,当前请求不ok的情况
                redisTemplate.opsForValue().set(key,new Date().getTime()+"");
            }
            redisTemplate.delete(lcKey);
        }
        return  result;
    }


    /**
     * 自定义loc类分布式锁
     */
    public static class MyNxLockUtils {
        private static  volatile String  lcPoint0="xx0", lcPoint1="xx1", lcPoint2="xx2" ,lcPoint3="xx3", lcPoint4="xx4",
                                         lcPoint5="xx5", lcPoint6="xx6", lcPoint7="xx7", lcPoint8="xx8", lcPoint9="xx9";
        /**
         * 分段枷锁,提升效率
         * @param k
         * @return
         */
        private static String getLcPoint(String k){
            switch (k.hashCode()%10){
                case 1: return lcPoint1;
                case 2: return lcPoint2;
                case 3: return lcPoint3;
                case 4: return lcPoint4;
                case 5: return lcPoint5;
                case 6: return lcPoint6;
                case 7: return lcPoint7;
                case 8: return lcPoint8;
                case 9: return lcPoint9;
                default: return lcPoint0;
            }
        }
        private static Cache<String, String> lockCache = null;
        static {
            lockCache =  CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();
        }
        /**
         * @param k
         * @param v
         * @return
         */
        public static Boolean getLock(String k, String v){
            if(lockCache.getIfPresent(k)==null)
            {
                synchronized (getLcPoint(k)){
                    if(lockCache.getIfPresent(k)==null){
                        lockCache.put(k,v);
                        return true;
                    }
                }
            }
            return false;
        }
        public static  void unlock(String k, String v)
        {
            String ifPresent = lockCache.getIfPresent(k);
            if(ifPresent!=null&&ifPresent.equals(v)){
                lockCache.invalidate(k);
            }
        }
    }
}

 

SeaRequestBodyHolder
import com.sea.handler.AvoidResubmit;
import org.apache.commons.io.IOUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.NamedThreadLocal;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import org.springframework.web.servlet.mvc.method.annotation.RequestBodyAdviceAdapter;
import javax.servlet.http.HttpServletRequest;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/***************************
 *<pre>
 * @Project Name : Sea-blog-service
 * @Package      : com.sea.common.config
 * @File Name    : SeaRequestBodyHolder
 * @Author       :  Sea
 * @Date         : 8/4/22 2:11 PM
 * @Purpose      :  body 默认只能获取一次: 如果接口使用了@RequestBody , HttpServletRequest在接口会获取一次body,后面就无法再次获取body,
 *                  并且出现:getInputStream() has already been called for this request
 *                  此处在 获取body后,优选缓存一份body
 * @History      :
 *</pre>
 ***************************/
@ControllerAdvice
public class SeaRequestBodyHolder extends RequestBodyAdviceAdapter {
    private static String  requestBody = "body";
    private static final ThreadLocal<Map<String,String>> requestBodyHolder = new NamedThreadLocal("Sea Request body holder");
    public static String getRequestBody(){
        try{
            String body = requestBodyHolder.get().get(requestBody);
            resetRequestBody();
            return body;
        }catch (Exception e){
            return null;
        }
    }
    public static void resetRequestBody(){
          requestBodyHolder.remove();
    }
    @Override
    public boolean supports(MethodParameter methodParameter, Type type, Class<? extends HttpMessageConverter<?>> aClass) {
        return true;
    }
    //如果仅仅是post 方法防抖,可以在此处操作
    @Override
    public HttpInputMessage beforeBodyRead(HttpInputMessage httpInputMessage, MethodParameter methodParameter, Type type, Class<? extends HttpMessageConverter<?>> aClass) throws IOException {
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
//        System.err.println(request.getMethod());
        // 不包含注解 @CheckResubmit 不需要读body 直接返回
        if(!methodParameter.hasMethodAnnotation(AvoidResubmit.class)){
            return super.beforeBodyRead(httpInputMessage, methodParameter, type, aClass);
        }
        // 第一次 读取body
        String bodyStr = IOUtils.toString(httpInputMessage.getBody(), StandardCharsets.UTF_8);
        // 重新new 一个 HttpInputMessage
        return new HttpInputMessage() {
            @Override
            public InputStream getBody() throws IOException {
                String body = bodyStr;
                //放入requestBodyHolder,后面获取
                requestBodyHolder.set(new HashMap<String, String>(){{put(requestBody,body);}});
                // 重新写入 body
                return new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8));
            }
            @Override
            public HttpHeaders getHeaders() {
                return httpInputMessage.getHeaders();
            }
        };
    }
}

 

标签:body,return,String,自定义,interval,request,import,false,null
来源: https://www.cnblogs.com/lshan/p/16553596.html