其他分享
首页 > 其他分享> > 微服务:多级缓存下

微服务:多级缓存下

作者:互联网

多级缓存下

Redis缓存预热

Redis缓存会面临冷启动问题:

冷启动:服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。

缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中。

1)利用Docker安装Redis

docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes

2)在item-service服务中引入Redis依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

3)配置Redis地址

spring:
  redis:
    host: 192.168.150.101

4)编写初始化类

缓存预热需要在项目启动时完成,并且必须是拿到RedisTemplate之后。

这里我们利用InitializingBean接口来实现,因为InitializingBean可以在对象被Spring创建并且成员变量全部注入后执行。

@Component
public class RedisHandler implements InitializingBean {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService iItemStockService;

    private static final ObjectMapper MAPPER = new ObjectMapper();
    @Override
    public void afterPropertiesSet() throws Exception {
        //初始化缓存
        //1.查询商品信息
        List<Item> itemList = itemService.list();
        //2.放入缓存
        for (Item item:itemList) {
            //2.1 item序列化为json
            String json = MAPPER.writeValueAsString(item);
            //2.2将json存入
            redisTemplate.opsForValue().set("item:id:"+item.getId(),json) ;
        }

        //3.查询库存信息
        List<ItemStock> stockList = iItemStockService.list();
        //4.放入缓存
        for (ItemStock stock:stockList) {
            //4.1 item序列化为json
            String json = MAPPER.writeValueAsString(stock);
            //4.2将json存入
            redisTemplate.opsForValue().set("stock:id:"+stock.getId(),json) ;
        }
    }
}

Redis缓存

封装Redis工具

OpenResty提供了操作Redis的模块,我们只要引入该模块就能直接使用。但是为了方便,我们将Redis操作封装到之前的common.lua工具库中。

修改/usr/local/openresty/lualib/common.lua文件:

1)引入Redis模块,并初始化Redis对象

-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

2)封装函数,用来释放Redis连接,其实是放入连接池

-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
    local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
    local pool_size = 100 --连接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
    end
end

3)封装函数,根据key查询Redis数据

-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
    -- 获取一个连接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "连接redis失败 : ", err)
        return nil
    end
    -- 查询redis
    local resp, err = red:get(key)
    -- 查询失败处理
    if not resp then
        ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
    end
    --得到的数据为空处理
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
    end
    close_redis(red)
    return resp
end

4)导出

-- 将方法导出
local _M = {  
    read_http = read_http,
    read_redis = read_redis
}  
return _M

完整的common.lua:

-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
    local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
    local pool_size = 100 --连接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
    end
end

-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
    -- 获取一个连接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "连接redis失败 : ", err)
        return nil
    end
    -- 查询redis
    local resp, err = red:get(key)
    -- 查询失败处理
    if not resp then
        ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
    end
    --得到的数据为空处理
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
    end
    close_redis(red)
    return resp
end

-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
    local resp = ngx.location.capture(path,{
        method = ngx.HTTP_GET,
        args = params,
    })
    if not resp then
        -- 记录错误信息,返回404
        ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)
        ngx.exit(404)
    end
    return resp.body
end
-- 将方法导出
local _M = {  
    read_http = read_http,
    read_redis = read_redis
}  
return _M

实现Redis查询

接下来,我们就可以去修改item.lua文件,实现对Redis的查询了。

查询逻辑是:

1)修改/usr/local/openresty/lua/item.lua文件,添加一个查询函数:

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 封装查询函数
function read_data(key, path, params)
    -- 查询本地缓存
    local val = read_redis("127.0.0.1", 6379, key)
    -- 判断查询结果
    if not val then
        ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
        -- redis查询失败,去查询http
        val = read_http(path, params)
    end
    -- 返回数据
    return val
end

2)而后修改商品查询、库存查询的业务:

image-20210821114528954

3)完整的item.lua代码:

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')

-- 封装查询函数
function read_data(key, path, params)
    -- 查询本地缓存
    local val = read_redis("127.0.0.1", 6379, key)
    -- 判断查询结果
    if not val then
        ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
        -- redis查询失败,去查询http
        val = read_http(path, params)
    end
    -- 返回数据
    return val
end

-- 获取路径参数
local id = ngx.var[1]

-- 查询商品信息
local itemJSON = read_data("item:id:" .. id,  "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

nginx本地缓存

现在,整个多级缓存中只差最后一环,也就是nginx的本地缓存了。如图:

image-20210821114742950

本地缓存API

OpenResty为Nginx提供了shard dict的功能,可以在nginx的多个worker之间共享数据,实现缓存功能。

1)开启共享字典,在nginx.conf的http下添加配置:

 # 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m
 lua_shared_dict item_cache 150m; 

2)操作共享字典:

-- 获取本地缓存对象
local item_cache = ngx.shared.item_cache
-- 存储, 指定key、value、过期时间,单位s,默认为0代表永不过期
item_cache:set('key', 'value', 1000)
-- 读取
local val = item_cache:get('key')

案例:在查询商品时,优先查询OpenResty的本地缓存

需求:

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
--导入cjson
local cjson = require('cjson')
--导入共享词典
local item_cache = ngx.shared.item_cache


--封装查询函数
function read_data(key,expire,path,params)
    --查询本地缓存
    local val = item_cache:get(key)
    ngx.log(ngx.ERR,"val:",val)
    if not val then
        ngx.log(ngx.ERR,"本地缓存查询失败,key:",key) 
        --如果没空,则查询redis
        val = read_redis("127.0.0.1",6379,key)
        --判断查询结果
        if not resp then
            --redis查询失败查http
            ngx.log(ngx.ERR,"redis查询失败,key:",key)
            val = read_http(path,params)
        end
    end
    --将数据写入本地缓存
    item_cache:set(key,val,expire)
    return val
    
end

local id =ngx.var[1]

--查询商品信息
local itemJson = read_data("item:id:" .. id,1800,"/item/" .. id,nil)
--查询库存信息
local stockJson = read_data("stock:id:" .. id,60,"/item/stock/" .. id,nil)
--json转化为lua的table,反序列化
local item = cjson.decode(itemJson)
local stock = cjson.decode(stockJson)
--组合数据
item.stock = stock.stock
item.sold = stock.sold
--序列化后返回结果
ngx.say(cjson.encode(item))

缓存同步

缓存数据同步的常见方式有三种:

设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新

同步双写:在修改数据库的同时,直接修改缓存

异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据

而异步实现又可以基于MQ或者Canal来实现:

1)基于MQ的异步通知:

image-20210821115552327

解读:

依然有少量的代码侵入。

2)基于Canal的通知

image-20210821115719363

解读:

代码零侵入


认识Canal

Canal [kə'næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:https://github.com/alibaba/canal

Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:

image-20210821115914748

而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。

image-20210821115948395

监听Canal

Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。

image-20210821120049024

我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。

不过这里我们会使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client

与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

RabbitMQ高级特性

消息队列在使用过程中,面临着很多实际问题需要思考:

image-20210718155003157

消息可靠性

消息从发送,到消费者接收,会经理多个过程:

image-20210718155059371

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

针对这些问题,RabbitMQ分别给出了解决方案:


生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

image-20210718160907166

注意:

image-20210718161707992

案例:SpringAMQP实现生产者确认

首先,修改publisher服务中的application.yml文件,添加下面的内容:

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
   

说明:

编写逻辑

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:

修改publisher服务,添加一个:

//编写publisher-return
@Configuration
@Slf4j
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取rabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //lambda表达式
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            //记录日志
            log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{}",
                    replyCode,replyText,exchange,routingKey);
            //重发消息等。。
        });
    }
}
//编写publisher-confirm
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
    //准备消息
    String message = "hello, spring amqp!";

    //准备correlationData
    //准备消息id
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

    correlationData.getFuture().addCallback(result -> {
        if(result.isAck()){
            //ACK
            log.debug("消息成功投递到交换机!消息id:{}",correlationData.getId());
        }else{
            //NACK
            log.error("发送失败,消息id:{}",correlationData.getId());
            //重发消息
        }
    }, ex -> {
        log.error("消息发送失败!"+ex);
        //重发消息
    });

    //发送消息
    rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
}
//消息成功投递到交换机!消息id:ec49a9e8-d218-4279-b314-c3d12f896c85
//发送失败,消息id:3d5d2b55-d194-4c29-833e-eb921b0e265f
//消息发送到队列失败,响应码:312,失败原因:NO_ROUTE,交换机:amq.topic,路由key:saimple.test

小结

SpringAMQP中处理消息确认的几种情况:

publisher-comfirm:


消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

持久化交换机

/**
 * 持久化的交换机
 * @return
 */
@Bean
public DirectExchange simpleDirect(){
    return new DirectExchange("simpleDirect",true,false);
}

持久化队列

/**
 * 持久化的队列
 * @return
 */
@Bean
public Queue simpleQueue(){
    return QueueBuilder.durable("simple.queue").build();
}

消息持久化

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:

@Test
public void testD(){
    Message msg = MessageBuilder.withBody("hello_world".getBytes(StandardCharsets.UTF_8))//消息体
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)//消息持久化
        .build();
    rabbitTemplate.convertAndSend("simple.queue",msg);
}

消费者确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。

而SpringAMQP则允许配置三种确认模式:

由此可知:

一般,我们都是使用默认的auto即可。

使用spring提供的默认消息确认

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):

image-20210718171705383

抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:

image-20210718171759179


失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

image-20210718172746378

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

结论:


失败重试策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

总结

如何确保RabbitMQ消息的可靠性?

标签:缓存,服务,--,ngx,多级,redis,查询,item,local
来源: https://www.cnblogs.com/Boerk/p/16120599.html