Redis 笔记 02:实战篇
作者:互联网
Redis 笔记 02:实战篇
这是本人根据黑马视频学习 Redis 的相关笔记,系列文章导航:《Redis设计与实现》笔记与汇总
短信登陆
业务流程:
利用session完成
发送验证码
在 service 层中完成相应的逻辑,即上图左侧的逻辑:
@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
@Override
public Result sendCode(String phone, HttpSession session) {
// 1. 校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 2. 如果不符合,返回错误信息
return Result.fail("手机格式错误");
}
// 3. 符合,生成验证码
String code = RandomUtil.randomNumbers(6);
// 4. 保存验证码到 Session
session.setAttribute("code", code);
// 5. 模拟发送验证码
log.debug("验证码为: {}", code);
// 6. 返回 ok
return Result.ok();
}
}
验证码登录
对应中间的流程图
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1. 校验手机号和验证码
if (RegexUtils.isPhoneInvalid(loginForm.getPhone())) {
return Result.fail("手机格式错误");
}
// 2. 校验验证码
Object code = session.getAttribute("code");
if (loginForm.getCode() == null || !loginForm.getCode().equals(code)) {
// 3. 不一致,报错
return Result.fail("验证码格式错误");
}
// 4. 一致,根据手机号查询用户
User user = query().eq("phone", loginForm.getPhone()).one();
// 5. 判断用户是否存在
if (user == null) {
user = createUserWithPhone(loginForm.getPhone());
}
session.setAttribute("user", user);
return Result.ok();
}
private User createUserWithPhone(String phone) {
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
save(user);
return user;
}
自定义拦截器
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1. 获取 session
HttpSession session = request.getSession();
// 2. 获取 session 中的用户
User user = (User) session.getAttribute("user");
// 3. 判断用户是否存在
if (user == null) {
response.setStatus(401);
// 4. 不存在,拦截
return false;
}
// 5. 存在,保存用户信息到 ThreadLocal
UserDTO userDTO = new UserDTO();
userDTO.setId(user.getId());
userDTO.setNickName(user.getNickName());
UserHolder.saveUser(userDTO);
// 6. 放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor()).excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/upload/**",
"/voucher/**"
);
}
}
利用redis完成
方案设计
发送验证码
@Override
public Result sendCode(String phone, HttpSession session) {
// 1. 校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 2. 如果不符合,返回错误信息
return Result.fail("手机格式错误");
}
// 3. 符合,生成验证码
String code = RandomUtil.randomNumbers(6);
// 4. 保存验证码到 Session
//session.setAttribute("code", code);
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);
// 5. 模拟发送验证码
log.debug("验证码为: {}", code);
// 6. 返回 ok
return Result.ok();
}
登录校验
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1. 校验手机号和验证码
if (RegexUtils.isPhoneInvalid(loginForm.getPhone())) {
return Result.fail("手机格式错误");
}
// 2. 校验验证码
// Object code = session.getAttribute("code");
String code = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + loginForm.getPhone());
if (loginForm.getCode() == null || !loginForm.getCode().equals(code)) {
// 3. 不一致,报错
return Result.fail("验证码格式错误");
}
// 4. 一致,根据手机号查询用户
User user = query().eq("phone", loginForm.getPhone()).one();
// 5. 判断用户是否存在
if (user == null) {
user = createUserWithPhone(loginForm.getPhone());
}
// 保存用户信息到 Redis 当中
// session.setAttribute("user", user);
String token = UUID.randomUUID().toString();
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((filedName, filedValue) ->
filedValue.toString()
));
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
return Result.ok();
}
拦截器修改
第一个用来刷新过期时间
public class RefreshTokenInterceptor implements HandlerInterceptor {
private final StringRedisTemplate stringRedisTemplate;
public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1. 获取 session
// HttpSession session = request.getSession();
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)) {
return true;
}
String tokenKey = RedisConstants.LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(tokenKey);
// 3. 判断用户是否存在
if (userMap.isEmpty()) {
return true;
}
// 2. 获取 session 中的用户
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 5. 存在,保存用户信息到 ThreadLocal
UserHolder.saveUser(userDTO);
// 刷新过期时间
stringRedisTemplate.expire(tokenKey, RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
// 6. 放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}
第二个用来判断是否需要登录
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1. 判断是否需要拦截
if (UserHolder.getUser() == null) {
response.setStatus(401);
return false;
}
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}
配置一下加载顺序:
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor()).excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/upload/**",
"/voucher/**"
).order(1);
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
}
}
- 注意通过
order
设置了拦截器加载的顺序
商户查询缓存
添加缓存
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryById(Long id) {
String key = RedisConstants.CACHE_SHOP_KEY + id;
// 尝试从Redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 2. 不存在,根据id查询数据库
Shop shop = getById(id);
if (shop == null) {
return Result.fail("店铺不存在");
}
// 写入 Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
return Result.ok(shop);
}
}
缓存更新策略
主动更新 的三种策略:
左侧方案的三个问题:
先删除缓存还是先操作数据库?
都有问题,右边可能性低,选右边
添加缓存更新
这里的策略是:当更新时顺便删除缓存
@Override
@Transactional
public Result update(Shop shop) {
Long id = shop.getId();
if (id == null) {
return Result.fail("店铺ID不能为空");
}
// 1. 更新数据库
updateById(shop);
// 2. 删除缓存
String key = RedisConstants.CACHE_SHOP_KEY + id;
stringRedisTemplate.delete(key);
return Result.ok();
}
缓存的问题
缓存穿透
这里采用缓存空对象的方式进行解决,需要改动的地方有:
- 如果没命中,缓存空信息
- 从缓存读取信息时,判断是否为空
public Result queryById(Long id) {
String key = RedisConstants.CACHE_SHOP_KEY + id;
// 尝试从Redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
if (shopJson != null) {
// 说明之前被加入到Redis中了!
return Result.fail("店铺信息不存在");
}
// 2. 不存在,根据id查询数据库
Shop shop = getById(id);
if (shop == null) {
// 添加空值到 Redis
stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
return Result.fail("店铺不存在");
}
// 写入 Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}
缓存雪崩
缓存击穿
两种解决方案:
两种解决方案的对比:
下面来代码实践一下
这里的流程如下:
所谓的锁不是在学 JUC 时用到的各种锁,而是用 Redis 的 setnx 来设置
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
业务流程:
public Shop queryWithPassThroughMutex(Long id) {
String key = RedisConstants.CACHE_SHOP_KEY + id;
// 尝试从Redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(shopJson)) {
return JSONUtil.toBean(shopJson, Shop.class);
}
if (shopJson != null) {
return null;
}
// 2. 不存在,根据id查询数据库
// 获取互斥锁
String lockKey = null;
Shop shop = null;
try {
lockKey = RedisConstants.LOCK_SHOP_KEY + id;
if (!tryLock(lockKey)) {
Thread.sleep(50);
queryWithPassThroughMutex(id);
}
shop = getById(id);
// 模拟等待
Thread.sleep(200);
if (shop == null) {
stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
// 写入 Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放互斥锁
unlock(lockKey);
}
return shop;
}
封装常用工具类
我们将基于 StringRedisTemplate 封装一个工具类,满足如下需求:
- 方法一:将任意 Java 对象序列化为 JSON 并存储在 string 类型的 key 中,并且可以设置 TTL
- 方法二:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
- 方法三∶根据指定的 key 查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法四∶根据指定的 key 查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
代码地址:huyi612/hm-dianping - Gitee.com
(上面代码有部分有问题, 见下面的评论)
优惠券功能
唯一ID生成器
利用 Redis 的 icr 方法, 加上时间戳等信息生成 id
@Component
public class RedisIdWorker {
private final StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
private static final long BEGIN_TIMESTAMP = 1640995200L;
private static final int COUNT_BITS = 32;
public long nextId(String keyPrefix) {
// 1. 生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
System.out.println(timestamp);
// 2. 生成序列号
String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
System.out.println(count);
// 3. 拼接返回
return timestamp << COUNT_BITS | count;
}
}
实现下单功能
- 秒杀是否开始或结束
- 库存是否充足, 不足则无法下单
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
// 1. 查询
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 判断秒杀是否开始或结束
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}
// 3. 判断库存
if (voucher.getStock() < 1) {
return Result.fail("库存不足!");
}
// 4. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
if (!success) {
return Result.fail("库存不足!");
}
// 5. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setPayType(1);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 6. 返回订单
return Result.ok(orderId);
}
线程安全问题
用 JMeter 模拟多个用户同时参与秒杀, 有可能会出现线程安全问题(不过很奇怪, 我这里自己测试的时候并没有出现这个问题)
乐观锁的实现方案:
CAS, 这里的场景似乎也不需要担心 ABA 问题, 所以只用一个库存量就行了, 不用专门搞一个版本
线程安全实践
当然, 这里 eq 显然有些严格, 很容易造成还有券, 但是都没抢到的问题 (这里可没有失败了重试之类的说法)
所以实践中我们只需让 库存大于 0 即可
.gt("stock", 0);
一人一单
同样会有线程安全的问题, 这里似乎不太适合用乐观锁, 毕竟是添加不同的信息, 而不是对一条信息的多次修改, 所以用悲观锁
@Override
public Result seckillVoucher(Long voucherId) {
// 1. 查询
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 判断秒杀是否开始或结束
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}
// 3. 判断库存
if (voucher.getStock() < 1) {
return Result.fail("库存不足!");
}
// 6. 返回订单
return createVoucherOrder(voucherId);
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
VoucherOrder voucherOrder = null;
synchronized (userId.toString().intern()) {
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("用户已经购买过了!");
}
// 4. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
if (!success) {
return Result.fail("库存不足!");
}
// 5. 创建订单
voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setPayType(1);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
}
return Result.ok(voucherOrder);
}
这段代码有两个学到的地方:
- 如果用 String 的 toString 方法不能保证锁的唯一, 这是由于其中的实现好像有随机的地方, 要用 String 的 intern 方法
- 加 @Transaction 才能实现[这一点我暂时不能理解这个事务]
但是集群状态下, 仅依靠 JVM 自身的锁是不能实现的! 要用分布式锁
分布式锁
Redis实现分布式锁
定义接口:
public interface ILock {
boolean tryLock(long timeoutSec);
void unlock();
}
实现类:
public class SimpleRedisLock implements ILock{
private final String name;
private final StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
long threadId = Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name,
threadId + "", timeoutSec, TimeUnit.SECONDS);
// 防止空指针的风险
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
修改业务
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
boolean isLock = simpleRedisLock.tryLock(1200);
if (!isLock) {
// 获取锁失败
return Result.fail("不允许重复下单");
}
VoucherOrder voucherOrder = null;
try {
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("用户已经购买过了!");
}
// 4. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
if (!success) {
return Result.fail("库存不足!");
}
// 5. 创建订单
voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setPayType(1);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
} catch (Exception e) {
e.printStackTrace();
} finally {
simpleRedisLock.unlock();
}
return Result.ok(voucherOrder);
}
问题与改进
当前方案的问题一:
超时释放后, 线程会在运行结束后把别人的锁解开了
解决方法: 解锁前判断一下标识
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name,
threadId + "", timeoutSec, TimeUnit.SECONDS);
// 防止空指针的风险
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if (threadId.equals(id)) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
当前的问题二:
如何保证原子性? 比如判断锁和释放锁两个步骤是分开的, 没有一致性, 所以仍然有可能把别人的锁给释放了
答: 借助 Lua 脚本 Lua 的基础教程入门 : Lua基础教程笔记
Redis 提供了 Lua 脚本功能, 在一个脚本中编写多条 Redis 命令, 确保多条命令执行时的原子性
Lua 脚本用如下方式执行 redis 的命令:
redis.call("set", "name", "rose");
Redis 用如下方式执行脚本: (1 表示有一对参数)
EVAL "return redis.call(xx, xx, xx)" 1 name rose
创建一个 unlock.lua
local id = redis.call('get', KEYS[1])
if (id == ARGV[1]) then
return redis.call('del', KEYS[1])
end
return 0
在 Lock 类中添加相关逻辑:
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public void unlock() {
stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
Redisson
自己实现的锁的问题:
基本使用
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
创建一个 bean
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.137.112:6379").setPassword("abc123");
return Redisson.create(config);
}
}
获取锁:
RLock lock = redissonClient.getLock("lock:order:" + userId);
原理
- 可重入 : 利用 hash 结构记录线程 id 和重入次数
- 可重试 : 利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制
- 超时续约 : 利用 watchDog,每隔一段时间( releaseTime / 3),重置超时时间
- 主从一致性 : 联锁
秒杀优化
异步秒杀
原来的程序, 当用户进行秒杀操作时, 许多操作都在一个线程中完成, 用户必须要等待所有操作完成后才能看到结果, 并且其中很多操作要直接和 MySQL 打交道:
考虑现实生活中的情况, 假如你去学校食堂吃饭, 某一家的饭特别好吃, 供不应求, 那么一种方法是排一个长长的队, 付款后就站着等, 等饭好了再端走.
我们上面的实现就是这种情况. 而现在更多的一种情况是, 下完单后, 给你一张小票, 之后等做好了会叫号让你来取餐.
所以另一种思路是: 把一些操作放在 Redis 中完成
可以用 Lua 脚本来完成一些逻辑功能, 用 Set 结构来判断是否购买过
秒杀资格判断实现
在创建秒杀券时同时保存到 redis 当中
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 秒杀优化: 保存优惠券信息到 Redis 当中
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
创建 lua 脚本
-- 1. 参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户 Id
local userId = ARGV[2]
-- 2. 数据 key
-- 2.1 库存 key
local stockKey = "seckill:stock:" .. voucherId
-- 2.2 订单 key
local orderKey = "seckill:order:" .. voucherId
-- 3. 脚本业务
-- 3.1 库存不足
if (tonumber(redis.call('get', stockKey)) <= 2) then
return 1
end
-- 3.2 判断用户是否下单
if (redis.call('sismember', orderKey, userId) == 1) then
return 2
end
-- 3.3 扣库存, 加用户
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)
return 0
更改逻辑为直接修改 Redis 中的数据
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 1. 执行 lua 脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
// 2. 判断结果
assert result != null;
int r = result.intValue();
if (r != 0) {
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3. 返回
long orderId = redisIdWorker.nextId("order");
// TODO 保存到阻塞队列
return Result.ok(orderId);
}
阻塞队列
利用一个阻塞队列来异步处理耗时的操作
private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
VoucherOrder take = orderTasks.take();
createVoucherOrder(take);
} catch (InterruptedException e) {
log.error("订单处理异常",e);
}
}
}
}
private void createVoucherOrder(VoucherOrder voucherOrder) {
long userId = voucherOrder.getUserId();
long voucherId = voucherOrder.getVoucherId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if (!isLock) {
// 获取锁失败
log.error("不允许重复下单");
return;
}
try {
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
log.error("用户已经购买过了!");
return;
}
// 4. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
if (!success) {
log.error("库存不足!");
return;
}
save(voucherOrder);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
但是 BlockQueue 是基于内存的,有如下的问题
- 内存限制问题
- 数据安全问题(如果宕机,数据会丢失)
可以借助一些成熟的方案来解决这个问题
消息队列
Redis 提供了三种不同的方式来实现消息队列:
list结构
:基于 List 结构模拟消息队列PubSub
:基本的点对点消息模型Stream
:比较完善的消息队列模型
List
Redis 的 list 数据结构是一个双向链表,很容易模拟出队列效果
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP 来实现
不过要注意的是,当队列中没有消息时 RPOP 或 LPOP 操作会返回 null,并不像 JVM 的阻塞队列那样会阻塞并等待消息。因此这里应该使用 BRPOP 或者 BLPOP 来实现阻塞效果
优点:
- 利用 Redis 存储,不受限于 JVM 内存上限
- 基于 Redis 的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
PubSub
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
Stream
特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
Stream Group
特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
实战
在 Redis 中创建一个组:
XGROUP CREATE stream.orders g1 0 MKSTREAM
修改之前的 Lua 脚本:
业务逻辑方面:
添加到消息队列的逻辑放在了 Lua 脚本中
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1. 执行 lua 脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
// 2. 判断结果
assert result != null;
int r = result.intValue();
if (r != 0) {
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3. 返回
return Result.ok(orderId);
}
处理消息队列的代码如下:
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
String queueName = "stream.orders";
while (true) {
try {
// 1. 获取消息队列中的订单信息
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 2. 判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null 没有消息, 继续循环
continue;
}
System.out.println("=================" + list.size());
// 3. 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 4. 创建订单
createVoucherOrder(voucherOrder);
// 5. 确认消息
// XACK s1 g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("订单处理异常",e);
handlePendingList();
}
}
}
}
处理 Pending 队列的代码如下,和上面的逻辑相仿:
private void handlePendingList() {
String queueName = "stream.orders";
while (true) {
try {
// 1. 获取PendingList 中的订单信息
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// 2. 判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null 没有消息, 继续循环
break;
}
// 3. 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 4. 创建订单
createVoucherOrder(voucherOrder);
// 5. 确认消息
// XACK s1 g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("Pending 订单处理异常",e);
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
标签:02,实战篇,return,String,Redis,public,Result,id,stringRedisTemplate 来源: https://www.cnblogs.com/lymtics/p/16439357.html