数据库
首页 > 数据库> > redis6.0.5之HyperLogLog阅读笔记3-基数估算实现之辅助函数

redis6.0.5之HyperLogLog阅读笔记3-基数估算实现之辅助函数

作者:互联网

/* ========================= HyperLogLog algorithm  ========================= */

/* Our hash function is MurmurHash2, 64 bit version.
 * It was modified for Redis in order to provide the same result in
 * big and little endian archs (endian neutral). */
我们的hash函数用的是64位版本的MurmurHash2。
Redis使用时为了在大端和小端的结构中提供更好的结果修改了该算法

uint64_t MurmurHash64A (const void * key, int len, unsigned int seed) {
    const uint64_t m = 0xc6a4a7935bd1e995;  初始化的值
    const int r = 47;   常用轮数
    uint64_t h = seed ^ (len * m);  种子 异或 ( 长度 乘以 初始化值 ) 
    const uint8_t *data = (const uint8_t *)key;  获取数据初始位置
    const uint8_t *end = data + (len-(len&7)); 获取模8结束位置

    while(data != end) {  不到结尾
        uint64_t k;

#if (BYTE_ORDER == LITTLE_ENDIAN)  小段情况
    #ifdef USE_ALIGNED_ACCESS  字节对齐
        memcpy(&k,data,sizeof(uint64_t));
    #else
        k = *((uint64_t*)data);
    #endif
#else  大端情况
        k = (uint64_t) data[0];
        k |= (uint64_t) data[1] << 8;
        k |= (uint64_t) data[2] << 16;
        k |= (uint64_t) data[3] << 24;
        k |= (uint64_t) data[4] << 32;
        k |= (uint64_t) data[5] << 40;
        k |= (uint64_t) data[6] << 48;
        k |= (uint64_t) data[7] << 56;
#endif

        k *= m;  k 乘以 m
        k ^= k >> r;  k向右移动r位
        k *= m;  k 乘以 m
        h ^= k;  h 异或 k
        h *= m;  h 乘以 m
        data += 8;   下一个8字节数据
    }

    switch(len & 7) {  剩下的不对齐8字节的数据处理
    case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */
    case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */
    case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */
    case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */
    case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */
    case 2: h ^= (uint64_t)data[1] << 8; /* fall-thru */
    case 1: h ^= (uint64_t)data[0];
            h *= m; /* fall-thru */
    };

    h ^= h >> r;  h 异或 h向右移动r位 
    h *= m;   h 乘以 m
    h ^= h >> r;  h 异或 h向右移动r位 
    return h;  返回最终h值
}
******************************************************************************************
/* Given a string element to add to the HyperLogLog, returns the length
 * of the pattern 000..1 of the element hash. As a side effect 'regp' is
 * set to the register index this element hashes to. */
给定一个字符串元素,添加到HyperLogLog,返回元素hash值的 按照模式000..1的长度。
作为一个副作用,变量regp被设置为这个元素所在寄存器的hash索引,即用regp标记寄存器
int hllPatLen(unsigned char *ele, size_t elesize, long *regp) {
    uint64_t hash, bit, index;
    int count;

    /* Count the number of zeroes starting from bit HLL_REGISTERS
     * (that is a power of two corresponding to the first bit we don't use
     * as index). The max run can be 64-P+1 = Q+1 bits.
     对HLL_REGISTERS的开始位置的0进行计数(这是一个2的指数,对应第一个我们不作为索引使用的比特位)
     * Note that the final "1" ending the sequence of zeroes must be
     * included in the count, so if we find "001" the count is 3, and
     * the smallest count possible is no zeroes at all, just a 1 bit
     * at the first position, that is a count of 1.
     *注意到最后的1,结束序列0必须包含在计数中,所以我们找到001的计数是3,
     最小的计数是可能一个前导零也没有,第一个比特位就是1,那么计数就是1
     * This may sound like inefficient, but actually in the average case
     * there are high probabilities to find a 1 after a few iterations. */
     这个听起来不是那么有效,实际上在平均情况下,有很高的概率只需要少量迭代就可以发现一个为1的比特位
     
    hash = MurmurHash64A(ele,elesize,0xadc83b19ULL);
    index = hash & HLL_P_MASK; /* Register index. */ 寄存器的索引  HLL_P_MASK=2^14-1
    hash >>= HLL_P; /* Remove bits used to address the register. */  移出14个用于标记寄存器的比特位
    hash |= ((uint64_t)1<<HLL_Q); /* Make sure the loop terminates and count will be <= Q+1. */
    确保循环终止,并且计数小于等于Q+1
    
    bit = 1;
    count = 1; /* Initialized to 1 since we count the "00000...1" pattern. */ 初始化为1,因为我们计数为"00000...1"的模式
    while((hash & bit) == 0) {   这里需要注意hash是小端表示,所以低地址放的是高位的数据,所以得到的模式是000...1的模式
        count++;
        bit <<= 1;
    }
    *regp = (int) index;
    return count;
}
******************************************************************************************
/* ================== Dense representation implementation  ================== */ 密集表示的实现

/* Low level function to set the dense HLL register at 'index' to the
 * specified value if the current value is smaller than 'count'.
低层级的函数,如果当前值比计数值小的情况下,用来设置在索引位置的密集HLL寄存器为特定的值。
 * 'registers' is expected to have room for HLL_REGISTERS plus an
 * additional byte on the right. This requirement is met by sds strings
 * automatically since they are implicitly null terminated.
寄存器期望在右边有一个字节的额外空间,这个需求sds字符串可以自动满足,因为隐式的含有一个null结尾字节
 * The function always succeed, however if as a result of the operation
 * the approximated cardinality changed, 1 is returned. Otherwise 0
 * is returned. */
这个函数总是成功的,除了 操作导致基数值改变了就返回1,其它情况返回0
int hllDenseSet(uint8_t *registers, long index, uint8_t count) {
    uint8_t oldcount;

    HLL_DENSE_GET_REGISTER(oldcount,registers,index);  获取index所在寄存器的值
    if (count > oldcount) {
        HLL_DENSE_SET_REGISTER(registers,index,count); 设置index所在寄存器的值
        return 1;
    } else {
        return 0;
    }
}
******************************************************************************************
/* "Add" the element in the dense hyperloglog data structure.
 * Actually nothing is added, but the max 0 pattern counter of the subset
 * the element belongs to is incremented if needed.
在密集表示的HLL数据结构中添加元素。实际上没有添加任何内容,不过是最大的子集中模式0的基树可能增加
 * This is just a wrapper to hllDenseSet(), performing the hashing of the
 * element in order to retrieve the index and zero-run count. */
这个函数是函数hllDenseSet的封装,执行元素的hash为了获取索引和零行程的计数
int hllDenseAdd(uint8_t *registers, unsigned char *ele, size_t elesize) {
    long index;
    uint8_t count = hllPatLen(ele,elesize,&index);
    /* Update the register if this element produced a longer run of zeroes. */ 更新寄存器 如果元素产生了一个更长的零行程
    return hllDenseSet(registers,index,count);
}
******************************************************************************************
/* Compute the register histogram in the dense representation. */
计算密集表示中寄存器的直方图
void hllDenseRegHisto(uint8_t *registers, int* reghisto) {
    int j;

    /* Redis default is to use 16384 registers 6 bits each. The code works
     * with other values by modifying the defines, but for our target value
     * we take a faster path with unrolled loops. */
redis默认使用16384个6比特的寄存器。通过修改定义这个代码可以工作在其它的值下,
但是对我们目标值来说,采用了更快的展开循环的方式进行(获取更快的速度)
    if (HLL_REGISTERS == 16384 && HLL_BITS == 6) {
        uint8_t *r = registers;
        unsigned long r0, r1, r2, r3, r4, r5, r6, r7, r8, r9,
                      r10, r11, r12, r13, r14, r15;
        for (j = 0; j < 1024; j++) {
            /* Handle 16 registers per iteration. */
            r0 = r[0] & 63;
            r1 = (r[0] >> 6 | r[1] << 2) & 63;
            r2 = (r[1] >> 4 | r[2] << 4) & 63;
            r3 = (r[2] >> 2) & 63;
            r4 = r[3] & 63;
            r5 = (r[3] >> 6 | r[4] << 2) & 63;
            r6 = (r[4] >> 4 | r[5] << 4) & 63;
            r7 = (r[5] >> 2) & 63;
            r8 = r[6] & 63;
            r9 = (r[6] >> 6 | r[7] << 2) & 63;
            r10 = (r[7] >> 4 | r[8] << 4) & 63;
            r11 = (r[8] >> 2) & 63;
            r12 = r[9] & 63;
            r13 = (r[9] >> 6 | r[10] << 2) & 63;
            r14 = (r[10] >> 4 | r[11] << 4) & 63;
            r15 = (r[11] >> 2) & 63;

            reghisto[r0]++;
            reghisto[r1]++;
            reghisto[r2]++;
            reghisto[r3]++;
            reghisto[r4]++;
            reghisto[r5]++;
            reghisto[r6]++;
            reghisto[r7]++;
            reghisto[r8]++;
            reghisto[r9]++;
            reghisto[r10]++;
            reghisto[r11]++;
            reghisto[r12]++;
            reghisto[r13]++;
            reghisto[r14]++;
            reghisto[r15]++;

            r += 12;  12 * 8  = 16 * 6  一个字节是8比特,所以12个字节对应 16个6比特的寄存器
        }
    } else {  //其它值的模式下,采用定义
        for(j = 0; j < HLL_REGISTERS; j++) {
            unsigned long reg;
            HLL_DENSE_GET_REGISTER(reg,registers,j);
            reghisto[reg]++;
        }
    }
}
******************************************************************************************
/* ================== Sparse representation implementation  ================= */  稀疏表示实现

/* Convert the HLL with sparse representation given as input in its dense
 * representation. Both representations are represented by SDS strings, and
 * the input representation is freed as a side effect.
对于输入为稀疏表示的HLL转化为密集表示。两种表示都使用SDS字符串,作为一种副作用,输入表示被释放(即函数调用之后,输入的表示不存了)
 * The function returns C_OK if the sparse representation was valid,
 * otherwise C_ERR is returned if the representation was corrupted. */
这个函数返回C_OK,如果稀疏表示是有效的,否则返回C_ERR,如果稀疏表示奔溃了
typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
} robj;

int hllSparseToDense(robj *o) {
    sds sparse = o->ptr, dense;
    struct hllhdr *hdr, *oldhdr = (struct hllhdr*)sparse; 旧HLL结构
    int idx = 0, runlen, regval;
    uint8_t *p = (uint8_t*)sparse, *end = p+sdslen(sparse); 获取结尾

    /* If the representation is already the right one return ASAP. */
    如果表示是我们需要的那个,立即返回
    hdr = (struct hllhdr*) sparse;
    if (hdr->encoding == HLL_DENSE) return C_OK;

    /* Create a string of the right size filled with zero bytes.
     * Note that the cached cardinality is set to 0 as a side effect
     * that is exactly the cardinality of an empty HLL. */
创建一个用零填充的大小合适的字符串
注意这个缓存的基数被设置为0是一个副作用,正好是空HLL的基数
    dense = sdsnewlen(NULL,HLL_DENSE_SIZE); 根据长度创建一个新字符串
    hdr = (struct hllhdr*) dense;
    *hdr = *oldhdr; /* This will copy the magic and cached cardinality. */ 将旧值赋值给新创建的字符串
    hdr->encoding = HLL_DENSE; 编码类型设置为密集

    /* Now read the sparse representation and set non-zero registers accordingly. */
现在读取稀疏表示并且设置对应非0寄存器
    p += HLL_HDR_SIZE; 跳过结构体头部,指向数据部分
    while(p < end) {  没有到结尾
        if (HLL_SPARSE_IS_ZERO(p)) {  是零操作符
            runlen = HLL_SPARSE_ZERO_LEN(p);  获取长度
            idx += runlen;  
            p++;
        } else if (HLL_SPARSE_IS_XZERO(p)) { X零操作符
            runlen = HLL_SPARSE_XZERO_LEN(p);
            idx += runlen;
            p += 2;
        } else {
            runlen = HLL_SPARSE_VAL_LEN(p);
            regval = HLL_SPARSE_VAL_VALUE(p);
            if ((runlen + idx) > HLL_REGISTERS) break; /* Overflow. */
            while(runlen--) {
                HLL_DENSE_SET_REGISTER(hdr->registers,idx,regval);设置寄存器的值
                idx++;
            }
            p++;
        }
    }

    /* If the sparse representation was valid, we expect to find idx
     * set to HLL_REGISTERS. */
     如果稀疏表示是有效的,我们期望找到idx设置为HLL_REGISTERS
    if (idx != HLL_REGISTERS) { 不是有效表示
        sdsfree(dense);
        return C_ERR;
    }

    /* Free the old representation and set the new one. */ 释放旧的表示 设置新值
    sdsfree(o->ptr);
    o->ptr = dense;
    return C_OK;
}
******************************************************************************************
/* Low level function to set the sparse HLL register at 'index' to the
 * specified value if the current value is smaller than 'count'.
底层的函数  用来设置索引位置的HLL稀疏寄存器的值,如果当前值小于计数值
 * The object 'o' is the String object holding the HLL. The function requires
 * a reference to the object in order to be able to enlarge the string if
 * needed.
对象o是保存hll的字符串。这个函数需要一个对象的引用,为了能够在需要的时候扩大字符串
 * On success, the function returns 1 if the cardinality changed, or 0
 * if the register for this element was not updated.
 * On error (if the representation is invalid) -1 is returned.
成功的情况下,如果基数改变了函数返回1,否则返回0如果这个元素的寄存器没有更新。
错误的情况下(如果这个表达式是无效的),返回-1
 * As a side effect the function may promote the HLL representation from
 * sparse to dense: this happens when a register requires to be set to a value
 * not representable with the sparse representation, or when the resulting
 * size would be greater than server.hll_sparse_max_bytes. */
作为一个副作用,这个函数可能促进HLL的表示从稀疏转向密集:
这种情况发生在一个寄存器需要设置一个非稀疏表示的值,
或者这个结果的大小将大于定义的server.hll_sparse_max_bytes的值
int hllSparseSet(robj *o, long index, uint8_t count) {
    struct hllhdr *hdr;
    uint8_t oldcount, *sparse, *end, *p, *prev, *next;
    long first, span;
    long is_zero = 0, is_xzero = 0, is_val = 0, runlen = 0;

    /* If the count is too big to be representable by the sparse representation
     * switch to dense representation. */
     如果计数太大了导致不能稀疏表示,转向密集表示
    if (count > HLL_SPARSE_VAL_MAX_VALUE) goto promote;
    如果超过了稀疏表示能表示的最大值,转向密集表示

    /* When updating a sparse representation, sometimes we may need to
     * enlarge the buffer for up to 3 bytes in the worst case (XZERO split
     * into XZERO-VAL-XZERO). Make sure there is enough space right now
     * so that the pointers we take during the execution of the function
     * will be valid all the time. */
当更新稀疏表示时候,有时候在最差的情况下,我们需要扩大缓存增加3个字节(XZERO分裂为XZERO-VAL-XZERO).
请确保现在有足够的空间,以便我们在函数执行期间使用的指针始终有效。

    o->ptr = sdsMakeRoomFor(o->ptr,3); 扩大了3个字节,确保指针有效

    /* Step 1: we need to locate the opcode we need to modify to check
     * if a value update is actually needed. */
步骤1,我们需要定位需要修改的操作符,检查是否需要更新值
    sparse = p = ((uint8_t*)o->ptr) + HLL_HDR_SIZE;  开始位置
     end = p + sdslen(o->ptr) - HLL_HDR_SIZE;  结束位置

    first = 0;
    prev = NULL; /* Points to previous opcode at the end of the loop. */  在循环结束的时候,指向前一个操作码
    next = NULL; /* Points to the next opcode at the end of the loop. */  在循环结束的时候,指向下一个操作码
    span = 0;
    while(p < end) {  还没有结束
        long oplen;

        /* Set span to the number of registers covered by this opcode.
         将span变量设置为这个操作码覆盖的寄存器
         * This is the most performance critical loop of the sparse
         * representation. Sorting the conditionals from the most to the
         * least frequent opcode in many-bytes sparse HLLs is faster. */
这是稀疏表示中性能最关键的循环。将条件从多字节稀疏HLL中的最频繁操作码排序到最不频繁操作码达到最快的速度。
 
        oplen = 1;
        if (HLL_SPARSE_IS_ZERO(p)) {  短途的零操作最多
            span = HLL_SPARSE_ZERO_LEN(p);
        } else if (HLL_SPARSE_IS_VAL(p)) {  然后蚕食值操作
            span = HLL_SPARSE_VAL_LEN(p);
        } else { /* XZERO. */
            span = HLL_SPARSE_XZERO_LEN(p);  最不频繁的就是长途的零
            oplen = 2;
        }
        /* Break if this opcode covers the register as 'index'. */  
        如果当前操作码包含了寄存器index所在的位置,那么就不用往下找了
        if (index <= first+span-1) break;
        prev = p; 保存前一个操作码
        p += oplen; 指向下一个操作码
        first += span; 变更初始值
    }
    if (span == 0 || p >= end) return -1; /* Invalid format. */ 无效格式  跨度为0或者超过结尾,格式错误

    next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1;  指向下一个操作码
    if (next >= end) next = NULL; 如果下一个操作码超过结尾,赋值为空

    /* Cache current opcode type to avoid using the macro again and
     * again for something that will not change.
     * Also cache the run-length of the opcode. */
缓存当前操作码类型,以避免对不会更改的内容反复使用宏。同时缓存操作码的行程长度。
    if (HLL_SPARSE_IS_ZERO(p)) {
        is_zero = 1;
        runlen = HLL_SPARSE_ZERO_LEN(p);
    } else if (HLL_SPARSE_IS_XZERO(p)) {
        is_xzero = 1;
        runlen = HLL_SPARSE_XZERO_LEN(p);
    } else {
        is_val = 1;
        runlen = HLL_SPARSE_VAL_LEN(p);
    }

    /* Step 2: After the loop:
     *
     * 'first' stores to the index of the first register covered
     *  by the current opcode, which is pointed by 'p'.
     first保存p指向的 当前寄存器中保存的操作码
     * 'next' ad 'prev' store respectively the next and previous opcode,
     *  or NULL if the opcode at 'p' is respectively the last or first.
     *next和prev分辨保存下一个和前一个的操作码或者空,如果操作在p分别是最后一个或者第一个
     * 'span' is set to the number of registers covered by the current
     *  opcode.
     *span被设置为本当前操作码覆盖的寄存器数目(就是当前)
     * There are different cases in order to update the data structure
     * in place without generating it from scratch:
     有多种不同的情况, 可以更新对应位置的数据结构而不用破坏原来的结构
     * A) If it is a VAL opcode already set to a value >= our 'count'
     *    no update is needed, regardless of the VAL run-length field.
     *    In this case PFADD returns 0 since no changes are performed.
     情况A,如果对应位置是一个值,并且大于我们需要更新的值count,不需要更新,
     无论值的行程有多长。在这种情况下,PFADD返回0,因为没有任何更新需要执行
     * B) If it is a VAL opcode with len = 1 (representing only our
     *    register) and the value is less than 'count', we just update it
     *    since this is a trivial case. */
     情况B,如果恰好是一个值操作码,而且长度为1(只表示了我们当前所在的寄存器,就是p指向的寄存器)
     并且值比我们的count小,我们只需要更新它即可,因为这也是一种平凡的情况,不需要改变原来的数据结构.
    if (is_val) {
        oldcount = HLL_SPARSE_VAL_VALUE(p);
        /* Case A. */  情况A,无需改变
        if (oldcount >= count) return 0;

        /* Case B. */ 情况B,值操作码长度为1,只需要更新值即可
        if (runlen == 1) {
            HLL_SPARSE_VAL_SET(p,count,1);
            goto updated;
        }
    }

    /* C) Another trivial to handle case is a ZERO opcode with a len of 1.
     * We can just replace it with a VAL opcode with our value and len of 1. */
     情况C,另外一个平凡的情况是 需要更新的值是一个长度为1的零操作码,我们只需要用长度为1的值操作码替代即可
    if (is_zero && runlen == 1) {  需要更新的是长度为1的零操作码,直接用长度为1的值操作码更新即可
        HLL_SPARSE_VAL_SET(p,count,1);
        goto updated;
    }

    /* D) General case.
     *一般情况,就是需要改变数据结构的情况
     * The other cases are more complex: our register requires to be updated
     * and is either currently represented by a VAL opcode with len > 1,
     * by a ZERO opcode with len > 1, or by an XZERO opcode.
     其它情况比较复杂:我们的寄存器需要更新,并且当前表达式是一个长度大于1的值操作码,
     或者长度大于1的零操作码,或者XZERO操作码。
     * In those cases the original opcode must be split into multiple
     * opcodes. The worst case is an XZERO split in the middle resuling into
     * XZERO - VAL - XZERO, so the resulting sequence max length is 5 bytes.
     *在这些情况中,原始的操作码需要分裂成多个操作码。最坏的情况是一个XZERO在中间分裂成XZERO-VAL-XZERO,
     这样导致序列最大长度为5个字节。
     * We perform the split writing the new sequence into the 'new' buffer
     * with 'newlen' as length. Later the new sequence is inserted in place
     * of the old one, possibly moving what is on the right a few bytes
     * if the new sequence is longer than the older one. */
    我们执行分裂操作,将新序列写入到名为new实际长度为newlen的缓存.后续再将新序列插入到旧值原来的地方,
    如果新值的长度超过旧值,那么可能需要向右移动几个字节。
    uint8_t seq[5], *n = seq;
    int last = first+span-1; /* Last register covered by the sequence. */ 最后一个被序列覆盖的寄存器
    int len;

    if (is_zero || is_xzero) {  原本是0序列, 短的零序列 和长的零序列
        /* Handle splitting of ZERO / XZERO. */ 分裂 ZERO 或者 XZERO序列.
        if (index != first) {  不是头部
            len = index-first;  偏离头部长度
            if (len > HLL_SPARSE_ZERO_MAX_LEN) {  超过了ZERO的最大长度,那么就是XZERO
                HLL_SPARSE_XZERO_SET(n,len);
                n += 2;
            } else {
                HLL_SPARSE_ZERO_SET(n,len);
                n++;
            }
        }
        HLL_SPARSE_VAL_SET(n,count,1);
        n++;
        if (index != last) { 如果不是尾部
            len = last-index;偏离尾部的距离
            if (len > HLL_SPARSE_ZERO_MAX_LEN) {  超过了ZERO的最大长度,那么就是XZERO
                HLL_SPARSE_XZERO_SET(n,len);
                n += 2;
            } else {
                HLL_SPARSE_ZERO_SET(n,len);
                n++;
            }
        }
    } else { 原本是个值
        /* Handle splitting of VAL. */ 处理值的分裂情况,多个连续相同的原值
        int curval = HLL_SPARSE_VAL_VALUE(p); 获取原值

        if (index != first) { 不是头部
            len = index-first;
            HLL_SPARSE_VAL_SET(n,curval,len);
            n++;
        }
        HLL_SPARSE_VAL_SET(n,count,1); 新值
        n++;
        if (index != last) {
            len = last-index;
            HLL_SPARSE_VAL_SET(n,curval,len);原值
            n++;
        }
    }

    /* Step 3: substitute the new sequence with the old one.
     步骤3,用新的序列代替老的序列
     * Note that we already allocated space on the sds string
     * calling sdsMakeRoomFor(). */
     注意到我们已经在sds字符串调用函数sdsMakeRoomFor()中分配空间
     int seqlen = n-seq;  新值的长度
     int oldlen = is_xzero ? 2 : 1;  旧值的长度
     int deltalen = seqlen-oldlen;  两者差值

     if (deltalen > 0 &&
         sdslen(o->ptr)+deltalen > server.hll_sparse_max_bytes) goto promote;  如果超出了最大稀疏表示,就改用密集表示
     if (deltalen && next) memmove(next+deltalen,next,end-next);  
     如果有新旧值长度有差值并且原值有后续操作码,那么向右移动后续的操作码,腾出新值所需的空间
     sdsIncrLen(o->ptr,deltalen); 需改字符串的空间大小
     memcpy(p,seq,seqlen); 将新值拷贝到上面腾出的空间中
     end += deltalen;  尾部位置需要将新增加的空间计算上

updated:
    /* Step 4: Merge adjacent values if possible.
     *步骤4 如果可能就合并相邻的值
     * The representation was updated, however the resulting representation
     * may not be optimal: adjacent VAL opcodes can sometimes be merged into
     * a single one. */
     表达式被更新了,然而这个结果表达式可能不是最优的:相邻的值操作码能被合并到一个新的值操作码
    p = prev ? prev : sparse; 指向前一个操作码,如果没有就从头开始
    int scanlen = 5; /* Scan up to 5 upcodes starting from prev. */从前一个值开始扫描5个更新操作码字节,因为最长也就是5个
    while (p < end && scanlen--) {
        if (HLL_SPARSE_IS_XZERO(p)) {
            p += 2;
            continue;
        } else if (HLL_SPARSE_IS_ZERO(p)) {
            p++;
            continue;
        }
        /* We need two adjacent VAL opcodes to try a merge, having
         * the same value, and a len that fits the VAL opcode max len. */
         我们需要合并相邻的值操作码,拥有相同的值,并且长度符合值操作码的最大长度
        if (p+1 < end && HLL_SPARSE_IS_VAL(p+1)) {
            int v1 = HLL_SPARSE_VAL_VALUE(p);
            int v2 = HLL_SPARSE_VAL_VALUE(p+1);
            if (v1 == v2) {
                int len = HLL_SPARSE_VAL_LEN(p)+HLL_SPARSE_VAL_LEN(p+1); 将两个同样值的长度相加
                if (len <= HLL_SPARSE_VAL_MAX_LEN) { 小于最大长度
                    HLL_SPARSE_VAL_SET(p+1,v1,len); 设置到p+1的位置
                    memmove(p,p+1,end-p); 向前移动一个位置
                    sdsIncrLen(o->ptr,-1); 字符串长度减去一个位置
                    end--;
                    /* After a merge we reiterate without incrementing 'p'
                     * in order to try to merge the just merged value with
                     * a value on its right. */
                     经过合并,我们在不增加p的情况下,重复尝试合并刚合并的值和它右边的值
                    continue;
                }
            }
        }
        p++;  前后两个都是值,不用合并的情况下,向后移动一位,继续尝试合并
    }

    /* Invalidate the cached cardinality. */  使得存储的基数无效
    hdr = o->ptr;
    HLL_INVALIDATE_CACHE(hdr);
    return 1;

promote: /* Promote to dense representation. */ 变化为密集表示
    if (hllSparseToDense(o) == C_ERR) return -1; /* Corrupted HLL. */  变成密集表示失败
    hdr = o->ptr;

    /* We need to call hllDenseAdd() to perform the operation after the
     * conversion. However the result must be 1, since if we need to
     * convert from sparse to dense a register requires to be updated.
     *在转化为密集表示后,我们需要调用hllDenseAdd去执行操作修改旧值,
     这个结果必须为1,因为我们从稀疏转向密集,必定有寄存器需要修改,否则不需要转密集
     * Note that this in turn means that PFADD will make sure the command
     * is propagated to slaves / AOF, so if there is a sparse -> dense
     * conversion, it will be performed in all the slaves as well. */
     注意,这反过来意味着PFADD将确保命令被传播到slaves/AOF,
     因此如果存在稀疏->密集转换,它也将在所有slave中执行
    int dense_retval = hllDenseSet(hdr->registers,index,count);
    serverAssert(dense_retval == 1);  确保为1
    return dense_retval;
}
******************************************************************************************
/* "Add" the element in the sparse hyperloglog data structure.
 * Actually nothing is added, but the max 0 pattern counter of the subset
 * the element belongs to is incremented if needed.
在稀疏表示的HLL数据结构中添加元素。实际上什么也没有添加。但是如果所属子集的最大0模式的计数增加,将会做出变动。
 * This function is actually a wrapper for hllSparseSet(), it only performs
 * the hashshing of the elmenet to obtain the index and zeros run length. */
这个函数实际上是对hllSparseSet的封装。它只是执行元素hash函数来获取位置和零行程的长度
int hllSparseAdd(robj *o, unsigned char *ele, size_t elesize) {
    long index;
    uint8_t count = hllPatLen(ele,elesize,&index); 获取统计的零行程长度
    /* Update the register if this element produced a longer run of zeroes. */ 更新寄存器如果元素馋了一个更长的零行程
    return hllSparseSet(o,index,count);
}
******************************************************************************************
/* Compute the register histogram in the sparse representation. */
计算稀疏表示中寄存器的直方图
void hllSparseRegHisto(uint8_t *sparse, int sparselen, int *invalid, int* reghisto) {
    int idx = 0, runlen, regval;
    uint8_t *end = sparse+sparselen, *p = sparse;

    while(p < end) {  序列没有结束
        if (HLL_SPARSE_IS_ZERO(p)) {  是0行程
            runlen = HLL_SPARSE_ZERO_LEN(p);
            idx += runlen;
            reghisto[0] += runlen; 零个数的统计
            p++;
        } else if (HLL_SPARSE_IS_XZERO(p)) {
            runlen = HLL_SPARSE_XZERO_LEN(p);
            idx += runlen;
            reghisto[0] += runlen;  零个数的统计
            p += 2;
        } else {
            runlen = HLL_SPARSE_VAL_LEN(p);
            regval = HLL_SPARSE_VAL_VALUE(p);
            idx += runlen;
            reghisto[regval] += runlen;  值个数的统计
            p++;
        }
    }
    if (idx != HLL_REGISTERS && invalid) *invalid = 1;  寄存器个数正确并且传入的invalid非空,赋值invalid为1
}
******************************************************************************************

 

标签:index,redis6.0,HyperLogLog,VAL,++,笔记,操作码,HLL,SPARSE
来源: https://www.cnblogs.com/cquccy/p/14635748.html