数据库
首页 > 数据库> > [记录点滴]Redis实现简单消息队列

[记录点滴]Redis实现简单消息队列

作者:互联网

本文提出了一种用Redis实现简单消息队列的方案,适合在资源不足的条件下临时使用。

[记录点滴]Redis实现简单消息队列


目录


0x00 摘要

本文提出了一种用Redis实现简单消息队列的方案,适合在资源不足的条件下临时使用。

0x01 缘由

一个兄弟创业,资源严重不足,但是还希望搭建一个消息队列,于是就咨询我。我恰好有些相关经验,就和他分享。他的需求如下:

在这种情况下,我建议他在Redis上构建消息队列,暂时渡过难关。

0x02 背景概念

2.1 Redis是否适合做消息队列

首先说结论:Redis肯定是不适合做消息队列的,因为这个本身就不是Redis设计的初衷。

但是如果确实资源受限,为了降低系统的维护成本和实现复杂度,还是可以考虑使用Redis的。

2.1.1 Redis的问题

因为Redis就不是为消息队列设计的,所以它没有考虑一些消息队列的基本问题:

如果按照是否容忍错误来区分,可以分为两种队列,但是这两种都不适合Redis。

2.1.2 不容忍错误

这种队列的要求是:不允许丢失消息,要保证一致性。比如下单操作。

在这个需求下,用Redis是不实际的,因为你需要考虑如何在Redis基础上做一次性和异步幂等,保证exactly once。那样就应该使用常见的MQ,比如RabbitMQ, Kafka....

2.1.3 容忍错误

比如日志收集。这种允许一定程度的数据丢失,这种其实也不适合Redis,而且现成的方案有很多,比如fluentd,logstash……

2.2 Redis做消息队列的方案

一般来说有四种方式

或者可以考虑基于Redis作者写的disque来做开发?

2.4 本文采取的方案

本文采用Redis的List作为队列可以用来在不同程序之间交换消息。生成者使用LPUSH或者RPUSH将一个消息放入队列。消费者使用RPOP或者LPOP命令取出队列中等待时间最长的消息。List支持多个生产者和消费者并发进出消息,每个消费者拿到都是不同的列表元素。

但是这样有两个问题:

因此需要

2.4.1 RPOPLPUSH

RPOPLPUSH好处在于:它不仅返回一个消息,同时还将这个消息添加到另一个备份列表当中。如果一切正常的话,当一个客户端完成某个消息的处理之后,可以用LREM命令将这个消息从备份列表删除。

最后,还可以添加一个客户端专门用于监视备份表,它自动地将超过一定处理时限的消息重新放入队列中去(负责处理该消息的客户端可能已经崩溃),这样就不会丢失任何消息了。

需要注意的问题 :RPOPLPUSH重新入队,即把备份列表右侧元素(表尾)重新入队,可能会出现消息被重复消费的情况。因此消费操作要实现幂等性,即保证重复消费结果一致.

2.4.2 BLPOP和BRPOP

好处在于 :阻塞读在队列没有数据的时候进入休眠状态,一旦数据到来则立刻醒过来,消息延迟几乎为零。

需要注意的问题 :空闲连接的问题。如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常,还有重试。

0x03 生产者LUA

他的数据是由LUA产生的,由Openresty运行。

具体代码摘要如下:

local REDIS = require "redis_iresty"

local REDIS_STORE = REDIS:new(CONF)

REDIS_STORE:lpush(LOG_LIST, log)

0x04 消费者 JAVA

因为生产者是 LPUSH,所以消费者使用 RPOPLPUSH。

4.1 数据变量

因为RPOPLPUSH不仅返回一个消息,同时还将这个消息添加到另一个备份列表当中,所以mKey是消息列表,mKeyRollback是备份列表。从Redis读出消息后临时存储在mActionList。

protected ListmActionList = new CopyOnWriteArrayList();

@Value("${key.list}")
private String mKey;	
	
@Value("${key.rollback.list}")
private String mKeyRollback;	

4.2 消费函数

consume是消费函数。当出现异常时候,会从备份列表中把消息再写回到消息队列。

public boolean consume() {
      rollbackLastLaunch(); //上次同步失败的,这次先弄回去
      
      while(true) {  
         
         try {
            if (schedulejob) {
               timerecord = System.currentTimeMillis();
               schedulejob = false;
            }           
            
           // 从消息队列取出消息,同时Redis操作会自动把取出的消息放入备份队列。
            String action = mRedisStore.listRightPopAndLeftPush(mKey, mKeyRollback, mWaitTimeLimit, TimeUnit.SECONDS);
            if(action != null) {
               mActionList.add(action);
            }
            
            currentTimeStamp = System.currentTimeMillis();
            if (mActionList.size() >= mBatchSize || 
                  (currentTimeStamp - timerecord >= mTimeElapsedLimit && mActionList.size() > 0)) {
               schedulejob = true;       
               boolean res = sync2MySql();
               if (res == true) {
                  clearRollback(); //清除备份列表
               } else {                 
                  rollbackLastLaunch(); //rollback();
               }
               mActionList.clear();
            }
         } catch (Exception e) {
             //发生了网络异常,需要把processing中的id再放回到waiting queue中
             //如果redis, mysql异常,都会在这里被catch
             rollbackLastLaunch(); 
         } finally {             
         }
      }
   }
}

具体Redis操作是StringRedisTemplate.opsForList().rightPopAndLeftPush函数。

public String listRightPopAndLeftPush(String sourceKey, String destinationKey, long timeout, TimeUnit unit) {
   getTemplate().setDefaultSerializer(new StringRedisSerializer());
       return getTemplate().opsForList().rightPopAndLeftPush(sourceKey, destinationKey, timeout, unit);
}

使用这个是因为它支持配置超时时间。

V rightPopAndLeftPush(K var1, K var2, long var3, TimeUnit var5);

4.3 删除备份消息

clearRollback函数是当消息被成功处理之后,从备份队列中删除备份消息。

protected void clearRollback() {
   Long count = mRedisStore.getListSize(mKeyRollback);
   while(count > 0 ) {
      mRedisStore.listRightPop(mKeyRollback);
      count--;
   }     
}

4.4 处理异常

当出现问题时候,会调用rollbackLastLaunch函数,从备份列表中把消息再写回到消息队列。

因为我们需要在一个Redis操作中执行lpop和rpush两个操作,必须把这两个操作构建成一个原子序列,所以这里涉及到了Lua脚本的使用。通过内嵌对 Lua 环境的支持, Redis 解决了长久以来不能高效地处理 CAS (check-and-set)命令的缺点, 并且可以通过组合使用多个命令, 轻松实现以前很难实现或者不能高效实现的模式。

void rollbackLastLaunch() {
   try {
      Long count = mRedisStore.getListSize(mKeyRollback);          
      Long dbsize = 0l;
      while(count > 0 ) {
         Listkeys = new ArrayList();
         keys.add(mKeyRollback);
         keys.add(mKey);
         
         DefaultRedisScriptscript = new DefaultRedisScript();
         script.setScriptText("local action = redis.call('lpop', KEYS[1]); local result = redis.call('rpush', KEYS[2], action); return result;");
         script.setResultType(Long.class);
         
         dbsize += mRedisStore.executeScript(script, keys, null);
         count--;
      }           
   } catch (Exception e) {    
   }  
}

0x05 参考

你对Redis的使用靠谱吗?

Lua 脚本

Redis实现消息队列的方案

Redis 怎么做消息队列?

Redis 阻塞、安全队列 BLPOP / BRPOP / LPUSH

标签:队列,备份,Redis,列表,消息,2.1,点滴
来源: https://blog.51cto.com/u_15179348/2734174