咖啡汪日志——实际开发中如何避免缓存穿透和缓存雪崩(代码示例实际展示)
作者:互联网
本汪作为一名资深的哈士奇
每天除了闲逛,拆家,就是啃博客了
作为不是在戏精,就是在戏精的路上的二哈
今天就来给大家说说在实际工作中,如何预防缓存穿透
一、 开篇有益
1、什么是缓存穿透?
通常缓存系统,都是按照key去进行缓存查询,如果不存在对应的value,就应该去数据库查询。一些恶意的请求会故意大量查询不存在的key(例如使用“-1”,“#”,或者UUID生成100万个Key进行查询),就会对数据库造成很大的压力。我们把这种情况称之为缓存穿透。
2.什么是缓存雪崩?
缓存雪崩(缓存失效)的两个原因,从广义上来讲:
第一种,缓存系统本身不可用,导致大量请求直接回源到数据库
第二种,应用层面大量的 Key 在同一时间过期,导致大量的数据回源
3、缓存穿透有什么具体的防护方法?
(1)采用布隆过滤器,将所有可能存在的数据存到一个bitMap中,不存在的数据就会进行拦截。
(2)对查询结果为空的情况也进行缓存,缓存时间设置短一点,不超过5分钟。
4、如何有效避免缓存雪崩?(失效时间扰动)
确保大量 Key , 不在同一时间过期:
简单方案:差异化缓存过期时间,不让大量 Key 在同一时间过期。比如,在初始化缓存的时候,设置缓存的过期时间为 应设过期时间30min + 30秒以内的随机延迟(扰动值)。(待支付订单的有效时间均为30min,互联网企业定律) ,这样,这些 Key 不会在 30min 这个时刻过期,而是分散在 30min ~ 30min+30second 之间过期。
二、大家随本汪,一起来看看实际工作中的代码实现
1、布隆过滤器,数据预加载,预防缓存穿透。
(1)在基础controller引入了bloomfilter
/**
* @author Yuezejian
* @date 2020年 08月22日 16:04:01
*/
public class AbstractController {
protected final Logger logger = LoggerFactory.getLogger(getClass());
/**
* 通用的基础过滤器,如有使用,请在此注明
* 1.BloomFilter 内数据需要预热(固定时间内有效token的过滤,正在使用此过滤器)
* The current three parties request the current system and need to obtain the token,
* which is using the filter {@Modified by yuezejian }
*/
protected static final BloomFilter<String> filter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), 1000,0.01);
}
(2)项目启动进行数据加载
import com.slzhkj.smartworksite.model.dto.RequestDto;
import com.slzhkj.smartworksite.model.mapper.RequestMapper;
import com.slzhkj.smartworksite.server.controller.AbstractController;
import com.slzhkj.smartworksite.server.controller.ResponseController;
import com.slzhkj.smartworksite.server.controller.TokenAndSignController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This loader is used for data preheating of redis and bloom filter.
* The current preheating data mainly includes:
* The first, Three parties request obtain time effective token {@link TokenAndSignController },
* When the bloom filter and redis are loaded,
* they need to be updated when I start the system
* The second,Three party request for data access,
* we judge the token {@link ResponseController} if it's enable
*
* @author Yuezejian
* @date 2020年 11月 10日 14:00:18
*/
@Component
public class RedisAndBloomFilterRecordRunner extends AbstractController implements ApplicationRunner, Ordered {
@Autowired
RequestMapper requestMapper;
@Autowired
RedisTemplate redisTemplate;
@Autowired
private Environment env;
@Override
public void run(ApplicationArguments args) throws Exception {
ValueOperations<String,String> tokenOpera = redisTemplate.opsForValue();
List<RequestDto> requestDtos = requestMapper.selectTokenRecord();
AtomicInteger count = new AtomicInteger(0);
requestDtos.stream()
.parallel()
.forEach( dto -> {
filter.put(dto.getToken());
filter.put(dto.getAppId());
if (filter.mightContain(dto.getToken())) {
count.getAndAdd(1);
}
String key = dto.getAppId();
//TODO:- Math.abs((System.currentTimeMillis() - dto.getUpdateTime().getTime())/60000 )
tokenOpera.set(key,dto.getToken(),env.getProperty("token.enable.time",Long.class), TimeUnit.MINUTES);
} );
logger.info("==========total is "+ count +", The data preloading of redis and bloom filter is completed!===========");
}
/**
* Makes the current class load with a higher priority
* @return
*/
@Override
public int getOrder() {
return 2;
}
}
(3)在合适的时机,将新数据加入bloomfilter
private void updateRedisAndBloomFilter(String appId, ValueOperations<String, String> tokenOpera,
String token, int res) {
if ( res > 0 ) {
//TODO: update redis ,insert BloomFilter
String key = appId;
tokenOpera.set(key,token,env.getProperty("token.enable.time",Long.class), TimeUnit.MINUTES);
filter.put(token);
filter.put(appId);
logger.info("appId为:{} 的用户,更新了token: {}, 已存入Redis和基类布隆过滤器", appId, token);
} else {
logger.error("appId为:{} 的用户,更新了token: {}, Database update success, but Redis or BloomFilter update fail!",appId,token);
throw new IllegalStateException("Database update success, but Redis or BloomFilter update fail!");
}
}
2、设置无效数据类型,预防缓存穿透。每当发生缓存穿透时,即缓存和数据库都没有该条数据,在数据库返回 null 后,也应该在缓存中放置相应得 无效类型返回。
在Coupon 类中,加一个获取ID 为“-1”的无效coupon 方法
/**
* <h2>返回一个无效的 Coupon 对象</h2>
* */
public static Coupon invalidCoupon() {
Coupon coupon = new Coupon();
coupon.setId(-1);
return coupon;
}
/** redis 客户端,redis 的 key 肯定是 String 类型,而 StringRedisTemplate 是 value 也都是 String 的一个简化 */
@Autowired
private StringRedisTemplate redisTemplate;
/**
* save List<Coupon> which are null to Cache
* 目的: 避免缓存穿透
* @param userId user id
* @param status coupon status
*/
@Override
public void saveEmptyCouponListToCache(Long userId, List<Integer> status) {
log.info("Save empty list to cache for user: {}, status: {}",userId, JSON.toJSONString(status));
Map<String, String> invalidCouponMap = new HashMap<>();
invalidCouponMap.put("-1",JSON.toJSONString(Coupon.invalidCoupon()));
//使用 SessionCallback 把数据命令放入到 Redis 的 pipeline
//redis 的 pipeline 可以让我们一次性执行多个命令,统一返回结果,而不用每一个命令去返回;
//redis 本身是单进程单线程的,你发送一个命令,他给你一个返回,然后你才可以发生下一个命令给他。
// 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程
//我们都知道Redis有两种持久化的方式,一种是RDB,一种是AOF。
//拿RDB举例,执行bgsave,就意味着 fork 出一个子进程在后台进行备份。
//这也就为什么执行完bgsave命令之后,还能对该Redis实例继续其他的操作。
SessionCallback<Object> sessionCallback =
new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations)
throws DataAccessException {
status.forEach( s -> {
//TODO: 把用户 ID 和 优惠券使用 status 进行拼接,作为 redisKey
String redisKey = status2RedisKey(s,userId);
operations.opsForHash()
.putAll(redisKey,invalidCouponMap);
});
return null;
}
};
log.info("Pipeline exe result: {}",
JSON.toJSONString(sessionCallback));
}
/**
* Get Redis Key According to status
* @param status
* @param userId
* @return
*/
private String status2RedisKey(Integer status, Long userId) {
String redisKey = null;
CouponStatus couponStatus = CouponStatus.of(status);
switch (couponStatus) {
case USABLE:
redisKey = String.format("%s%s", Constant.RedisPrefix.USER_COUPON_USABLE, userId);
break;
case USED:
redisKey = String.format("%s%s",Constant.RedisPrefix.USER_COUPON_USED,userId);
break;
case EXPIRED:
redisKey = String.format("%s%s",Constant.RedisPrefix.USER_COUPON_EXPIRED,userId);
}
return redisKey;
}
当查询缓存没有时,设置缓存失效数据,返回空;有时,直接反序列化然后返回(存的时候,是coupon 的 JSON.toJSONString(coupon))
import org.apache.commons.collections4.CollectionUtils;
@Override
public List<Coupon> getCacheCoupons(Long userId, Integer status) {
log.info("Get Coupons From Cache: {}, {}", userId, status);
String redisKey = status2RedisKey(status,userId);
List<String> couponStrs = redisTemplate
.opsForHash().values(redisKey)
.stream().map( o -> Objects.toString(o,null))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(couponStrs)) {
saveEmptyCouponListToCache(userId,
Collections.singletonList(status));
return Collections.emptyList();
}
return couponStrs.stream().map(
cs -> JSON.parseObject(cs,Coupon.class))
s.collect(Collectors.toList());
}
3、时间扰动,预防缓存雪崩
以秒为单位时,getRandomExpirationTime(1,2) 会返回1~2小时之间的随机时间
redisTemplate.expire(redisKey, getRandomExpirationTime(1,2) , TimeUnit.SECONDS);
/**
* get one Random Expiration Time
* 缓存雪崩:key 在同一时间失效
* @param min 最小小时数
* @param max 最大小时数
* @return 返回 【min, max】之间的随机秒数
*/
private long getRandomExpirationTime(Integer min, Integer max) {
return RandomUtils.nextLong(min * 60 * 60, max * 60 * 60);
}
/**
* insert coupon to Cache
* @param userId
* @param coupons
* @return
*/
private Integer addCouponToCacheForUsable(Long userId, List<Coupon> coupons) {
// 如果 status 是 USABLE, 代表是新增的优惠券
// 只会影响到一个 cache : USER_COUPON_USABLE
log.debug("Add Coupon To Cache For Usable");
Map<String, String> needCacheObject = new HashMap<>();
coupons.forEach( coupon -> {
needCacheObject.put(coupon.getId().toString(),JSON.toJSONString(coupon));
});
String redisKey = status2RedisKey(
CouponStatus.USABLE.getCode(), userId
);
//TODO: redis 中的 Hash key 不能重复,needCacheObject 对应HashMap ,
// coupon id 不可能重复,所以直接 putAll 不会有问题
redisTemplate.opsForHash().putAll(redisKey, needCacheObject);
log.info("Add {} Coupons TO Cache: {} , {}", needCacheObject.size(), userId, redisKey);
//TODO: set Expiration Time, 1h - 2h ,random time
redisTemplate.expire(redisKey, getRandomExpirationTime(1,2) , TimeUnit.SECONDS);
return needCacheObject.size();
}
标签:status,缓存,String,示例,userId,redisKey,import,实际 来源: https://blog.csdn.net/weixin_42994251/article/details/110294750