数据库
首页 > 数据库> > Redis 实现库存扣减操作

Redis 实现库存扣减操作

作者:互联网

原文链接:https://mp.weixin.qq.com/s/uyX9eRnd2xPOEr6lwax8Yw

在日常开发中有很多地方都有类似扣减库存的操作,比如电商系统中的商品库存,抽奖系统中的奖品库存等。

解决方案

分析

在上面的第一种和第二种方式都是基于数据来扣减库存。

基于数据库单库存

第一种方式在所有请求都会在这里等待锁,获取锁有去扣减库存。

在并发量不高的情况下可以使用,但是一旦并发量大了就会有大量请求阻塞在这里,导致请求超时,进而整个系统雪崩;而且会频繁的去访问数据库,大量占用数据库资源,所以在并发高的情况下这种方式不适用。

基于数据库多库存

第二种方式其实是第一种方式的优化版本,在一定程度上提高了并发量,但是在还是会大量的对数据库做更新操作大量占用数据库资源。

基于数据库来实现扣减库存还存在的一些问题:

用数据库扣减库存的方式,扣减库存的操作必须在一条语句中执行,不能先selec在update,这样在并发下会出现超扣的情况。如:

update number set x=x-1 where x > 0

理论上即使是这样由于MySQL事务的特性,这种方法只能降低超卖的数量,但是不可能完全避免超扣。

因为数据库默认隔离级别是repeatable read,假如库存是5,有A、B两个请求分别创建了事务并且都没有提交,当A事务提交了,改了库存为4,但是因为是事务隔离级别是可重复读的,所有B看不到A事务改的库存。到时B看到的库存还是5,所以B修改库存为4,这样就出现了超扣问题。

所以我们扣库存的时候需要将事务隔离级别设置成read commit才可以。(我自己测试没有出现这种情况)

基于redis

针对上述问题的问题我们就有了第三种方案,将库存放到缓存,利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。

比如抽奖系统扣奖品库存的时候,初始库存=总的库存数-已经发放的奖励数,但是如果是异步发奖,需要等到MQ消息消费完了才能重启redis初始化库存,否则也存在库存不一致的问题。

Redis Incrby 命令

Redis Incrby 命令将 key 中储存的数字加上指定的增量值。

本操作的值限制在 64 位(bit)有符号数字表示之内。

语法

redis Incrby 命令基本语法如下:

redis 127.0.0.1:6379> INCRBY KEY_NAME INCR_AMOUNT
可用版本

>= 1.0.0

返回值

加上指定的增量值之后, key 的值。

基于redis实现扣减库存的具体实现

具体关于lua脚本的内容使用请移步至 redis命令参考–Script脚本 :

http://doc.redisfans.com/script/index.html

lua脚本注意点:

Lua脚本,是一种轻量级的脚本语言。设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。Lua脚本的应用也很多,比如Nginx+Lua实现的OpenResty,Redis+Lua配合使用(Redisson中大量使用了Lua脚本)。

Lua脚本具有以下好处:

1、减少网络开销:Lua脚本在执行的时候,是先发送到Redis服务器的,然后在服务器上执行脚本。多个命令和业务逻辑都封装到脚本里,一次性提交到服务器。

2、原子性操作:我们都知道redis在执行命令时是单线程的,但是每个命令之间就存在并发的情况,就存在先查询再操作时,两个命令没办法保证线程安全。但使用Lua脚本时,redis把这个脚本操作当成是一个命令,那么这个脚本中的多条操作也就保证了原子性。(注意:只保证原子性,不是事务)

虽然Lua脚本有这么多优点,但是也不能乱用,使用的时候要注意:

1、Lua脚本可以在redis单机模式、主从模式、Sentinel集群模式下正常使用,但是无法在分片集群模式下使用。(脚本操作的key可能不在同一个分片)。(其实集群模式不支持问题也是可以解决的,在使用spring的RedisTemplate执行lua脚本时,报错EvalSha is not supported in cluster environment,不支持cluster。但是redis是支持lua脚本的,只要拿到原redis的connection对象,通过connection去执行即可,在后面会说下这个问题)

2、Lua脚本中尽量避免使用循环操作(可能引发死循环问题),尽量避免长时间运行。

3、redis在执行lua脚本时,默认最长运行时间时5秒,当脚本运行时间超过这一限制后,Redis将开始接受其他命令但不会执行(以确保脚本的原子性,因为此时脚本并没有被终止),而是会返回“BUSY”错误。

初始化库存回调函数(IStockCallback )
/**
 * 获取库存回调
 * @author yuhao.wang
 */
public interface IStockCallback {
 
    /**
     * 获取库存
     * @return
     */
    int getStock();
}

扣减库存服务(StockService)

ackage com.xiaolyuh.service;
 
import com.xiaolyuh.lock.RedisLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
 
/**
 * 扣库存
 *
 * @author yuhao.wang
 */
@Service
public class StockService {
    Logger logger = LoggerFactory.getLogger(StockService.class);
 
    /**
     * 库存还未初始化
     */
    public static final long UNINITIALIZED_STOCK = -3L;
 
    /**
     * Redis 客户端
     */
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
 
    /**
     * 执行扣库存的脚本
     */
    public static final String STOCK_LUA;
 
    static {
        /**
         *
         * @desc 扣减库存Lua脚本
         * 库存(stock)-1:表示不限库存
         * 库存(stock)0:表示没有库存
         * 库存(stock)大于0:表示剩余库存
         *
         * @params 库存key
         * @return
         *      -3:库存未初始化
         *   -2:库存不足
         *   -1:不限库存
         *   大于等于0:剩余库存(扣减之后剩余的库存),直接返回-1
         */
        StringBuilder sb = new StringBuilder();
        // exists 判断是否存在KEY,如果存在返回1,不存在返回0
        sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
        // get 获取KEY的缓存值,tonumber 将redis数据转成 lua 的整形
        sb.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
        sb.append("    local num = tonumber(ARGV[1]);");
        // 如果拿到的缓存数等于 -1,代表改商品库存是无限的,直接返回1
        sb.append("    if (stock == -1) then");
        sb.append("        return -1;");
        sb.append("    end;");
        // incrby 特性进行库存的扣减
        sb.append("    if (stock >= num) then");
        sb.append("        return redis.call('incrby', KEYS[1], 0-num);");
        sb.append("    end;");
        sb.append("    return -2;");
        sb.append("end;");
        sb.append("return -3;");
        STOCK_LUA = sb.toString();
    }
 
    /**
     * @param key           库存key
     * @param expire        库存有效时间,单位秒
     * @param num           扣减数量
     * @param stockCallback 初始化库存回调函数
     * @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存
     */
    public long stock(String key, long expire, int num, IStockCallback stockCallback) {
        long stock = stock(key, num);
        // 初始化库存
        if (stock == UNINITIALIZED_STOCK) {
            RedisLock redisLock = new RedisLock(redisTemplate, key);
            try {
                // 获取锁
                if (redisLock.tryLock()) {
                    // 双重验证,避免并发时重复回源到数据库
                    stock = stock(key, num);
                    if (stock == UNINITIALIZED_STOCK) {
                        // 获取初始化库存
                        final int initStock = stockCallback.getStock();
                        // 将库存设置到redis
                        redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                        // 调一次扣库存的操作
                        stock = stock(key, num);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                redisLock.unlock();
            }
 
        }
        return stock;
    }
 
    /**
     * 扣库存
     *
     * @param key 库存key
     * @param num 扣减库存数量
     * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
     */
    private Long stock(String key, int num) {
        // 脚本里的KEYS参数
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 脚本里的ARGV参数
        List<String> args = new ArrayList<>();
        args.add(Integer.toString(num));
 
        long result = redisTemplate.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                }
 
                // 单机模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                }
                return UNINITIALIZED_STOCK;
            }
        });
        return result;
    }
 
 
 
/**
     * 加库存(还原库存)
     *
     * @param key    库存key
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, int num) {
 
        return addStock(key, null, num);
    }
 
    /**
     * 加库存
     *
     * @param key    库存key
     * @param expire 过期时间(秒)
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, Long expire, int num) {
        boolean hasKey = redisTemplate.hasKey(key);
        // 判断key是否存在,存在就直接更新
        if (hasKey) {
            return redisTemplate.opsForValue().increment(key, num);
        }
 
        Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
        RedisLock redisLock = new RedisLock(redisTemplate, key);
        try {
            if (redisLock.tryLock()) {
                // 获取到锁后再次判断一下是否有key
                hasKey = redisTemplate.hasKey(key);
                if (!hasKey) {
                    // 初始化库存
                    redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            redisLock.unlock();
        }
 
        return num;
    }
 
    /**
     * 获取库存
     *
     * @param key 库存key
     * @return -1:不限库存; 大于等于0:剩余库存
     */
    public int getStock(String key) {
        Integer stock = (Integer) redisTemplate.opsForValue().get(key);
        return stock == null ? -1 : stock;
    }
 
}
调用
@RestController
public class StockController {
 
    @Autowired
    private StockService stockService;
 
    @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object stock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
        long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
        return stock >= 0;
    }
 
    /**
     * 获取初始的库存
     *
     * @return
     */
    private int initStock(long commodityId) {
        // TODO 这里做一些初始化库存的操作
        return 1000;
    }
 
    @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object getStock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
 
        return stockService.getStock(redisKey);
    }
 
    @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object addStock() {
        // 商品ID
        long commodityId = 2;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
 
        return stockService.addStock(redisKey, 2);
    }
}

思路理解

库存新增思路

库存新增的操作一般不存在高并发的情况,因为不可能某一种商品一直在新增库存,这属于管理员后台管理的一种操作。

这里新增库存采用了redis的

1.库存发生新增操作,调用层一般传过来商品的id标识和新增量,调用库存新增服务

2.库存新增服务

/**
 * 加库存(还原库存)
 * @param key    库存key
 * @param num    库存数量
 * @return 
 */
public long addStock(String key, int num) {

    return addStock(key, null, num);
}

库存新增服务主要是使用了redis的increment自增操作。

3.辛苦新增服务

boolean hasKey = redisTemplate.hasKey(key);
// 判断key是否存在,存在就直接更新
if (hasKey) {
    return redisTemplate.opsForValue().increment(key, num);
}

第一种情况是先判断redis中是否有这个商品库存的缓存,如果存在该商品库存,就直接进行增加操作;

Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
RedisLock redisLock = new RedisLock(redisTemplate, key);
try {
    if (redisLock.tryLock()) {
        // 获取到锁后再次判断一下是否有key
        hasKey = redisTemplate.hasKey(key);
        if (!hasKey) {
            // 初始化库存
            redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
        }
    }
} catch (Exception e) {
    logger.error(e.getMessage(), e);
} finally {
    redisLock.unlock();
}

return num;

然后是第二种情况了,就是redis中没有库存缓存了。所以就需要去初始化库存。因为初始化库存有一些非原子的操作,在分布式环境下不安全,所以这里先通过这个商品id获取分布式锁,拿到锁之后,再去判断一下redis中是否有这个缓存,确认没有,则可以进行初始化操作,然会返回数量,初始化操作可以从数据库查出真实库存的值,然后更新到缓存。

我这里的案列是直接把第一次传进来的库存数量进行初始化。

可能设计的问题

在对某key进行increment()方法时,可能会报错:

redis ERR value is not an integer or out of range 

这里库存新增我们使用的是RedisTemplateincrement的自增方法。

Spring对Redis序列化的策略有两种,分别是StringRedisTemplateRedisTemplate,其中StringRedisTemplate用于操作字符串,RedisTemplate使用的是JDK默认的二进制序列化。

大家都知道redis序列化是将key,value值先转换为流的形式,再存储到redis中。

RedisTemplate是使用的JdkSerializationRedisSerializer序列化,序列化后的值包含了对象信息,版本号,类信息等,是一串字符串,所以无法进行数值自增操作。

StringRedisTemplate序列化策略是字符串的值直接转为字节数组,所以存储到redis中是数值,所以可以进行自增操作。

所以问题出在这里,我们需要自定义序列化策略,在application启动类中添加如下:

@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
   StringRedisTemplate template = new StringRedisTemplate(factory);
   //定义key序列化方式
   //RedisSerializer<String> redisSerializer = new StringRedisSerializer();//Long类型会出现异常信息;需要我们上面的自定义key生成策略,一般没必要
   //定义value的序列化方式
   Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
   ObjectMapper om = new ObjectMapper();
   om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
   om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
   jackson2JsonRedisSerializer.setObjectMapper(om);
 
   // template.setKeySerializer(redisSerializer);
   template.setValueSerializer(jackson2JsonRedisSerializer);
   template.setHashValueSerializer(jackson2JsonRedisSerializer);
   template.afterPropertiesSet();
   return template;

库存扣减思路

利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。

库存发生扣减操作,调用层一般传过来商品的id标识和扣减量,调用库存扣减服务

long stock = stock(key, num);

第一步是进行扣减操作,在正常情况下,如果缓存中存在库存数据,则会进行正常的扣减操作,并且返回结果。

// 初始化库存
if (stock == UNINITIALIZED_STOCK) {
    RedisLock redisLock = new RedisLock(redisTemplate, key);
    try {
        // 获取锁
        if (redisLock.tryLock()) {
            // 双重验证,避免并发时重复回源到数据库
            stock = stock(key, num);
            if (stock == UNINITIALIZED_STOCK) {
                // 获取初始化库存
                final int initStock = stockCallback.getStock();
                // 将库存设置到redis
                redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                // 调一次扣库存的操作
                stock = stock(key, num);
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    } finally {
        redisLock.unlock();
    }

}

第二种是缓存中还没有数据的情况,则需要进行初始化操作。初始化库存存在非原子操作,所以需要使用分布式锁来实现。拿到锁之后,在进行一次库存扣减操作,看返回的结果还是不是没有缓存,这是进行一次双重验证,避免并发时重复回源到数据库。第二次验证的结果还是没有缓存的话,则需要进行一次初始化缓存操作。初始化操作可以从数据库查出真实库存的值,然后更新到缓存。然后再进行一次扣减操作。

可能存在的问题:

RedisTemplate执行lua脚本,集群模式下报错解决

在使用spring的RedisTemplate执行lua脚本时,报错EvalSha is not supported in cluster environment,不支持cluster。

但是redis是支持lua脚本的,只要拿到原redis的connection对象,通过connection去执行即可:

//spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本异常,此处拿到原redis的connection执行脚本
String result = (String)redisTemplate.execute(new RedisCallback<String>() {
    public String doInRedis(RedisConnection connection) throws DataAccessException {
        Object nativeConnection = connection.getNativeConnection();
        // 集群模式和单点模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
        // 集群
        if (nativeConnection instanceof JedisCluster) {
            return (String) ((JedisCluster) nativeConnection).eval(LUA, keys, args);
        }

        // 单点
        else if (nativeConnection instanceof Jedis) {
            return (String) ((Jedis) nativeConnection).eval(LUA, keys, args);
        }
        return null;
    }
});

 

标签:库存,扣减,Redis,redis,num,key,return,stock
来源: https://www.cnblogs.com/wk-missQ1/p/16108054.html