数据库
首页 > 数据库> > Redis 6.0 源码阅读笔记(7) -- ZSet 数据类型源码分析

Redis 6.0 源码阅读笔记(7) -- ZSet 数据类型源码分析

作者:互联网

1. 存储结构
在 有序集合对象 ZSet 的介绍中已经提到 ZSet 集合的底层存储结构主要有两种,其结构示例如下:

OBJ_ENCODING_ZIPLIST
当 ziplist 作为 zset 的底层存储结构时,每个集合元素使用两个紧挨在一起的压缩列表节点来保存,第一个节点保存元素值,第二个元素保存元素的分值,而且分值小的靠近表头,大的靠近表尾

OBJ_ENCODING_SKIPLIST
底层实现是跳跃表结合字典。每个跳跃表节点都保存一个集合元素,并按分值从小到大排列,每个节点的 ele 属性保存了元素的值,score属性保存分值;字典的每个键值对保存一个集合元素,元素值包装为字典的键,元素分值保存为字典的值。注意集合的元素成员和分值是共享的,跳跃表和字典通过指针指向同一地址,不会浪费内存

2. 源码解析
2.1 存储过程分析
对有序集合 ZSet 的操作命令的处理函数在t_zset.c 文件中,向 ZSet 添加元素调用的函数为t_zset.c#zaddCommand()。从源码可以看到,这个函数其实主要调用了t_zset.c#zaddGenericCommand(),zaddGenericCommand() 的主要逻辑如下:

解析参数得到每个元素及其对应的分值
查找key对应的 ZSet 集合对象是否存在,不存在则创建。此处创建新的 ZSet 集合对象有两个配置需要注意,如果 zset_max_ziplist_entries(默认 128) 为 0 或者要存储的元素长度大于 zset_max_ziplist_value(默认 64),则创建底层结构为 OBJ_ENCODING_SKIPLIST 的有序集合对象,否则创建底层结构为 OBJ_ENCODING_ZIPLIST 的有序集合对象
对于所有的元素,循环调用 t_zset.c#zsetAdd()函数将其添加到有序集合中

void zaddCommand(client *c) {
 zaddGenericCommand(c,ZADD_NONE);
}

void zaddGenericCommand(client *c, int flags) {
 static char *nanerr = "resulting score is not a number (NaN)";
 robj *key = c->argv[1];
 robj *zobj;
 sds ele;
 double score = 0, *scores = NULL;
 int j, elements;
 int scoreidx = 0;
 /* The following vars are used in order to track what the command actually
  * did during the execution, to reply to the client and to trigger the
  * notification of keyspace change. */
 int added = 0;      /* Number of new elements added. */
 int updated = 0;    /* Number of elements with updated score. */
 int processed = 0;  /* Number of elements processed, may remain zero with
                        options like XX. */

 /* Parse options. At the end 'scoreidx' is set to the argument position
  * of the score of the first score-element pair. */
 scoreidx = 2;
 while(scoreidx < c->argc) {
     char *opt = c->argv[scoreidx]->ptr;
     if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
     else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
     else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
     else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
     else break;
     scoreidx++;
 }

 /* Turn options into simple to check vars. */
 int incr = (flags & ZADD_INCR) != 0;
 int nx = (flags & ZADD_NX) != 0;
 int xx = (flags & ZADD_XX) != 0;
 int ch = (flags & ZADD_CH) != 0;

 /* After the options, we expect to have an even number of args, since
  * we expect any number of score-element pairs. */
 elements = c->argc-scoreidx;
 if (elements % 2 || !elements) {
     addReply(c,shared.syntaxerr);
     return;
 }
 elements /= 2; /* Now this holds the number of score-element pairs. */

 /* Check for incompatible options. */
 if (nx && xx) {
     addReplyError(c,
         "XX and NX options at the same time are not compatible");
     return;
 }

 if (incr && elements > 1) {
     addReplyError(c,
         "INCR option supports a single increment-element pair");
     return;
 }

 /* Start parsing all the scores, we need to emit any syntax error
  * before executing additions to the sorted set, as the command should
  * either execute fully or nothing at all. */
 scores = zmalloc(sizeof(double)*elements);
 for (j = 0; j < elements; j++) {
     if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
         != C_OK) goto cleanup;
 }

 /* Lookup the key and create the sorted set if does not exist. */
 zobj = lookupKeyWrite(c->db,key);
 if (zobj == NULL) {
     if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
     if (server.zset_max_ziplist_entries == 0 ||
         server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
     {
         zobj = createZsetObject();
     } else {
         zobj = createZsetZiplistObject();
     }
     dbAdd(c->db,key,zobj);
 } else {
     if (zobj->type != OBJ_ZSET) {
         addReply(c,shared.wrongtypeerr);
         goto cleanup;
     }
 }

 for (j = 0; j < elements; j++) {
     double newscore;
     score = scores[j];
     int retflags = flags;

     ele = c->argv[scoreidx+1+j*2]->ptr;
     int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
     if (retval == 0) {
         addReplyError(c,nanerr);
         goto cleanup;
     }
     if (retflags & ZADD_ADDED) added++;
     if (retflags & ZADD_UPDATED) updated++;
     if (!(retflags & ZADD_NOP)) processed++;
     score = newscore;
 }
 server.dirty += (added+updated);

reply_to_client:
 if (incr) { /* ZINCRBY or INCR option. */
     if (processed)
         addReplyDouble(c,score);
     else
         addReplyNull(c);
 } else { /* ZADD. */
     addReplyLongLong(c,ch ? added+updated : added);
 }

cleanup:
 zfree(scores);
 if (added || updated) {
     signalModifiedKey(c,c->db,key);
     notifyKeyspaceEvent(NOTIFY_ZSET,
         incr ? "zincr" : "zadd", key, c->db->id);
 }
}

ZSet 集合对象的创建函数在 object.c中,源码如下,可以看到其两种存储结构是有明显区别的

robj *createZsetObject(void) {
 zset *zs = zmalloc(sizeof(*zs));
 robj *o;

 zs->dict = dictCreate(&zsetDictType,NULL);
 zs->zsl = zslCreate();
 o = createObject(OBJ_ZSET,zs);
 o->encoding = OBJ_ENCODING_SKIPLIST;
 return o;
}

robj *createZsetZiplistObject(void) {
 unsigned char *zl = ziplistNew();
 robj *o = createObject(OBJ_ZSET,zl);
 o->encoding = OBJ_ENCODING_ZIPLIST;
 return o;
}

t_zset.c#zsetAdd()函数实现较长,但是逻辑比较清晰:

首先判断如果存储结构是 ziplist,在执行添加的过程中需要调用函数 t_zset.c#zzlFind() 区分元素存在和不存在两种情况。另外需注意,函数 t_zset.c#zzlInsert() 将元素插入到 ziplist 中是根据 score 分数有序插入的,分数小的在 ziplist 头部,分数大的在尾部,元素和分数分两次操作插入到 ziplist 中,并且紧挨在一起
存在的情况下如果元素 score 改变了,则先删除 ziplist 中的元素然后重新添加
不存在的情况下直接向 ziplist 添加元素,插入完成后需要考虑元素的长度是否超出限制或 ziplist 存储的元素个数是否超过最大限制,进而决定是否将底层结构转为 skiplist
如果存储结构是 skiplist,同样需要区分元素存在和不存在两种情况。需要注意,在跳跃表中元素是按照分数有序存储的,这样范围查找的时间复杂度为 O(logN);使用字典则能达到单个元素查找O(1)的时间复杂度,非常高效
存在的情况并且 score 分数发生变化的话,调用 t_zset.c#zslUpdateScore() 函数在跳跃表中查找到目标节点,将其删除并重新插入一个节点,完成后返回新的跳跃表节点用于更新字典中元素的分数
不存在情况下就直接调用t_zset.c#zslInsert() 函数往跳跃表添加节点,完成后同时往字典新增一个节点

int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
 /* Turn options into simple to check vars. */
 int incr = (*flags & ZADD_INCR) != 0;
 int nx = (*flags & ZADD_NX) != 0;
 int xx = (*flags & ZADD_XX) != 0;
 *flags = 0; /* We'll return our response flags. */
 double curscore;

 /* NaN as input is an error regardless of all the other parameters. */
 if (isnan(score)) {
     *flags = ZADD_NAN;
     return 0;
 }

 /* Update the sorted set according to its encoding. */
 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
     unsigned char *eptr;

     if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
         /* NX? Return, same element already exists. */
         if (nx) {
             *flags |= ZADD_NOP;
             return 1;
         }

         /* Prepare the score for the increment if needed. */
         if (incr) {
             score += curscore;
             if (isnan(score)) {
                 *flags |= ZADD_NAN;
                 return 0;
             }
             if (newscore) *newscore = score;
         }

         /* Remove and re-insert when score changed. */
         if (score != curscore) {
             zobj->ptr = zzlDelete(zobj->ptr,eptr);
             zobj->ptr = zzlInsert(zobj->ptr,ele,score);
             *flags |= ZADD_UPDATED;
         }
         return 1;
     } else if (!xx) {
         /* Optimize: check if the element is too large or the list
          * becomes too long *before* executing zzlInsert. */
         zobj->ptr = zzlInsert(zobj->ptr,ele,score);
         if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
             sdslen(ele) > server.zset_max_ziplist_value)
             zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
         if (newscore) *newscore = score;
         *flags |= ZADD_ADDED;
         return 1;
     } else {
         *flags |= ZADD_NOP;
         return 1;
     }
 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
     zset *zs = zobj->ptr;
     zskiplistNode *znode;
     dictEntry *de;

     de = dictFind(zs->dict,ele);
     if (de != NULL) {
         /* NX? Return, same element already exists. */
         if (nx) {
             *flags |= ZADD_NOP;
             return 1;
         }
         curscore = *(double*)dictGetVal(de);

         /* Prepare the score for the increment if needed. */
         if (incr) {
             score += curscore;
             if (isnan(score)) {
                 *flags |= ZADD_NAN;
                 return 0;
             }
             if (newscore) *newscore = score;
         }

         /* Remove and re-insert when score changes. */
         if (score != curscore) {
             znode = zslUpdateScore(zs->zsl,curscore,ele,score);
             /* Note that we did not removed the original element from
              * the hash table representing the sorted set, so we just
              * update the score. */
             dictGetVal(de) = &znode->score; /* Update score ptr. */
             *flags |= ZADD_UPDATED;
         }
         return 1;
     } else if (!xx) {
         ele = sdsdup(ele);
         znode = zslInsert(zs->zsl,score,ele);
         serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
         *flags |= ZADD_ADDED;
         if (newscore) *newscore = score;
         return 1;
     } else {
         *flags |= ZADD_NOP;
         return 1;
     }
 } else {
     serverPanic("Unknown sorted set encoding");
 }
 return 0; /* Never reached. */
}

2.2 数据存储结构 zskiplist
2.2.1 存储结构定义
Redis 中的有序集合 ZSet 有属于它自己的数据结构,其定义在server.h文件中,源码如下:

可以看到 OBJ_ENCODING_SKIPLIST 编码的 ZSet 以 zset 结构体封装数据,其内部包含了两个关键的数据结构

dict :字典,键为成员,值为分值,可用于支持 O(1) 复杂度的按元素取分值操作
zsl: 跳跃表,按分值排序元素,用于支持平均复杂度为 O(log N) 的按分值定位元素的操作以及范围操作

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

数据结构 zskiplist 的定义如下,可以看到其包含的关键数据如下:

header: 跳跃表头节点
tail: 跳跃表尾节点
length: 即链表包含的节点总数。新创建的 skiplist 包含一个空的头指针,这个头指针不包含在 length 计数中
level: 表中层数最大的节点的层数

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

数据结构 zskiplistNode 是zskiplist 的单个节点的定义,其包含的关键属性如下:

  1. ele: 动态字符串对象,用于存储元素
  2. score: 元素的分值
  3. backward: 当前节点的上一个节点
  4. level[]: 跳跃表层数数组,保存了每一层指向的下一个节点
typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

2.2.2 关键函数

2.2.2.1 跳跃表的构建

t_zset.c#zslInsert() 是跳跃表中非常关键的函数,这里体现了跳跃表的构建过程及其查找策略。可以看到 skiplist 为每个节点随机出一个层数(level),比如一个节点随机出的层数是3,那么就把它链入到从第1层到第3层这三层链表中

首先通过分数 score 在跳跃表中查找到新增的节点应该插入的位置上的节点
调用 zslRandomLevel() 函数随机生成节点层数,使用该层数新建一个跳跃表节点
更新新增的节点应插入位置上的节点前后指针,将新增节点插入到跳跃表中
 

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
 unsigned int rank[ZSKIPLIST_MAXLEVEL];
 int i, level;

 serverAssert(!isnan(score));
 x = zsl->header;
 for (i = zsl->level-1; i >= 0; i--) {
     /* store rank that is crossed to reach the insert position */
     rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
     while (x->level[i].forward &&
             (x->level[i].forward->score < score ||
                 (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele,ele) < 0)))
     {
         rank[i] += x->level[i].span;
         x = x->level[i].forward;
     }
     update[i] = x;
 }
 /* we assume the element is not already inside, since we allow duplicated
  * scores, reinserting the same element should never happen since the
  * caller of zslInsert() should test in the hash table if the element is
  * already inside or not. */
 level = zslRandomLevel();
 if (level > zsl->level) {
     for (i = zsl->level; i < level; i++) {
         rank[i] = 0;
         update[i] = zsl->header;
         update[i]->level[i].span = zsl->length;
     }
     zsl->level = level;
 }
 x = zslCreateNode(level,score,ele);
 for (i = 0; i < level; i++) {
     x->level[i].forward = update[i]->level[i].forward;
     update[i]->level[i].forward = x;

     /* update span covered by update[i] as x is inserted here */
     x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
     update[i]->level[i].span = (rank[0] - rank[i]) + 1;
 }

 /* increment span for untouched levels */
 for (i = level; i < zsl->level; i++) {
     update[i]->level[i].span++;
 }

 x->backward = (update[0] == zsl->header) ? NULL : update[0];
 if (x->level[0].forward)
     x->level[0].forward->backward = x;
 else
     zsl->tail = x;
 zsl->length++;
 return x;
}

int zslRandomLevel(void) {
 int level = 1;
 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
     level += 1;
 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

2.2.2.2 跳跃表的查找

在以上三层链表结构上,如果需要查找15,查找的路径是沿着图中的红色箭头所指向的方向进行的15 首先和 10 比较,比 10 大,继续与后继节点比较
10 的后继节点为 NULL,说明最上层的链表已经结束了,回到第二层链表继续比较。此时 10 的后继节点为 20,15 比 20 小,回到 10 的第一层链表
10 的一层链表后继节点为 15,待查数据找到了
以上过程中沿着最上层链表首先要比较的是10,发现待查数据比10大,接下来就只需要到 10 的后面去继续查找,从而跳过了 10 前面的所有节点。这样当链表足够长的时候,这种多层链表的查找方式就能跳过很多下层节点,大大加快查找的速度
 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

标签:score,ZSet,level,数据类型,zset,源码,flags,ZADD,节点
来源: https://blog.csdn.net/alpha_love/article/details/116568705