链表加速器--浅谈跳表SkipList及在Redis中的应用
作者:互联网
问题背景
众所周知,对于一个有序集合中元素的查找,如果集合是通过数组存储的,那么利用数组随机访问的高效性通过二分查找算法可以快速的查找元素。那么设想如果该集合是通过链表存储,是否也能实现快速查找的功能呢?
知识点
链表也是一种数据结构,和数组不同的是,它不需要一块连续的内存空间,链表中每个元素会维护一个指向下一个元素的指针,由于每个元素只知道下一个元素,所以查找元素必须从链表头开始,一个一个遍历直到查找到元素或者已到达链表尾部。
链表的特点:查询慢、插入删除快
解决思路
模仿二分查找思路,是否链表可以变通的使用这种思路实现快速查找呢?跳表这种数据结构就是为了解决这个问题应运而生。
先看例子,例如链表中要搜索11这个元素,必须要经过6次查找才可以找到
二分查找的思路是不断的缩小查找的范围从而找到指定元素,那么我们可以在链表上层建立一级或者多级 “索引”,先通过索引确定范围,再到指定范围内搜索元素
这里通过索引查找元素11,如箭头所示,搜索的次数将会减少。这个就是跳表的实现思路。
特别说明
哪些节点会建立索引不是一个有规律的过程,而是一个随机的过程,在插入节点的时候,会通过随机数的方式计算该节点可以建立几个索引。
例如如果节点计算level=2 则在level1-2建立索引,level=4,则在level1-4建立索引
所以如上图的跳表,有可能是这样的索引,如下图
跳表实现
节点定义
static final int MAX_LEVEL = 16;
public class Node {
// 节点存储的数值
private int data = -1;
// 如果节点在多层均有索引,维护了该节点在每一层指向的下一级节点指针
private Node forwards[] = new Node[MAX_LEVEL];
// 节点在几个层级建立索引
private int maxLevel = 0;
}
如上代码所示,节点抽象成了一个高级的链表节点,与链表节点不同的是,链表节点只维护下一个节点的指针,而当前节点维护了节点在对应层级的多个指针,如图所示
节点查询
从最高层级(顶层)开始搜索节点,找到小于查找节点的最大节点,例如下图,查找13节点
- 先从level3开始遍历,搜索到节点5,节点5值小于13,并且在level3层级后续节点为空
- 继续下沉到level2遍历,搜索到节点9,节点9值小于13,并且在level2层级后续节点为空
- 继续下沉到最底层level1,搜索到节点11,节点11后续节点不为空但是不小于节点13,暂取该节点
- 判断步骤3中节点的下一级节点值是否等于查找节点,是则返回下一级节点,否则返回null
代码实现:
public Node find(int value) {
Node p = head;
// 从顶层开始遍历搜索,找到小于查找节点的最大节点
for (int i = levelCount - 1; i >= 0; --i) {
while (p.forwards[i] != null && p.forwards[i].data < value) {
p = p.forwards[i];
}
}
// 如果上述最大节点下一个节点不是待查找的节点 返回null 否则返回该值
if (p.forwards[0] != null && p.forwards[0].data == value) {
return p.forwards[0];
} else {
return null;
}
}
节点插入
插入元素的思想等同于单链表的思路 ,按照上述节点查询方法找到待插入元素的前驱节点及后继节点,变换指向的节点完成插入
不同的是,跳表会维护多级链表,所以如上操作也在多层级进行操作
代码实现:
// 限制最大的层级数量
private static final int MAX_LEVEL = 16;
// 定义层级数量
private int levelCount = 1;
// 定义头结点
private Node head = new Node();
// 随机数计算
private Random r = new Random()
public void insert(int value) {
// 使用随机数计算方法随机计算插入的值建立几级索引
int level = randomLevel();
// 新建节点
Node newNode = new Node();
newNode.data = value;
newNode.maxLevel = level;
Node update[] = new Node[level];
for (int i = 0; i < level; ++i) {
update[i] = head;
}
// 记录每个层级上小于插入值的最大值节点,插入的时候会将该节点指针指向插入值
Node p = head;
for (int i = level - 1; i >= 0; --i) {
while (p.forwards[i] != null && p.forwards[i].data < value) {
p = p.forwards[i];
}
update[i] = p;
}
// 插入节点 如同每个层级均进行了类似链表插入的指针变化操作
for (int i = 0; i < level; ++i) {
newNode.forwards[i] = update[i].forwards[i];
update[i].forwards[i] = newNode;
}
if (levelCount < level) levelCount = level;
}
private int randomLevel() {
int level = 1;
for (int i = 1; i < MAX_LEVEL; ++i) {
if (r.nextInt() % 2 == 1) {
level++;
}
}
return level;
}
整个跳表实现的完整代码
package skiplist;
import java.util.Random;
/**
* 跳表的一种实现方法。
* 跳表中存储的是正整数,并且存储的是不重复的。
*
*
*/
public class SkipList {
private static final int MAX_LEVEL = 16;
private int levelCount = 1;
private Node head = new Node(); // 带头链表
private Random r = new Random();
public Node find(int value) {
Node p = head;
for (int i = levelCount - 1; i >= 0; --i) {
while (p.forwards[i] != null && p.forwards[i].data < value) {
p = p.forwards[i];
}
}
if (p.forwards[0] != null && p.forwards[0].data == value) {
return p.forwards[0];
} else {
return null;
}
}
public void insert(int value) {
int level = randomLevel();
Node newNode = new Node();
newNode.data = value;
newNode.maxLevel = level;
Node update[] = new Node[level];
for (int i = 0; i < level; ++i) {
update[i] = head;
}
// record every level largest value which smaller than insert value in update[]
Node p = head;
for (int i = level - 1; i >= 0; --i) {
while (p.forwards[i] != null && p.forwards[i].data < value) {
p = p.forwards[i];
}
update[i] = p;// use update save node in search path
}
// in search path node next node become new node forwords(next)
for (int i = 0; i < level; ++i) {
newNode.forwards[i] = update[i].forwards[i];
update[i].forwards[i] = newNode;
}
// update node hight
if (levelCount < level) levelCount = level;
}
public void delete(int value) {
Node[] update = new Node[levelCount];
Node p = head;
for (int i = levelCount - 1; i >= 0; --i) {
while (p.forwards[i] != null && p.forwards[i].data < value) {
p = p.forwards[i];
}
update[i] = p;
}
if (p.forwards[0] != null && p.forwards[0].data == value) {
for (int i = levelCount - 1; i >= 0; --i) {
if (update[i].forwards[i] != null && update[i].forwards[i].data == value) {
update[i].forwards[i] = update[i].forwards[i].forwards[i];
}
}
}
}
// 随机 level 次,如果是奇数层数 +1,防止伪随机
private int randomLevel() {
int level = 1;
for (int i = 1; i < MAX_LEVEL; ++i) {
if (r.nextInt() % 2 == 1) {
level++;
}
}
return level;
}
public void printAll() {
Node p = head;
while (p.forwards[0] != null) {
System.out.print(p.forwards[0] + " ");
p = p.forwards[0];
}
System.out.println();
}
public class Node {
// 节点存储的数值
private int data = -1;
/**
* 如果节点在多层均有索引,维护了该节点在每一层指向的下一级节点指针
*/
private Node forwards[] = new Node[MAX_LEVEL];
// 节点在几个层级建立索引
private int maxLevel = 0;
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("{ data: ");
builder.append(data);
builder.append("; levels: ");
builder.append(maxLevel);
builder.append(" }");
return builder.toString();
}
}
}
跳表在Redis中的应用
跳表在redis中主要应用在sorted set这种数据类型中,我们可以看下redis中关于跳表的源码实现
预备知识点
redis源码使用C语言编写,有一些特殊的语法简要描述一下
1、如果变量前带*,代表该变量为指针变量
2、如果函数名前使用*修饰,代表函数返回值是一个指针类型数据
3、struct结构体,类似class定义
4、类似 zsl->level = 1; zsl->length = 0; 代表zsl指针指向的数据的level及length属性
源码基于Redis 3.2.11,下载解压后在src目录下查看源码。
跳表的节点及跳表的定义(server.h文件)
// 定义跳表节点
typedef struct zskiplistNode {
//节点数据
robj *obj;
// 排序字段
double score;
// 前驱指针
struct zskiplistNode *backward;
struct zskiplistLevel {
// 后继指针 定义为数组 每个节点维护多个指针,在每个层级指向相应的下一级
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplistNode;
// 定义跳表
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
// 节点数量
unsigned long length;
// 层级数
int level;
} zskiplist;
zskiplistNode定义了跳表中的节点,其中包含以下几个属性:
- robj *obj 节点的数据
- score 节点分数,可以理解为排序字段(不同的节点允许score相同,在score相同的情况下比较数据内容,score不同的情况下优先比较score,在下面的插入源码中可以看到对应逻辑)
- backward 节点的前驱指针,一般链表节点会维护指向下一个节点的后继指针,这里这个指针是相反的概念,指向前一个节点。从定义上看前驱指针未定义成一个数组,说明只有第一层链表是双向链表
- zskiplistLevel 是一个结构体,其中forward代表节点的后继指针,并且该结构体定义为了数组,表示每个节点会维护多个指针,每个指针在每个层级指向相应的下一级。span代表当前指针跨越了多少个节点,这个计数不包括指针的起点节点,但包括指针的终点节点。用于计算元素排名(rank)
zskiplist 定义跳表数据结构,其中包含以下几个属性:
- *header, *tail 头指针节点及尾部指针节点
- length 代表链表的节点的数量
- level 代表跳表的层级数
跳表相关操作(t_zset.c文件)
1、跳表创建
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->obj = obj;
return zn;
}
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
2、跳表插入
// 随机计算层级
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
// 节点插入
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
// update 维护待插入节点的前驱节点
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
/**
* 从顶层开始遍历 找到小于插入节点的所有节点中的最大节点,作为插入节点的前驱节点
* 比较节点首先通过score比较,在score相同的情况下比较节点内容
*/
for (i = zsl->level-1; i >= 0; i--) {
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
// 确定待插入节点占几个层级 随机计算
level = zslRandomLevel();
// 处理如果插入节点的层级大于现有跳表的层级情况下 前驱节点update的赋值
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
// 新的层级上没有节点 所以待插入节点前驱节点即为head节点
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 创建节点
x = zslCreateNode(level,score,obj);
// 节点插入 类似单链表新增元素
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
为什么redis中使用跳表的数据结构,而不使用红黑树来实现?
-
在做范围查找的时候,红黑树比skiplist操作要复杂。在红黑树上,我们找到指定范围的小值之后,还需要以中序遍历的顺序继续寻找其它不超过大值的节点。而在skiplist上进行范围查找就非常简单,只需要在找到小值之后,对第1层链表进行若干步的遍历就可以实现
-
红黑树的插入和删除操作可能引发子树的调整,逻辑复杂,而skiplist的插入和删除只需要修改相邻节点的指针,操作简单又快速
-
从内存占用上来说,skiplist比平衡树更灵活一些
标签:Node,浅谈,forwards,level,int,Redis,update,链表,节点 来源: https://blog.csdn.net/wodeguozili/article/details/104092004