数据库
首页 > 数据库> > 借助Redis 实现限流

借助Redis 实现限流

作者:互联网

场景:调用上游的接口,上游不同接口有不同的调用频率限制,我们全量同步有上百万条数据,如何快速同步

例子:

上游有接口A,B,C

A接口频率 600次/min

B接口频率 200次/min

C接口频率 1000次/min

如果超过频率,调用会报错

写了一个本地做好限流的方法,要求所有的记录都要调用且 减少调用报错的问题

因为借助了RedisTemplate 暂不考虑 原子性问题

@Slf4j
public class RateLimitUtil {

    RedisUtils redisUtils;

    // 频次:limit/timeSecond
    //频率
    private Integer limit;
    //时间单位秒
    private Integer timeSecond;

    public RateLimitUtil(RedisUtils redisUtils, Integer limit, Integer timeSecond) {
        this.redisUtils = redisUtils;
        this.limit = limit;
        this.timeSecond = timeSecond;
    }

    /**
     * 非分布式锁
     * @param clazz
     * @param method
     */
    public synchronized void lock(Class clazz, String method){
        String key = clazz.toString()+"."+method;
        String valueStr ;
        int value;
        while (true){
            valueStr = redisUtils.getValue(key);
            if(valueStr == null){
                boolean putFlag = redisUtils.setNx(key, 1);
                if(putFlag){
                    redisUtils.expireSeconds(key,timeSecond);
                    break ;
                }
                continue;
            }
            value = Integer.valueOf(valueStr);
            if(value >= limit){
                long expire = redisUtils.getExpire(key);
                sleepSecond(expire+1);
                if(expire>0){
                    log.info("{} sleep:{} 秒",key,expire);
                }
                continue;
            }
            redisUtils.incr(key,1L);
            break;
        }
    }

    public void sleepSecond(long n){
        try {
            Thread.sleep(n*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void sleep(long n){
        try {
            Thread.sleep(n);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    /**
     * 分布式锁
     * @param clazz
     * @param method
     */
    public void lockDistributed(Class clazz,String method){
        String lockKey = clazz+"."+method+"_lock";
        try {
            while (true){
                boolean result = redisUtils.setNx(lockKey, Thread.currentThread().getId());
                if(result){
                    redisUtils.expireSeconds(lockKey,timeSecond);
                    lock(clazz,method);
                    break;
                }else {
                    sleepSecond(10);
                }
            }
        }catch (Exception e){

        }finally {
            String value = redisUtils.getValue(lockKey);
            if(value!= null){
                if(String.valueOf(Thread.currentThread().getId()).equals(value)){
                    redisUtils.del(lockKey);
                }
            }
        }
    }

}

使用例子:

  @Test
  public void  testLimit1(){
      RateLimitUtil rateLimitUtil = new RateLimitUtil(redisUtils,10,6);
      Random random = new Random();
      for (int i=0;i<100;i++){
          try {
              Thread.sleep(random.nextInt(200));
          }catch (Exception e){

          }
          long time = System.currentTimeMillis()/1000;

          rateLimitUtil.lockDistributed(this.getClass(),"testLimit");
      }
  }

 

标签:key,timeSecond,String,redisUtils,Redis,public,限流,借助,method
来源: https://blog.csdn.net/zy1404/article/details/117954261