数据库
首页 > 数据库> > Redis从入门到放弃之Redis key过期监听

Redis从入门到放弃之Redis key过期监听

作者:互联网

1.概述

Redis是一个高效的key-value数据库,同时拥有很多较为强大的功能。针对redis的过期key,可以来实现一些业务,这些业务的共性是不需要较高的实时性。Redis的过期事件可能会存在延迟,所以它无法实现实时性较高的功能。本文主要讲述的是跟据过期事件,实现数据统计与上报的功能。

2.监听事件的两种实现

2.1 业务背景

本文中所讲述的场景主要是:一个服务下面有多台应用设备,设备会定时上报一些数据,此服务需要跟据上报数据定时计算得出一个结果,并将结果进行上报到中心服务进行持久化操作。
设计方案:将每条数据跟据设备唯一标识(这里称为设备编号)和时间戳进行拼接作为key,将上报信息作为value,存为string类型,过期时间可设为统计周期。同时将该设备编号作为hash的key,将每条key中的时间戳作为hash的field,并将上报信息作为hash的value。这样每台设备都会拥有一个唯一的Hash来存储该设备某段时间内的数据,当某一台设备的key过期时,可以在监听事件中监听该事件,并从过期key中获取设备号,利用Redis中hash的getAll方法,将该设备号作为参数,获取该段时间内对应设备的信息,再进行遍历统计,计算结果并进行上报。
方案利弊:
优势:适用于数据量不是特别巨大,实时性要求不是特别高的场景,实现较为简单。
劣势:过期时间可能稍有延迟,实时性较低。

2.2 代码实现

2.2.1 更改Redis配置


```bash
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
#
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# Example 2: to get the stream of the expired keys subscribing to channel
# name __keyevent@0__:expired use:
notify-keyspace-events Ex

上述配置的最后一行原本是被注销掉,打开该注释即可。
2.2.2 编写RedisConfig配置文件

import com.baidu.capacity.common.constant.Constant;
import com.baidu.capacity.common.util.RedisUtils;
import com.baidu.capacity.common.push.provider.redisListener.StreamQualityListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;

/**
 * @Description: 流质量key过期处理线程
 * @Author Marin
 * @Date 2021/2/26 14:06
 */
@Configuration
public class RedisListenerConfiguration {

    @Value("${spring.redis.database}")
    private Integer db;

    @Autowired
    private RedisUtils redisUtils;

    @Bean
    public RedisMessageListenerContainer getListenerContainer(RedisConnectionFactory connectionFactory) {

        //创建连接容器
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        //放入redis连接
        container.setConnectionFactory(connectionFactory);
        
		//指定监听类型
        String patternTopic = "__keyspace@" + db + "__:" + Constant.EDGE_STREAM_QUALITY_INFO_LIST + "*";

        //写入需要被监听的类型
        Topic topic = new PatternTopic(patternTopic);
        container.addMessageListener(new StreamQualityListener(redisUtils), topic);
        return container;
    }
}

2.2.3 过期事件监听流程

package com.baidu.capacity.common.push.provider.redisListener;

import com.alibaba.fastjson.JSON;
import com.baidu.capacity.common.constant.Constant;
import com.baidu.capacity.common.entity.StreamQuality;
import com.baidu.capacity.common.util.RedisLuaDistributedLockUtils;
import com.baidu.capacity.common.util.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

import java.util.*;

/**
 * @Description: 流质量key过期处理线程
 * @Author Marin
 * @Date 2021/2/26 16:03
 */
@Slf4j
public class StreamQualityListener implements MessageListener {

    private RedisUtils redisUtils;

    public StreamQualityListener(RedisUtils redisUtils) {
        this.redisUtils = redisUtils;
    }

    @Override
    public void onMessage(Message message, byte[] bytes) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        String opsType = new String(body);
        String opsValue = new String(channel);
        log.info(opsType);
        log.info(new String(channel));
        String lockVal = UUID.randomUUID().toString();
        //获取过期key,统计过期key对应hash中的数据信息,跟据规则处理并将结果存入缓存
        //__keyspace@9__:STREAM-QUALITY-REPORT-INFO-test1#test1-1614304386593
        if ("expired".equals(opsType)) {
            if (StringUtils.isNotBlank(opsValue) && opsValue.split(":").length > 1) {
                //STREAM-QUALITY-REPORT-INFO-33010201021327688912#33010201021327688912-1614132409846
                String key = opsValue.split(":")[1];
                log.info("流质量信息key过期:{},过期时间为:{}",key,System.currentTimeMillis());
                String commonkey = key.split("-")[4];
                String deviceNum = commonkey.split("#")[0];
                String channelNum = commonkey.split("#")[1];
                try {
                    //加锁,防止同一台设备map未删除完而另一个key过期触发事件,导致生成多条记录结果
                    boolean lock = RedisLuaDistributedLockUtils.luaAcquireLock(commonkey, lockVal, 30);
                    if (lock) {
                        Long start = System.currentTimeMillis();
                        //lostRate总和
                        int count = 0;
                        //非空数据总条数
                        int totalNum = 0;
                        //计算结果,上报中心
                        StreamQuality streamQualityInfo = new StreamQuality();
                        streamQualityInfo.setDeviceNum(deviceNum);
                        streamQualityInfo.setChannelNum(channelNum);
                        streamQualityInfo.setCreateTime(new Date());
                        streamQualityInfo.setModifyTime(new Date());
                        Map<Object, Object> objectObjectMap = redisUtils.hGetAll(Constant.STREAM_QUALITY_HASH_KEY + commonkey);
                        for (Object object : objectObjectMap.keySet()) {
                            String hkey = (String) object;
                            String streamInfo = (String) objectObjectMap.get(hkey);
                            StreamQuality streamQuality = JSON.parseObject(streamInfo, StreamQuality.class);
                            if (StringUtils.isBlank(streamQuality.getDeviceNum()) || StringUtils.isBlank(streamQuality.getChannelNum()) || null == streamQuality.getLostRate()) {
                                log.error("设备号为:{},通道号为:{},lostRate的值为:{}不存在", deviceNum, JSON.toJSONString(streamQuality.getChannelNum()),JSON.toJSONString(streamQuality.getLostRate()));
                                //删除脏数据
                                redisUtils.hdel(Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
                                continue;
                            }
                            count += streamQuality.getLostRate();
                            totalNum++;
                            //同一设备和同一channelNum对应的unit_id一致
                            streamQualityInfo.setUnitId(streamQuality.getUnitId());

                            //删除已处理的Map中的记录
                            redisUtils.hdel(Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
                            //删除该批Map中所对应的未过期的记录
                            redisUtils.delete(Constant.EDGE_STREAM_QUALITY_INFO_LIST + commonkey + "-" + hkey);
                            log.info("删除设备编号为:{},key为:{}", Constant.STREAM_QUALITY_HASH_KEY + commonkey, hkey);
                        }
                        log.info("本次统计设备号为:{}的总数据为:{}", deviceNum, totalNum);
                        //TODO 此处可跟据规则计算结果并进行上报
                        //设置当前时间段内丢包率平均值
                        streamQualityInfo.setLostRate(count / (totalNum > 0 ? totalNum : 1));
                        log.info("本次上报结果为:{}", JSON.toJSONString(streamQualityInfo));
                        //将结果放入队列,开启线程轮询上报
                        redisUtils.lpush(Constant.STREAM_QUALITY_RESULT, JSON.toJSONString(streamQualityInfo));
                        log.info("处理结果时间为:{} ms", System.currentTimeMillis() - start);
                    }
                } catch (Exception e) {
                    log.error("获取redis流质量信息出错,错误信息:{}", e);
                } finally {
                    /** 释放分布式锁 */
                    RedisLuaDistributedLockUtils.luaReleaseLock(commonkey, lockVal);
                }
            }
        }
    }
}

3.小结

1.鉴于Redis的默认过期删除策略,上述方法适用于同批次上报数据量不大的场景(数据量小于500左右,上报周期1s);
2.经测试,当同批数据量较大,超过1000或更多(上报周期为1s),会造成某段时间内同一台设备会生成两条结果,原因是当前设备的第一条数据过期时,处理流程未能及时删除第二条过期key,而数据仍不断写入,则第二条数据又触发了一次过期事件,导致生成两条结果。目前正在寻找优化方案,稍后更新。

4.参考文献

1.https://juejin.cn/post/6844904158227595271
2.https://www.cnblogs.com/pinxiong/p/13288087.html
3.https://blog.csdn.net/shuizimuzhonglingf/article/details/102782014

标签:String,redisUtils,过期,Redis,key,org,import,监听
来源: https://blog.csdn.net/qq_33479841/article/details/114270089