跳至主要內容

SkipListMap

mozzie大约 14 分钟JavaJava

SkipListMap

底层结构

跳表 SkipList 是一个有序的链表,默认升序,底层是链表加多级索引的结构。跳表可以对元素进行快速查询,类似于平衡树,是一种利用空间换时间的算法

对于单链表,即使链表是有序的,如果查找数据也只能从头到尾遍历链表,所以采用链表上建索引的方式提高效率,跳表的查询时间复杂度是 O(logn),空间复杂度 O(n)

ConcurrentSkipListMap 提供了一种线程安全的并发访问的排序映射表,内部是跳表结构实现,通过 CAS + volatile 保证线程安全

平衡树和跳表的区别:

  • 对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整;而对跳表的插入和删除,只需要对整个结构的局部进行操作
  • 在高并发的情况下,保证整个平衡树的线程安全需要一个全局锁;对于跳表则只需要部分锁,拥有更好的性能

BaseHeader 存储数据,headIndex 存储索引,纵向上所有索引都指向链表最下面的节点

成员变量

  • 标识索引头节点位置
private static final Object BASE_HEADER = new Object();
  • 跳表的顶层索引
private transient volatile HeadIndex\<K,V\> head;
  • 比较器,为 null 则使用自然排序
final Comparator<? super K\> comparator;
  • Node 节点
static final class Node\<K, V\>{
    final K key;  				// key 是 final 的, 说明节点一旦定下来, 除了删除, 一般不会改动 key
    volatile Object value; 		// 对应的 value
    volatile Node\<K, V\> next; 	// 下一个节点,单向链表
}
  • 索引节点 Index,只有向下和向右的指针
static class Index\<K, V\>{
    final Node\<K, V\> node; 		// 索引指向的节点,每个都会指向数据节点
    final Index\<K, V\> down; 	// 下边level层的Index,分层索引
    volatile Index\<K, V\> right; // 右边的Index,单向

    // 在 index 本身和 succ 之间插入一个新的节点 newSucc
    final boolean link(Index\<K, V\> succ, Index\<K, V\> newSucc){
        Node\<K, V\> n = node;
        newSucc.right = succ;
        // 把当前节点的右指针从 succ 改为 newSucc
        return n.value != null && casRight(succ, newSucc);
    }

    // 断开当前节点和 succ 节点,将当前的节点 index 设置其的 right 为 succ.right,就是把 succ 删除
    final boolean unlink(Index\<K, V\> succ){
        return node.value != null && casRight(succ, succ.right);
    }
}
  • 头索引节点 HeadIndex
static final class HeadIndex\<K,V\> extends Index\<K,V\> {
    final int level;	// 表示索引层级,所有的 HeadIndex 都指向同一个 Base_header 节点
    HeadIndex(Node\<K,V\> node, Index\<K,V\> down, Index\<K,V\> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

成员方法

其他方法

  • 构造方法:
public ConcurrentSkipListMap() {
    this.comparator = null;	// comparator 为 null,使用 key 的自然序,如字典序
    initialize();
}
private void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    // 初始化索引头节点,Node 的 key 为 null,value 为 BASE_HEADER 对象,下一个节点为 null
    // head 的分层索引 down 为 null,链表的后续索引 right 为 null,层级 level 为第 1 层
    head = new HeadIndex\<K,V\>(new Node\<K,V\>(null, BASE_HEADER, null), null, null, 1);
}
  • cpr:排序
// x 是比较者,y 是被比较者,比较者大于被比较者 返回正数,小于返回负数,相等返回 0
static final int cpr(Comparator c, Object x, Object y) {
    return (c != null) ? c.compare(x, y) : ((Comparable)x).compareTo(y);
}

添加方法

  • findPredecessor():寻找前置节点 从最上层的头索引开始向右查找(链表的后续索引),如果后续索引的节点的 key 大于要查找的 key,则头索引移到下层链表,在下层链表查找,以此反复,一直查找到没有下层的分层索引为止,返回该索引的节点。如果后续索引的节点的 key 小于要查找的 key,则在该层链表中向后查找。由于查找的 key 可能永远大于索引节点的 key,所以只能找到目标的前置索引节点。如果遇到空值索引的存在,通过 CAS 来断开索引
private Node\<K,V\> findPredecessor(Object key, Comparator<? super K\> cmp) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    for (;;) {
        // 1.初始数据 q 是 head,r 是最顶层 h 的右 Index 节点
        for (Index\<K,V\> q = head, r = q.right, d;;) {
            // 2.右索引节点不为空,则进行向下查找
            if (r != null) {
                Node\<K,V\> n = r.node;
                K k = n.key;
                // 3.n.value 为 null 说明节点 n 正在删除的过程中,此时【当前线程帮其删除索引】
                if (n.value == null) {
                    // 在 index 层直接删除 r 索引节点
                    if (!q.unlink(r))
                        // 删除失败重新从 head 节点开始查找,break 一个 for 到步骤 1,又从初始值开始
                        break;
                    
                    // 删除节点 r 成功,获取新的 r 节点,
                    r = q.right;
                    // 回到步骤 2,还是从这层索引开始向右遍历
                    continue;
                }
                // 4.若参数 key > r.node.key,则继续向右遍历, continue 到步骤 2 处获取右节点
                //   若参数 key < r.node.key,说明需要进入下层索引,到步骤 5
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            // 5.先让 d 指向 q 的下一层,判断是否是 null,是则说明已经到了数据层,也就是第一层
            if ((d = q.down) == null) 
                return q.node;
            // 6.未到数据层, 进行重新赋值向下扫描
            q = d;		// q 指向 d
            r = d.right;// r 指向 q 的后续索引节点,此时(q.key < key < r.key)
        }
    }
}
  • put():添加数据
public V put(K key, V value) {
    // 非空判断,value不能为空
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}
private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node\<K,V\> z;
    // 非空判断,key 不能为空
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K\> cmp = comparator;
    // outer 循环,【把待插入数据插入到数据层的合适的位置,并在扫描过程中处理已删除(value = null)的数据】
    outer: for (;;) {
        //0.for (;;)
        //1.将 key 对应的前继节点找到, b 为前继节点,是数据层的, n 是前继节点的 next, 
		//  若没发生条件竞争,最终 key 在 b 与 n 之间 (找到的 b 在 base_level 上)
        for (Node\<K,V\> b = findPredecessor(key, cmp), n = b.next;;) {
            // 2.n 不为 null 说明 b 不是链表的最后一个节点
            if (n != null) {
                Object v; int c;
                // 3.获取 n 的右节点
                Node\<K,V\> f = n.next;
                // 4.条件竞争,并发下其他线程在 b 之后插入节点或直接删除节点 n, break 到步骤 0
                if (n != b.next)              
                    break;
                //  若节点 n 已经删除, 则调用 helpDelete 进行【帮助删除节点】
                if ((v = n.value) == null) {
                    n.helpDelete(b, f);
                    break;
                }
                // 5.节点 b 被删除中,则 break 到步骤 0,
				//  【调用findPredecessor帮助删除index层的数据, node层的数据会通过helpDelete方法进行删除】
                if (b.value == null || v == n) 
                    break;
                // 6.若 key > n.key,则进行向后扫描
                //   若 key < n.key,则证明 key 应该存储在 b 和 n 之间
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                // 7.key 的值和 n.key 相等,则可以直接覆盖赋值
                if (c == 0) {
                    // onlyIfAbsent 默认 false,
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        // 返回被覆盖的值
                        return vv;
                    }
                    // cas失败,break 一层循环,返回 0 重试
                    break;
                }
                // else c < 0; fall through
            }
            // 8.此时的情况 b.key < key < n.key,对应流程图1中的7,创建z节点指向n
            z = new Node\<K,V\>(key, value, n);
            // 9.尝试把 b.next 从 n 设置成 z
            if (!b.casNext(n, z))
                // cas失败,返回到步骤0,重试
                break;
            // 10.break outer 后, 上面的 for 循环不会再执行, 而后执行下面的代码
            break outer;
        }
    }
	// 【以上插入节点已经完成,剩下的任务要根据随机数的值来表示是否向上增加层数与上层索引】
    
    // 随机数
    int rnd = ThreadLocalRandom.nextSecondarySeed();
    
    // 如果随机数的二进制与 10000000000000000000000000000001 进行与运算为 0
    // 即随机数的二进制最高位与最末尾必须为 0,其他位无所谓,就进入该循环
    // 如果随机数的二进制最高位与最末位不为 0,不增加新节点的层数
    
    // 11.判断是否需要添加 level,32 位
    if ((rnd & 0x80000001) == 0) {
        // 索引层 level,从 1 开始,就是最底层
        int level = 1, max;
        // 12.判断最低位前面有几个 1,有几个leve就加几:0..0 0001 1110,这是4个,则1+4=5
        //    【最大有30个就是 1 + 30 = 31
        while (((rnd >>>= 1) & 1) != 0)
            ++level;
        // 最终会指向 z 节点,就是添加的节点 
        Index\<K,V\> idx = null;
        // 指向头索引节点
        HeadIndex\<K,V\> h = head;
        
        // 13.判断level是否比当前最高索引小,图中 max 为 3
        if (level <= (max = h.level)) {
            for (int i = 1; i <= level; ++i)
                // 根据层数level不断创建新增节点的上层索引,索引的后继索引留空
                // 第一次idx为null,也就是下层索引为空,第二次把上次的索引作为下层索引,【类似头插法】
                idx = new Index\<K,V\>(z, idx, null);
            // 循环以后的索引结构
            // index-3	← idx
            //   ↓
            // index-2
            //   ↓
            // index-1
            //   ↓
            //  z-node
        }
        // 14.若 level > max,则【只增加一层 index 索引层】,3 + 1 = 4
        else { 
            level = max + 1;
            //创建一个 index 数组,长度是 level+1,假设 level 是 4,创建的数组长度为 5
            Index\<K,V\>[] idxs = (Index\<K,V\>[])new Index<?,?>[level+1];
            // index[0]的数组 slot 并没有使用,只使用 [1,level] 这些数组的 slot
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index\<K,V\>(z, idx, null);
              		// index-4   ← idx
                    //   ↓
                  	// ......
                    //   ↓
                    // index-1
                    //   ↓
                    //  z-node
            
            for (;;) {
                h = head;
                // 获取头索引的层数,3
                int oldLevel = h.level;
                // 如果 level <= oldLevel,说明其他线程进行了 index 层增加操作,退出循环
                if (level <= oldLevel)
                    break;
                // 定义一个新的头索引节点
                HeadIndex\<K,V\> newh = h;
                // 获取头索引的节点,就是 BASE_HEADER
                Node\<K,V\> oldbase = h.node;
                // 升级 baseHeader 索引,升高一级,并发下可能升高多级
                for (int j = oldLevel + 1; j <= level; ++j)
                    // 参数1:底层node,参数二:down,为以前的头节点,参数三:right,新建
                    newh = new HeadIndex\<K,V\>(oldbase, newh, idxs[j], j);
                // 执行完for循环之后,baseHeader 索引长这个样子,这里只升高一级
                // index-4             →             index-4	← idx
                //   ↓                                  ↓
                // index-3                           index-3     
                //   ↓                                  ↓
                // index-2                           index-2
                //   ↓                                  ↓
                // index-1                           index-1
                //   ↓                                  ↓
                // baseHeader    →    ....      →     z-node
                
                // cas 成功后,head 字段指向最新的 headIndex,baseHeader 的 index-4
                if (casHead(h, newh)) {
                    // h 指向最新的 index-4 节点
                    h = newh;
                    // 让 idx 指向 z-node 的 index-3 节点,
					// 因为从 index-3 - index-1 的这些 z-node 索引节点 都没有插入到索引链表
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // 15.【把新加的索引插入索引链表中】,有上述两种情况,一种索引高度不变,另一种是高度加 1
        // 要插入的是第几层的索引
        splice: for (int insertionLevel = level;;) {
            // 获取头索引的层数,情况 1 是 3,情况 2 是 4
            int j = h.level;
            // 【遍历 insertionLevel 层的索引,找到合适的插入位置】
            for (Index\<K,V\> q = h, r = q.right, t = idx;;) {
                // 如果头索引为 null 或者新增节点索引为 null,退出插入索引的总循环
                if (q == null || t == null)
                    // 此处表示有其他线程删除了头索引或者新增节点的索引
                    break splice;
                // 头索引的链表后续索引存在,如果是新层则为新节点索引,如果是老层则为原索引
                if (r != null) {
                    // 获取r的节点
                    Node\<K,V\> n = r.node;
                    // 插入的key和n.key的比较值
                    int c = cpr(cmp, key, n.key);
                    // 【删除空值索引】
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    // key > r.node.key,向右扫描
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                // 执行到这里,说明 key < r.node.key,判断是否是第 j 层插入新增节点的前置索引
                if (j == insertionLevel) {
                    // 【将新索引节点 t 插入 q r 之间】
                    if (!q.link(r, t))
                        break; 
                    // 如果新增节点的值为 null,表示该节点已经被其他线程删除
                    if (t.node.value == null) {
                        // 找到该节点
                        findNode(key);
                        break splice;
                    }
                    // 插入层逐层自减,当为最底层时退出循环
                    if (--insertionLevel == 0)
                        break splice;
                }
				// 其他节点随着插入节点的层数下移而下移
                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}
  • findNode()
private Node\<K,V\> findNode(Object key) {
    // 原理与doGet相同,无非是 findNode 返回节点,doGet 返回 value
    if ((c = cpr(cmp, key, n.key)) == 0)
        return n;
}

获取方法

  • get(key):获取对应的数据
public V get(Object key) {
    return doGet(key);
}
  • doGet():扫描过程会对已 value = null 的元素进行删除处理
private V doGet(Object key) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K\> cmp = comparator;
    outer: for (;;) {
        // 1.找到最底层节点的前置节点
        for (Node\<K,V\> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            // 2.【如果该前置节点的链表后续节点为 null,说明不存在该节点】
            if (n == null)
                break outer;
            // b → n → f
            Node\<K,V\> f = n.next;
            // 3.如果n不为前置节点的后续节点,表示已经有其他线程删除了该节点
            if (n != b.next) 
                break;
            // 4.如果后续节点的值为null,【需要帮助删除该节点】
            if ((v = n.value) == null) {
                n.helpDelete(b, f);
                break;
            }
            // 5.如果前置节点已被其他线程删除,重新循环
            if (b.value == null || v == n)
                break;
             // 6.如果要获取的key与后续节点的key相等,返回节点的value
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            // 7.key < n.key,因位 key > b.key,b 和 n 相连,说明不存在该节点或者被其他线程删除了
            if (c < 0)
                break outer;
            b = n;
            n = f;
        }
    }
    return null;
}

删除方法

  • remove()
public V remove(Object key) {
    return doRemove(key, null);
}
final V doRemove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K\> cmp = comparator;
    outer: for (;;) {
        // 1.找到最底层目标节点的前置节点,b.key < key
        for (Node\<K,V\> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            // 2.如果该前置节点的链表后续节点为 null,退出循环,说明不存在这个元素
            if (n == null)
                break outer;
            // b → n → f
            Node\<K,V\> f = n.next;
            if (n != b.next)                    // inconsistent read
                break;
            if ((v = n.value) == null) {        // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)      // b is deleted
                break;
            //3.key < n.key,说明被其他线程删除了,或者不存在该节点
            if ((c = cpr(cmp, key, n.key)) < 0)
                break outer;
            //4.key > n.key,继续向后扫描
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }
            //5.到这里是 key = n.key,value 不为空的情况下判断 value 和 n.value 是否相等
            if (value != null && !value.equals(v))
                break outer;
            //6.【把 n 节点的 value 置空】
            if (!n.casValue(v, null))
                break;
            //7.【给 n 添加一个删除标志 mark】,mark.next = f,然后把 b.next 设置为 f,成功后n出队
            if (!n.appendMarker(f) || !b.casNext(n, f))
                // 对 key 对应的 index 进行删除,调用了 findPredecessor 方法
                findNode(key);
            else {
                // 进行操作失败后通过 findPredecessor 中进行 index 的删除
                findPredecessor(key, cmp);
                if (head.right == null)
                    // 进行headIndex 对应的index 层的删除
                    tryReduceLevel();
            }
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;
        }
    }
    return null;
}

经过 findPredecessor() 中的 unlink() 后索引已经被删除

  • appendMarker():添加删除标记节点
boolean appendMarker(Node\<K,V\> f) {
    // 通过 CAS 让 n.next 指向一个 key 为 null,value 为 this,next 为 f 的标记节点
    return casNext(f, new Node\<K,V\>(f));
}
  • helpDelete():将添加了删除标记的节点清除,参数是该节点的前驱和后继节点
void helpDelete(Node\<K,V\> b, Node\<K,V\> f) {
    // this 节点的后续节点为 f,且本身为 b 的后续节点,一般都是正确的,除非被别的线程删除
    if (f == next && this == b.next) {
        // 如果 n 还还没有被标记
        if (f == null || f.value != f) 
            casNext(f, new Node\<K,V\>(f));
        else
            // 通过 CAS,将 b 的下一个节点 n 变成 f.next,即成为图中的样式
            b.casNext(this, f.next);
    }
}
  • tryReduceLevel():删除索引
private void tryReduceLevel() {
    HeadIndex\<K,V\> h = head;
    HeadIndex\<K,V\> d;
    HeadIndex\<K,V\> e;
    if (h.level > 3 &&
        (d = (HeadIndex\<K,V\>)h.down) != null &&
        (e = (HeadIndex\<K,V\>)d.down) != null &&
        e.right == null &&
        d.right == null &&
        h.right == null &&
        // 设置头索引
        casHead(h, d) && 
        // 重新检查
        h.right != null) 
        // 重新检查返回true,说明其他线程增加了索引层级,把索引头节点设置回来
        casHead(d, h);   
}

参考文章:https://my.oschina.net/u/3768341/blog/3135659

参考视频:https://www.bilibili.com/video/BV1Er4y1P7k1

贡献者: mozzie