数据库
首页 > 数据库> > 深入分析redis之rax底层原理,前缀树?

深入分析redis之rax底层原理,前缀树?

作者:互联网

文章目录


前言

本文参考源码版本为 redis6.2

前缀树是字符串查找时,经常使用的一种数据结构,能够在一个字符串集合中快速查找到某个字符串。

由于树中每个节点只存储字符串中的一个字符,故而有时会造成空间的浪费。Rax 的出现就是为了解决这一问题。Redis 对于 Rax 的解释为 A radix tree implement,基数树的一种实现。Rax中不仅可以存储字符串,同时还可以为这个字符串设置一个值,也就是key-value。


一、Radix Tree 是什么?

Radix Tree 是属于前缀树的一种类型。前缀树也称为 Trie Tree,其特点是,保存在树上的每个 key 会被拆分成单字符,然后逐一保存在树上的节点中。

前缀树的根节点不保存任何字符,而除了根节点以外的其他节点,每个节点只保存一个字符。当把从根节点到当前节点的路径上的字符拼接在一起时,就可以得到相应 key 的值了。

不同的是,rax 在前缀树上做了一些优化,来,继续往下看~

二、Radix Tree 数据结构

1. 结构

raxNode定义:


typedef struct raxNode {
    uint32_t iskey:1;     //节点是否包含key
    uint32_t isnull:1;    //节点的值是否为NULL
    uint32_t iscompr:1;   //节点是否被压缩
    uint32_t size:29;     //节点大小
    unsigned char data[]; //节点的实际存储数据
} raxNode;

该结构中的成员变量包括 4 个元数据,这四个元数据的含义分别如下。

这 4 个元数据就对应了压缩节点非压缩节点头部的 HDR,其中,iskey、isnull 和 iscompr 分别用 1 bit 表示,而 size 占用 29 bit

另外,从 raxNode 结构体中,可以看到,除了元数据,该结构体中还有 char 类型数组 data。我们知道,data 是用来保存实际数据的。不过,这里保存的数据会根据当前节点的类型而有所不同:

在 raxNode 的实现中,无论是非压缩节点还是压缩节点,具有两个特点:

而这两个特点给 Radix Tree 实际保存数据时的结构,带来了两个方面的变化。

如果你觉得晦涩不易懂,坚持,继续往下看~

2. 非压缩节点

这类节点会包含多个指向不同子节点的指针,以及多个子节点所对应的字符

同时,如果从根节点到一个非压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么这个非压缩节点中还包含了指向这个 key 对应的 value 的指针。

结合 raxNode 结构来看看,如果是一个非压缩 node ,当我们有size个字符(bytes),就会有相对应size 个指向子节点 raxNode 的指针;

需要注意的是,字符不是存在子节点,而是存在于父节点中(可以思考下, 为什么不存在子节点中?)。

[header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)

3. 压缩节点

这类节点会包含一个指向子节点的指针,以及子节点所代表的合并的字符串

和非压缩节点类似,如果从根节点到一个压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么,这个压缩节点中还包含指向这个 key 对应的 value 的指针。

注意,压缩节点仅有一个子节点

[header iscompr=1][abc][c-ptr](value-ptr?)

4. 图解Radix Tree

假设Radix Tree树中包含以下几个字符串 “foo”, “foobar” 和 “footer”;如果node节点代表rax树中的一个key,就写在 [] 里面,反之写在 () 里面。

              (f) ""
                \
                (o) "f"
                  \
                  (o) "fo"
                    \
                  [t   b] "foo"
                  /     \
         "foot" (e)     (a) "foob"
                /         \
      "foote" (r)         (r) "fooba"
              /             \
    "footer" []             [] "foobar"

然而,这里有个常见优化点,对于连续只有单个子节点的node可以压缩成一个 「压缩节点」,因此,上面的树形图变成了下面这种:

                  ["foo"] ""
                     |
                  [t   b] "foo"
                  /     \
        "foot" ("er")    ("ar") "foob"
                 /          \
       "footer" []          [] "foobar"

这便是Radix Tree的模型图了。

不过,这种树形图实现上却是有点麻烦,比如 当字符串 “first” 要插入上面的 Radix Tree 中,这里就涉及到节点切割了,因为此时 “foo” 就不再是一个公共前缀了,最终树形图如下:

                    (f) ""
                    /
                 (i o) "f"
                 /   \
    "fi"  ("rst")  (o) "fo"
              /        \
    "first" []       [t   b] "foo"
                     /     \
           "foot" ("er")    ("ar") "foob"
                    /          \
          "footer" []          [] "foobar"

三、基本操作

1. 查询

rax提供了查找key的接口raxFind,该接口用于获取key对应的value值,定义如下:

/* Find a key in the rax, returns raxNotFound special void pointer value
 * if the item was not found, otherwise the value associated with the
 * item is returned. */
void *raxFind(rax *rax, unsigned char *s, size_t len) {
    raxNode *h;

    int splitpos = 0;
    size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL);
    if (i != len || (h->iscompr && splitpos != 0) || !h->iskey)
        return raxNotFound;
    return raxGetData(h);
}

其中,该方法的作用有是,在rax中查找长度为 len 的字符串 s (s为rax中的key),找到之后返回 key 对应的 value。

可以看到,具体的查询逻辑交给了 raxLowWalk 去完成,该方法作为底层方法,在增删查改操作中均有调用,接口定义为:

static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) 

其中,入参定义为:

该函数返回 s 的匹配长度,当 s ! = len 时,表示未查找到该key;当s == len时,需要检验 stopnode 是否为 key,并且当 stopnode 为压缩节点时,还需要检查splitpos是否为0(可能匹配到某个压缩节点中间的某个元素),具体实现如下:

    raxNode *h = rax->head; // 从跟节点开始匹配
    raxNode **parentlink = &rax->head;

    size_t i = 0; /* Position in the string. */
    size_t j = 0; /* Position in the node children (or bytes if compressed).*/
    while(h->size && i < len) {
        unsigned char *v = h->data;
          
        if (h->iscompr) {
            // 压缩节点,挨个比对,相同则继续往下找;反之,则退出,说明匹配到这就开始不同或者已经到末尾了
            for (j = 0; j < h->size && i < len; j++, i++) {
                if (v[j] != s[i]) break;
            }
            if (j != h->size) break;
        } else { // 非压缩节点,只要找到一个相同的就可以继续往下,在子节点中查找了
            /* Even when h->size is large, linear scan provides good
             * performances compared to other approaches that are in theory
             * more sounding, like performing a binary search. */ 
            for (j = 0; j < h->size; j++) {
                if (v[j] == s[i]) break;
            }
            if (j == h->size) break;
            // 可以匹配到当前字符,继续往下匹配下一个字符,然后会移动到子节点进行匹配
            i++;
        }
        // 记录路径信息
        if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */
        // 找到第一个子节点的地址
        raxNode **children = raxNodeFirstChildPtr(h);
        if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */
        // 将当前节点移动到其第j个子节点
        memcpy(&h,children+j,sizeof(h));
        parentlink = children+j;
        j = 0; /* If the new node is non compressed and we do not
                  iterate again (since i == len) set the split
                  position to 0 to signal this node represents
                  the searched key. */
    }
    if (stopnode) *stopnode = h;
    if (plink) *plink = parentlink;
    if (splitpos && h->iscompr) *splitpos = j;
    return i;

实现逻辑:

2. 插入

插入元素比较复杂,源码中也花了大量注释说明。

向 rax 中插入 key-value 对,对于已存在的 key, rax 提供了2种方案,覆盖或者不覆盖原有的 value,对应的接口分别为 raxInsert、raxTryInsert。

插入操作的真正实现函数 raxGenericInsert,方法定义如下:

int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite)

第一步,当完全匹配并且目标位置的节点不需要切割

    size_t i;
    int j = 0; // 切割位置,当定位到具体的压缩节点时,j用于定位该压缩节点待插入的位置
    raxNode *h, **parentlink;
    
    // 在插入元素前,会通过 raxLowWalk 判断 key 是否存在或者找到待插入位置。
    i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);

     // 1. i == len(代表已经匹配到了整个字符串)
     // 2. 如果不是压缩节点 或者 是压缩节点但该压缩节点不需要切割(j==0)
     // 注:对于压缩节点node,如果要在node中插入新元素 或者 该node中代表了这次要插入元素的key, 那么将会重新分配该node, 确保空间足够
    if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) {
        /* Make space for the value pointer if needed. */
        if (!h->iskey || (h->isnull && overwrite)) {
            h = raxReallocForData(h,data);
            if (h) memcpy(parentlink,&h,sizeof(h));
        }
        if (h == NULL) {
            errno = ENOMEM;
            return 0;
        }
        
        // 如果key存在,根据方法入参进行处理(获取旧数据或者重写)
        if (h->iskey) {
            if (old) *old = raxGetData(h);
            if (overwrite) raxSetData(h,data);
            errno = 0;
            return 0; /* Element already exists. */
        }

        // 给新增节点赋值
        raxSetData(h,data);
        rax->numele++;
        return 1; /* Element inserted. */
    }

第二步:对于需要切割的压缩节点,有几种情况:

假设我们当前所在节点 h 包含字符串 “ANNIBALE” ,h 有且仅有一个子节点(“SC”),并且该节点没有子节点,用 ‘0’ 表示,结构如下:

"ANNIBALE" -> "SCO" -> []

案例1: 插入 “ANNIENTARE”

  
               |B| -> "ALE" -> "SCO" -> []
     "ANNI" -> |-|
               |E| -> (... continue algo ...) "NTARE" -> []

需要将 “ANNIBALE” 拆分成3部分:压缩节点,非压缩节点,压缩节点

案例2: 插入 “ANNIBALI”

                  |E| -> "SCO" -> []
     "ANNIBAL" -> |-|
                  |I| -> (... continue algo ...) []

需要将 “ANNIBALE” 拆成2部分:压缩节点,非压缩节点。

案例3: 插入 “AGO”(和案例1类似,不过,需要在原节点中设置 iscompr = 0)

            |N| -> "NIBALE" -> "SCO" -> []
     |A| -> |-|
            |G| -> (... continue algo ...) |O| -> []

需要将 “ANNIBALE” 节点拆分为3部分:非压缩节点,非压缩节点,压缩节点。

案例4: 插入 “CIAO”

     |A| -> "NNIBALE" -> "SCO" -> []
     |-|
     |C| -> (... continue algo ...) "IAO" -> []

此时需要将 “ANNIBALE” 节点拆分为2部分:第一部分是非压缩节点,第二部分为压缩节点。

案例5: 插入 “ANNI”

"ANNI" -> "BALE" -> "SCO" -> []

将 “ANNIBALE” 拆分成2个压缩节点。


综上,对于压缩节点拆分过程,总体而言分为2类:

源码实现上,是用了两种算法覆盖以上5种案例:

算法1: 覆盖案例 1~4,出现字符不匹配的位置,是在某个压缩节点上。

SPLITPOS 代表压缩节点切割位置,例如 压缩节点包含 “ANNIBALE”,将插入 “ANNIENTARE”,那么 SPLITPOS = 4,也就是 未匹配元素的下标。

源码实现如下:

    /* ------------------------- ALGORITHM 1 --------------------------- */
    if (h->iscompr && i != len) {
        /* 1: Save next pointer. */
        // 用 NEXT 保存 h的子节点指针
        raxNode **childfield = raxNodeLastChildPtr(h);
        raxNode *next;
        memcpy(&next,childfield,sizeof(next));

        if (h->iskey) {
            debugf("key value is %p\n", raxGetData(h));
        }

        // 设置新增节点长度,这里 j 是压缩节点切割位置,因此压缩节点会被切割成两部分,长度分别是 j 和 postfixlen
        size_t trimmedlen = j;
        size_t postfixlen = h->size - j - 1;
        // 仔细想想?当压缩节点被切割成两部分,也就是说该压缩节点部分与 s 没有相同部分;如果该压缩节点本身 iskey, 那么切割后 split_node 自然也是 iskey (注意:iskey表示从根节点到压缩节点的父节点组成的key)
        int split_node_is_key = !trimmedlen && h->iskey && !h->isnull;
        size_t nodesize;

        /* 2: 创建 split node 节点,并分配空间 */
        // 可以看到,这里会切割成三部分,当切割位置在第一个字符时(j==0),即trimmedlen=0 表示没有公共前缀,此时会切割成两部分
        raxNode *splitnode = raxNewNode(1, split_node_is_key);  // 切割位置
        raxNode *trimmed = NULL; // 已匹配的前缀部分
        raxNode *postfix = NULL; // 切割位置之后,未匹配的部分

        // 如果匹配前缀大于0,分配新空间
        if (trimmedlen) {
            nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+
                       sizeof(raxNode*);
            if (h->iskey && !h->isnull) nodesize += sizeof(void*);
            trimmed = rax_malloc(nodesize);
        }

        // 未匹配部分长度大于0,分配空间
        if (postfixlen) {
            nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+
                       sizeof(raxNode*);
            postfix = rax_malloc(nodesize);
        }

        /* OOM? Abort now that the tree is untouched. */
        // 分配失败
        if (splitnode == NULL ||
            (trimmedlen && trimmed == NULL) ||
            (postfixlen && postfix == NULL))
        {
            rax_free(splitnode);
            rax_free(trimmed);
            rax_free(postfix);
            errno = ENOMEM;
            return 0;
        }
        splitnode->data[0] = h->data[j];
        
        // j == 0,意味着压缩节点只需拆分成两部分,split node 与 后缀部分已经拆分了,因此 parentlink 将指向 split node
        if (j == 0) {
            // 3a. 仔细想想?原压缩节点 iskey, 到这一步,split node 同样 iskey,因此将数据拷贝过去
            if (h->iskey) {
                void *ndata = raxGetData(h);
                raxSetData(splitnode,ndata);
            }
            memcpy(parentlink,&splitnode,sizeof(splitnode));
        } else {
            // 3b. 切割公共前缀,也就意味着,压缩节点将被分成三部分
            trimmed->size = j;
            memcpy(trimmed->data,h->data,j);
            trimmed->iscompr = j > 1 ? 1 : 0;
            trimmed->iskey = h->iskey;
            trimmed->isnull = h->isnull;
            if (h->iskey && !h->isnull) {
                void *ndata = raxGetData(h);
                raxSetData(trimmed,ndata);
            }
            raxNode **cp = raxNodeLastChildPtr(trimmed);
            memcpy(cp,&splitnode,sizeof(splitnode));
            memcpy(parentlink,&trimmed,sizeof(trimmed));
            parentlink = cp; /* Set parentlink to splitnode parent. */
            rax->numnodes++;
        }

        // 4. 创建 postfix 节点,即原压缩节点未匹配的后缀部分
        if (postfixlen) {
            /* 4a: create a postfix node. */
            postfix->iskey = 0;
            postfix->isnull = 0;
            postfix->size = postfixlen;
            postfix->iscompr = postfixlen > 1;
            memcpy(postfix->data,h->data+j+1,postfixlen);
            raxNode **cp = raxNodeLastChildPtr(postfix);
            memcpy(cp,&next,sizeof(next));
            rax->numnodes++;
        } else {
            /* 4b: just use next as postfix node. */
            postfix = next;
        }

        // 5. 设置 postfix 节点作为 splitnode 的第一个子节点
        raxNode **splitchild = raxNodeLastChildPtr(splitnode);
        memcpy(splitchild,&postfix,sizeof(postfix));

         // 6. 继续插入。以上只处理了原压缩节点,对于新插入key, 未匹配部分还未插入,源码稍后介绍;这里插入之后,splitnode将会多一个子节点
        rax_free(h);
        h = splitnode;
    }

算法2:应对场景5,即在压缩节点某个位置处,完成了字符串 s 的匹配,此时,只需将压缩节点拆分成两部分即可。

例如,rax树中存在节点 “ANNIBALE”,将插入 “ANNI”,此时 SPLITPOS = 4。

源码实现如下:

else if (h->iscompr && i == len) {
    /* ------------------------- ALGORITHM 2 --------------------------- */
        // 切割并分配 postfix 和 trimmed 节点
        size_t postfixlen = h->size - j;
        size_t nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+
                          sizeof(raxNode*);
        if (data != NULL) nodesize += sizeof(void*);
        raxNode *postfix = rax_malloc(nodesize);

        nodesize = sizeof(raxNode)+j+raxPadding(j)+sizeof(raxNode*);
        if (h->iskey && !h->isnull) nodesize += sizeof(void*);
        raxNode *trimmed = rax_malloc(nodesize);

        // 分配失败
        if (postfix == NULL || trimmed == NULL) {
            rax_free(postfix);
            rax_free(trimmed);
            errno = ENOMEM;
            return 0;
        }

        /* 1: Save next pointer. */
        raxNode **childfield = raxNodeLastChildPtr(h);
        raxNode *next;
        memcpy(&next,childfield,sizeof(next));

        /* 2: Create the postfix node. */
        postfix->size = postfixlen;
        postfix->iscompr = postfixlen > 1;
        postfix->iskey = 1;
        postfix->isnull = 0;
        memcpy(postfix->data,h->data+j,postfixlen);
        raxSetData(postfix,data);
        raxNode **cp = raxNodeLastChildPtr(postfix);
        memcpy(cp,&next,sizeof(next));
        rax->numnodes++;

        /* 3: Trim the compressed node. */
        trimmed->size = j;
        trimmed->iscompr = j > 1;
        trimmed->iskey = 0;
        trimmed->isnull = 0;
        memcpy(trimmed->data,h->data,j);
        memcpy(parentlink,&trimmed,sizeof(trimmed));
        if (h->iskey) {
            void *aux = raxGetData(h);
            raxSetData(trimmed,aux);
        }

        /* Fix the trimmed node child pointer to point to
         * the postfix node. */
        cp = raxNodeLastChildPtr(trimmed);
        memcpy(cp,&postfix,sizeof(postfix));

        /* Finish! We don't need to continue with the insertion
         * algorithm for ALGO 2. The key is already inserted. */
        rax->numele++;
        rax_free(h);
        return 1; /* Key inserted. */
    }

到这里,还有一点工作需要完成, 那就是新key未匹配的部分,还需要插入rax树,源码实现如下:

    while(i < len) {
        raxNode *child;

        // 这里就是判断要处理成压缩节点还是非压缩节点
        if (h->size == 0 && len-i > 1) {
            debugf("Inserting compressed node\n");
            size_t comprsize = len-i;
            if (comprsize > RAX_NODE_MAX_SIZE)
                comprsize = RAX_NODE_MAX_SIZE;
            raxNode *newh = raxCompressNode(h,s+i,comprsize,&child);
            if (newh == NULL) goto oom;
            h = newh;
            memcpy(parentlink,&h,sizeof(h));
            parentlink = raxNodeLastChildPtr(h);
            i += comprsize;
        } else {
            debugf("Inserting normal node\n");
            raxNode **new_parentlink;
            raxNode *newh = raxAddChild(h,s[i],&child,&new_parentlink);
            if (newh == NULL) goto oom;
            h = newh;
            memcpy(parentlink,&h,sizeof(h));
            parentlink = new_parentlink;
            i++;
        }
        rax->numnodes++;
        h = child;
    }
    raxNode *newh = raxReallocForData(h,data);
    if (newh == NULL) goto oom;
    h = newh;
    if (!h->iskey) rax->numele++;
    raxSetData(h,data);
    memcpy(parentlink,&h,sizeof(h));
    return 1; /* Element inserted. */

以上便是插入新key的全部过程。可以看出,实现上根据列举的5种案例进行处理。

3. 删除

rax 的删除操作主要有3个接口,可以删除 rax 中的某个 key,或者释放整个 rax,在释放 rax 时,还可以设置释放回调函数,在释放rax 的每个 key 时,都会调用这个回调函数,3个接口的定义如下:

// 在 rax 中删除长度为 len 的 s
int raxRemove(rax *rax, unsigned char *s, size_t len, void **old);
// 释放 rax
void raxFree(rax *rax);
// 释放 rax, 释放每个 key 时,都会调用 free_callback 方法
void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));

其中,rax 的释放操作,采用的是深度优先算法

下面重点介绍 raxRemove 方法:

第一步:当删除 rax 中的某个 key-value 对时,首先查找 key 是否存在,不存在则直接返回,存在则需要进行删除操作,实现如下:

    raxNode *h;
    raxStack ts;
    
    raxStackInit(&ts); // 保存路径
    int splitpos = 0;
    size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,&ts);
    if (i != len || (h->iscompr && splitpos != 0) || !h->iskey) { // 未找到目标 key, 直接退出
        raxStackFree(&ts);
        return 0;
    }

第二步,如果 key 存在,则需要进行删除操作,删除操作完成后,rax 树可能需要进行压缩。具体可以分为下面 2 种情况,此处所说的压缩是指将某个节点与其子节点压缩成一个节点,叶子节点没有子节点,不能进行压缩。

案例1:
假设 rax 树存储了 “FOO” = 1 和 “FOOBAR” = 2,结构如下:

"FOO" -> "BAR" -> [] (2)
          (1)

删除 “FOO” ,压缩后,结构如下:

"FOOBAR" -> [] (2)

案例2:
假设 rax 树存储了 “FOOBAR” = 1 和 “FOOTER” = 2

          |B| -> "AR" -> [] (1)
 "FOO" -> |-|
          |T| -> "ER" -> [] (2)

删除 “FOOTER” 后,结构如下:

"FOO" -> |B| -> "AR" -> [] (1)

可以压缩成的结果如下:

"FOOBAR" -> [] (1)

删除操作具体可以分为2个阶段,删除阶段以及压缩阶段

1)删除阶段:

利用查找key时记录的匹配路径,依次向上直到无法删除为止。

    if (h->size == 0) {
        debugf("Key deleted in node without children. Cleanup needed.\n");
        raxNode *child = NULL;
        while(h != rax->head) {
            child = h;
            debugf("Freeing child %p [%.*s] key:%d\n", (void*)child,
                (int)child->size, (char*)child->data, child->iskey);
            rax_free(child);
            rax->numnodes--;
            h = raxStackPop(&ts);
            // 如果节点为 key,或者子节点 size != 1,则无法删除
            if (h->iskey || (!h->iscompr && h->size != 1)) break;
        }
        if (child) {
            raxNode *new = raxRemoveChild(h,child);
            if (new != h) {
                raxNode *parent = raxStackPeek(&ts);
                raxNode **parentlink;
                if (parent == NULL) {
                    parentlink = &rax->head;
                } else {
                    parentlink = raxFindParentLink(parent,h);
                }
                memcpy(parentlink,&new,sizeof(new));
            }

            // 删除后尝试进行压缩
            if (new->size == 1 && new->iskey == 0) {
                trycompress = 1;
                h = new;
            }
        }
    } else if (h->size == 1) {
        // 可以尝试进行压缩
        trycompress = 1;
    }

2)压缩阶段: 压缩过程可以细化为2步。

① 找到可以进行压缩的第一个元素,之后将所有可进行压缩的节点进行压缩。由于raxRowWalk函数已经记录了查找key的过程,压缩时只需从记录栈中不断弹出元素,即可找到可进行压缩的第一个元素,过程如下:

        raxNode *parent;
        while(1) {
            parent = raxStackPop(&ts);
            if (!parent || parent->iskey ||
                (!parent->iscompr && parent->size != 1)) break;
            h = parent;
        }
        // 可以进行压缩的第一个节点
        raxNode *start = h; 

② 找到第一个可压缩节点后,进行数据压缩。由于可压缩的节点都只有一个子节点,压缩过程只需要读取每个节点的内容,创建新的节点,并填充新节点的内容即可:

       /* Scan chain of nodes we can compress. */
        size_t comprsize = h->size;
        int nodes = 1;
        while(h->size != 0) {
            raxNode **cp = raxNodeLastChildPtr(h);
            memcpy(&h,cp,sizeof(h));
            if (h->iskey || (!h->iscompr && h->size != 1)) break;
            /* Stop here if going to the next node would result into
             * a compressed node larger than h->size can hold. */
            if (comprsize + h->size > RAX_NODE_MAX_SIZE) break;
            nodes++;
            comprsize += h->size;
        }
        if (nodes > 1) {
            /* If we can compress, create the new node and populate it. */
            size_t nodesize =
                sizeof(raxNode)+comprsize+raxPadding(comprsize)+sizeof(raxNode*);
            raxNode *new = rax_malloc(nodesize);
            /* An out of memory here just means we cannot optimize this
             * node, but the tree is left in a consistent state. */
            if (new == NULL) {
                raxStackFree(&ts);
                return 1;
            }
            new->iskey = 0;
            new->isnull = 0;
            new->iscompr = 1;
            new->size = comprsize;
            rax->numnodes++;

            /* Scan again, this time to populate the new node content and
             * to fix the new node child pointer. At the same time we free
             * all the nodes that we'll no longer use. */
            comprsize = 0;
            h = start;
            while(h->size != 0) {
                memcpy(new->data+comprsize,h->data,h->size);
                comprsize += h->size;
                raxNode **cp = raxNodeLastChildPtr(h);
                raxNode *tofree = h;
                memcpy(&h,cp,sizeof(h));
                rax_free(tofree); rax->numnodes--;
                if (h->iskey || (!h->iscompr && h->size != 1)) break;
            }

            /* Now 'h' points to the first node that we still need to use,
             * so our new node child pointer will point to it. */
            raxNode **cp = raxNodeLastChildPtr(new);
            memcpy(cp,&h,sizeof(h));

            /* Fix parent link. */
            if (parent) {
                raxNode **parentlink = raxFindParentLink(parent,start);
                memcpy(parentlink,&new,sizeof(new));
            } else {
                rax->head = new;
            }
        }

4. 遍历

为了能够遍历 rax 中所有的 key, Redis 提供了迭代器操作。Redis 中实现的迭代器为双向迭代器,可以向前,也可以向后,顺序是按照 key 的字典序排列的。通过 rax 的结构图可以看出,如果某个节点为 key,则其子节点的 key 按照字典序比该节点的 key 大。另外,如果当前节点为非压缩节点,则其最左侧节点的key是其所有子节点的 key 中最小的。主要方法有:

void raxStart(raxIterator *it, rax *rt);
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
int raxNext(raxIterator *it);
int raxPrev(raxIterator *it);

1)raxStart 用于初始化 raxIterator 结构:

/* Initialize a Rax iterator. This call should be performed a single time
 * to initialize the iterator, and must be followed by a raxSeek() call,
 * otherwise the raxPrev()/raxNext() functions will just return EOF. */
void raxStart(raxIterator *it, rax *rt) {
    it->flags = RAX_ITER_EOF; /* No crash if the iterator is not seeked. */
    it->rt = rt;
    it->key_len = 0;
    it->key = it->key_static_string;
    it->key_max = RAX_ITER_STATIC_LEN;
    it->data = NULL;
    it->node_cb = NULL;
    raxStackInit(&it->stack);
}

2)raxSeek,在raxStart初始化迭代器后,必须调用raxSeek函数初始化迭代器的位置。

int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);

查找末尾元素可以直接在Rax中找到最右侧的叶子节点,查找首个元素被转换为查找大于等于空的操作。

处理大于、小于、等于等操作主要分为以下几步:

3)raxNext & raxPrev,主要用于迭代上一个或者下一个key

/* Go to the next element in the scope of the iterator 'it'.
 * If EOF (or out of memory) is reached, 0 is returned, otherwise 1 is
 * returned. In case 0 is returned because of OOM, errno is set to ENOMEM. */
int raxNext(raxIterator *it) {
    if (!raxIteratorNextStep(it,0)) {
        errno = ENOMEM;
        return 0;
    }
    if (it->flags & RAX_ITER_EOF) {
        errno = 0;
        return 0;
    }
    return 1;
}

/* Go to the previous element in the scope of the iterator 'it'.
 * If EOF (or out of memory) is reached, 0 is returned, otherwise 1 is
 * returned. In case 0 is returned because of OOM, errno is set to ENOMEM. */
int raxPrev(raxIterator *it) {
    if (!raxIteratorPrevStep(it,0)) {
        errno = ENOMEM;
        return 0;
    }
    if (it->flags & RAX_ITER_EOF) {
        errno = 0;
        return 0;
    }
    return 1;
}

4)raxStop & raxEOF:
raxEOF用于标识迭代器迭代结束,raxStop用于结束迭代并释放相关资源:

/* Free the iterator. */
void raxStop(raxIterator *it) {
    if (it->key != it->key_static_string) rax_free(it->key);
    raxStackFree(&it->stack);
}

/* Return if the iterator is in an EOF state. This happens when raxSeek()
 * failed to seek an appropriate element, so that raxNext() or raxPrev()
 * will return zero, or when an EOF condition was reached while iterating
 * with raxNext() and raxPrev(). */
int raxEOF(raxIterator *it) {
    return it->flags & RAX_ITER_EOF;
}

总结

首先,要理解 rax 底层实现,第一步需要理解 rax 的结构;其次,要理解 压缩节点 和 非压缩节点 的区别

来,再回顾下 压缩节点 和 非压缩节点 的区别:
它们的相同之处在于:

不同之处在于:




相关参考:

redis readme.md
Redis · 引擎特性 · radix tree 源码解析

标签:前缀,压缩,redis,key,raxNode,节点,rax,size
来源: https://blog.csdn.net/ldw201510803006/article/details/122445595