面试突击:ConcurrentHashMap 源码详解

本文已收录于:https://github.com/danmuking/all-in-one(持续更新)

前言

哈喽,大家好,我是 DanMu。这篇文章想和大家聊聊 ConcurrentHashMap 相关的知识点。严格来说,ConcurrentHashMap 属于java.lang.current,并属于通常意义上的容器,但是考虑到面试官非常喜欢打出 HashMap 和 ConcurrentHashMap 这套丝滑小连招,因此我决定将它们放在一起,来加深大家的印象,发车!1636366877800.jpg

数据结构

与 HashMap 一样,ConcurrentHashMap 的底层结构在JDK1.8的时候出现过一次比较大的变化,这篇文章将以JDK1.8的数据结构为主,对于1.8前的结构,大家只需要大致了解即可(JDK版本都已经更新到21了,让我看看哪个活化石面试官还问老版的数据结构)
dtb-1718985477846.jpg

JDK1.8之前

ConcurrentHashMap.drawio.png

JDK1.8ConcurrentHashMap-第 2 页.drawio.png

在JDK1.8之前,ConcurrentHashMap 采用 **Segment 数组 + HashEntry 数组 + 链表 **的结构,因此其最大并发数会收到 Segment 数量的限制。在 JDK 1.8中,对 ConcurrentHashMap 主要进行了两点改进:

  1. 细化锁的粒度,从针对 Segment 加锁修改为针对 每个数组元素单独加锁,
  2. 与 HashMap 一样,将 链表 的结构修改为 链表/红黑树 的结构,提高了搜索链表的效率

对ConcurrentHashMap的数据结构有了一个基础的认识之后,接下来就准备深入底层,彻彻底底的搞懂 ConcurrentHashMap 的实现原理。

ConcurrentHashMap 的源码相对于 HashMap 复杂了很多,希望大家能静下心来慢慢看,我会尽力帮助大家简单的理解源码中的思想。

核心源码详解

重要的成员变量

老规矩,还是先来看看 ConcurrentHashMap 中维护了哪些重要的成员属性:

// ConcurrentHashMap 中存放元素的最大值
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认的初始化大小
private static final int DEFAULT_CAPACITY = 16;
// 从链表转换为红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
// 从红黑树转换为链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 开启红黑树转换的数组阈值
// 只有同时满足 TREEIFY_THRESHOLD 和 MIN_TREEIFY_CAPACITY 才会转换为红黑树
static final int MIN_TREEIFY_CAPACITY = 64;
// 接下来几个都是标志位,用来表示扩容过程中的状态,在后面会详细解释
static final int MOVED     = -1; // 表示正在转移
static final int TREEBIN   = -2; // 表示已经转换成树
static final int RESERVED  = -3; // hash for transient reservations
// 实际存放元素的数组
transient volatile Node<K,V>[] table;
// 在扩容时会用到的数组
private transient volatile Node<K,V>[] nextTable;
/**
 * 用来控制表初始化和扩容的
 * 默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,先有个印象
 *  -1 说明正在初始化
    -N 说明有 N-1 个线程正在进行扩容
     0 默认状态,表明table没有初始化
    >0 表示 table 扩容的阈值,如果 table 已经初始化。
 */
private transient volatile int sizeCtl;
// 扩容时使用,用来表示扩容进行到哪里
private transient volatile int transferIndex;

初始化

接下来,来看看 ConcurrentHashMap 在初始化的时候做了什么

// 默认的初始化函数
// 可以看到默认的初始化函数啥都没干,所以实际分配空间也是推迟到第一次添加元素的时候
// 这里也是采用的懒加载
public ConcurrentHashMap() {
}
// 指定初始化容量的初始化函数
public ConcurrentHashMap(int initialCapacity) {
    // 这边先处理了一下两个边界情况
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               // tableSizeFor 的作用是找到最接近 initialCapacity 的2的次幂
               // 在扩容的时候也是扩容为原容量的2倍
               // 因此 ConcurrentHashMap 的容量永远是2的幂次
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}


// 因为 ConcurrentHashMap 使用了懒加载,因此实际分配空间将会推迟到第一次添加函数时
// 这里为了内容的连贯性,将它们放在一起
// initTable 初始化空间
private final Node<K,V>[] initTable() {
    // 用来存放 table 成员变量
    Node<K,V>[] tab; 
    // 用来存放 sizeCtl 成员变量
    int sc;

    // 判断空间初始化了没有
    // 注意:这里可能有多个线程想进行初始化,因此这里用了一个循环
    // 只有 cas 成功的线程才能进行初始化
    // 其他 cas 失败的线程会进入自旋,找到 ConcurrentHashMap 初始化完成,然后退出。
    while ((tab = table) == null || tab.length == 0) {
        // cas 失败的线程会走到这个条件
        // 如果 sizeCtl < 0 ,说明另外的线程执行CAS 成功,正在进行初始化。
        // 失败的线程释放 CPU,或者在 while 里自旋
        if ((sc = sizeCtl) < 0)
            // 让出 CPU 使用权
            Thread.yield(); // lost initialization race; just spin
        // cas 修改 sc 的值为 -1,只有一个线程能够成功,然后开始初始化
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // 上面说了,如果是带容量的初始化函数会将容量暂存在 sc 里
                    // 所以如果 sc > 0,就是走带容量的流程,否则就是默认初始化容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 给成员变量赋值
                    table = tab = nt;
                    // 给 sc 赋值,现在 sc 存放的是扩容阈值
                    // sc为0.75数组大小
                    sc = n - (n >>> 2);
                }
            } finally {
                // 赋值给对应的成员变量
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

添加元素

下面,来看看如何向 ConcurrentHashMap 中添加元素。
要向 ConcurrentHashMap 中添加元素可分为这几个步骤:

  1. 计算 key 对应的 hash 值,根据 hash 定位到对应的数组下标
  2. 先快速判断一下对应下标是否为空,如果为空,直接使用 cas 将元素写入
  3. 否则的话就需要遍历对应数组位置的链表/红黑树,为保证遍历过程中数据不发生变化,需要先给对应数组位置加锁。
  4. 如果在遍历过程中遇到相同的 key 就直接覆盖,否则就添加到尾部。

大致理解一下流程应该能帮助大家更好的理解源码,接下来直接上硬菜:

// 向 ConcurrentHashMap 中添加元素
public V put(K key, V value) {
    return putVal(key, value, false);
}
// 添加元素中实际调用的函数
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 边界条件判断,ConcurrentHashMap 是不允许添加 null 的
    if (key == null || value == null) throw new NullPointerException();
    // 计算 hash
    int hash = spread(key.hashCode());
    // 用于记录相应链表的长度
    int binCount = 0;
    // 先把table的值存到tab里
    // 这里加了一个循环,和上面初始化时候的循环作用类似
    for (Node<K,V>[] tab = table;;) {
        // f 用来存放 key 对应数组位置的第一个节点
        Node<K,V> f; int n, i, fh;
        // 如果数组"空",进行数组初始化
        if (tab == null || (n = tab.length) == 0)
            // 上面已经说过了
            tab = initTable();

        // 找该 hash 值对应的数组下标,得到第一个节点 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果数组该位置为空,
            // 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了
            // 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // hash 居然可以等于 MOVED,这个可以先放着,涉及到后面扩容的知识
        else if ((fh = f.hash) == MOVED)
            // 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了
            tab = helpTransfer(tab, f);

        // 到这里就是说,hash 对应的数组位置已经有值了,需要遍历链表/红黑树
        else { 

            V oldVal = null;
            // 对该数组位置加锁
            // 接下来就是遍历了
            // 如果有相同的 key 就覆盖,否则添加到尾部
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // 头节点的 hash 值大于 0,说明是链表
                        // 用于累加,记录链表的长度
                        binCount = 1;
                        // 遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了链表的最末端,将这个新值放到链表的最后面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 红黑树的遍历,这里可以不用看了,很复杂,面试也不大可能会考
                    else if (f instanceof TreeBin) { // 红黑树
                        Node<K,V> p;
                        binCount = 2;
                        // 调用红黑树的插值方法插入新节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            if (binCount != 0) {
                // 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
                    // 具体源码我们就不看了,扩容部分后面说
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

获取元素

获取元素的逻辑和添加元素差不多,这里就不在赘述了:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算 hash 值
    int h = spread(key.hashCode());
    // 根据 hash 值确定节点位置
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果搜索到的节点 key 与传入的 key 相同且不为 null,直接返回这个节点  
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 这边涉及到一些后面扩容的知识,可以先不看,等看完后面扩容的知识在来看就懂了
        // 如果 eh<0(hash <0) 说明这个节点在树上或者在扩容中并且转移到新数组了
        // 所以这个 find 方法是树节点的 find 方法或者是 fwd 节点的 find 方法
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
         // 否则遍历链表 找到对应的值并返回
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
 // 这是 ForwardingNode 节点的 find 方法   
Node<K,V> find(int h, Object k) {
    // 注意这里的nextTable 是扩容时传进来的
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
        // 没找到直接返回 null
        if (k == null || tab == null || (n = tab.length) == 0 ||
            (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        // 自旋
        for (;;) {
            int eh; K ek;
            // 找到了就返回节点
            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            // 同样要判断是树节点还是 ForwardingNode 节点
            if (eh < 0) {
                // ForwardingNode 节点就继续往里找
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                    // 树节点 就调用数节点的 find 方法
                    return e.find(h, k);
            }
            // 没找到就返回n ull
            if ((e = e.next) == null)
                return null;
        }
    }
}

扩容

在 ConcurrentHashMap 中还有一个非常重要的知识点,就是 ConcurrentHashMap 的扩容,与 HashMap 的库容不同,由于 ConcurrentHashMap 需要再扩容的过程中不仅要有高性能,还要保证线程的安全,因此 ConcurrentHashMap 的扩容过程可以说是相当的复杂,希望大家能够耐下心来慢慢慢看,我也会尽力帮大家理解 ConcurrentHashMap 的扩容思想。
总体来说,ConcurrentHashMap 的**核心思想就是:分而治之。**在扩容过程中,ConcurrentHashMap 将数组划分成若干个不重合的任务包,并将任务包分配给多个线程,来达到提高性能的同时保证线程安全的目的。
但是体现在源码中,可不像说起来这么简单。来,上硬菜:

// tryPresize 会在将链表转换为红黑树的 treeifyBin 中调用
// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
// 也就是说,如果当前 ConcurrentHashMap 的数组长度为8,那么 size 就是 16
private final void tryPresize(int size) {
    // c: size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
    // 注意,c 不表示扩容后的数组大小
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;

        // 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2); // 0.75 * n
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // 这个分支负责处理一些边界情况
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            // 扩容戳,为一个负值
            // 高16为为扩容标记
            // 低16为为扩容线程数+1,从2开始是因为1已经作为初始化标记
            // 这个函数会在后面详细分析
            int rs = resizeStamp(n);

            // 这个分支表示当前已经开始扩容,用来帮助扩容
            if (sc < 0) {
                Node<K,V>[] nt;
                // 这个分支用来处理边界,只要满足一个条件就直接退出,不帮助扩容
                    // RESIZE_STAMP_SHIFT 表示左移16位,用来比较扩容标记是否相同
                    // 这个条件有问题,不会走到
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 
                    // 最后一个扩容进程在执行最后一个任务
                    sc == rs + 1 ||
                    // 帮助扩容的线程数达到上线
                    sc == rs + MAX_RESIZERS || 
                    // 新数组还没有初始化完成
                    (nt = nextTable) == null ||
                    // 所有的任务包都已经分发完
                    transferIndex <= 0)
                    break;
                // cas 将 sc+1 表示扩容进程数+1,然后开始帮助扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }

            // 第一个开始扩容的线程,cas 将 sc 赋值为 rs+2 开始,开始扩容
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}

从上面可以看到,具体实现扩容的逻辑都通过调用 transfer 来实现,接下来就来看看 transfer 函数:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    // n 是原数组的长度
    int n = tab.length, stride;

    // 前面说了,扩容的核心思想就分而治之,将数组划分为多个任务包,stride就是每个任务包的大小
    // stride 在单核下直接等于 n,就等于只有一个任务包
    // 多核模式下为 (n>>>3)/NCPU,最小值是 16
    // 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range

    // 如果 nextTab 为 null,先进行一次初始化
    // 前面我们说了,外围会保证第一个发起扩容的线程调用此方法时,参数 nextTab 为 null
    // 之后帮助扩容的线程调用此方法时,nextTab 不会为 null
    if (nextTab == null) {
        try {
            // 容量翻倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // nextTable 是 ConcurrentHashMap 中的属性
        nextTable = nextTab;
        // transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
        transferIndex = n;
    }

    int nextn = nextTab.length;

    // ForwardingNode 翻译过来就是正在被迁移的 Node
    // 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED
    // 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,
    // 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了
    // 所以它其实相当于是一个标志。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);


    // advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
    // 初始化为 true
    boolean advance = true;
    // 迁移工作是否全部完成
    boolean finishing = false; // to ensure sweep before committing nextTab

    /*
     * 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来看
     * 
     */

    // i 是位置索引,bound 是边界,注意是从后往前
    // 这里可以理解为初始化,并不是要使用的值
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;

        // advance 为 true 表示可以进行下一个位置的迁移了
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;

            // 将 transferIndex 值赋给 nextIndex
            // 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 第一次会走这个分支,将 transferIndex 赋值为 nextIndex-stride
            // 第一个进程就负责处理 nextIndex-stride 到 nextIndex 这一个任务包
            // 任务包会被从后往前处理
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前
                bound = nextBound;
                i = nextIndex - 1;
                // 刚开始当前任务肯定没完成
                advance = false;
            }
        }
            // 所有任务完成
        if (i < 0 || 
            // 不应该发生
            i >= n || 
            // 不应该发生
            i + n >= nextn) {
            int sc;
            // 扩容完成
            if (finishing) {
                // 所有的迁移操作已经完成
                nextTable = null;
                // 将新的 nextTab 赋值给 table 属性,完成迁移
                table = nextTab;
                // 重新计算 sizeCtl: n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }

            // 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
            // 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,
            // 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 任务结束,方法退出
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;

                // 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
                // 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 下面几个分支就是进行迁移的代码了,比较简单
        // 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 头节点的 hash 大于 0,说明是链表的 Node 节点
                    if (fh >= 0) {
                        // 需要将链表一分为二,
                        // 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
                        // lastRun 之前的节点需要进行克隆,然后分到两个链表中
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 其中的一个链表放在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 另一个链表放在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
                        // 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
                        setTabAt(tab, i, fwd);
                        // advance 设置为 true,代表该位置已经迁移完毕
                        advance = true;
                    }
                    // 红黑树可以不看
                    else if (f instanceof TreeBin) {
                        // 红黑树的迁移
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果一分为二后,节点数小于等于6,那么将红黑树转换回链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;

                        // 将 ln 放置在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 将 hn 放置在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
                        //    其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
                        setTabAt(tab, i, fwd);
                        // advance 设置为 true,代表该位置已经迁移完毕
                        advance = true;
                    }
                }
            }
        }
    }
}

恭喜你,看到这里你已经对于扩容有了相当深入的理解!扩容的代码确实相当的硬核,需要大家有一点耐心,慢慢理解。

答疑

在源码中还有一些用一句两句话很难解释清楚的问题,就在这部分进行更详细的解读

扩容戳有什么用

先来填个前面留下的坑
image.png
resizeStamp 会生成一个扩容戳,要理解
要理解扩容戳,必须从二进制的角度来分析。resizeStamp 方法就一句话:

static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

其中 RESIZE_STAMP_BITS是一个默认值 16。这里面关键就是 Integer.numberOfLeadingZeros(n) 这个方法,这个方法源码就不贴出来了,实际上这个方法就是做一件事,那就是获取当前数据转成二进制后的最高位的 1 前的 0 的个数。这句话有点拗口,举个例子:
以 16 为准,16 转成二进制是10000,最高非 0 位是在第 5 位,因为 int 类型是 32 位,所以他前面还有 27 位,而且都是 0,那么这个方法得到的结果就是 27(1 的前面还有 27 个 0)。
然后1 << (RESIZE_STAMP_BITS - 1)在当前版本就是1<<15,也就是得到一个二进制数1000 0000 0000 0000,这里也是要做一件事,把这个 1 移动到第 16 位。最后这两个数通过|操作一定得到的结果就是第 16 位是 1,因为 int 是 32 位,最多也就是 32 个 0,而且因为 n 的默认大小是 16(ConcurrentHashMap 默认大小),所以实际上最多也就是 27(11011)个 0,也就是说这个数最高位的 1 也只是在第五位,执行|运算最多也就是影响低 5 位的结果。扩容戳在被使用的时候会先左移 16 位,然后赋值给 sc,所以这边计算出的扩容戳,其实是高 16 位。上面说的高 16 位代表当前扩容的标记,低 16 位代表了扩容的线程数的扩容戳实际上指的是 rs 赋值后的 sizeCtl。

注意:这里之所以要保证第 16 位为 1,是为了保证 sizeCtl 变量为负数,扩容戳在被使用时会先左移 16 位,这样最高位永远为 1 保证 sizeCtl 变量为负数。

首次扩容为什么计数要 +2 而不是 +1

我们上面已经说了扩容戳的作用:

  • 高 16 位代表当前扩容的标记
  • 低 16 位代表了扩容的线程数。

知道了这两个条件就好理解了,因为 rs 最终是要赋值给 sizeCtl 的,而 sizeCtl 负数才代表扩容,而将 rs 左移 16 位就刚好使得最高位为 1,此时低 16 位全部是 0,而因为低 16 位要记录扩容线程数,所以应该 +1,但是这里是 +2,原因是 sizeCtl 中 -1 这个数值已经被使用了,用来代替当前有线程准备扩容,所以如果直接 +1 是会和标志位发生冲突。为了避开标志位,所以初始化第一次记录扩容线程数的时候才需要 +2。

点关注,不迷路

好了,以上就是这篇文章的全部内容了,如果你能看到这里,非常感谢你的支持!
如果你觉得这篇文章写的还不错, 求点赞👍 求关注❤️ 求分享👥 对暖男我来说真的 非常有用!!!
白嫖不好,创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见!
如果本篇博客有任何错误,请批评指教,不胜感激 !

最后推荐我的IM项目DiTing(https://github.com/danmuking/DiTing-Go),致力于成为一个初学者友好、易于上手的 IM 解决方案,希望能给你的学习、面试带来一点帮助,如果人才你喜欢,给个Star⭐叭!

参考资料

助力面试之ConcurrentHashMap面试灵魂拷问,你能扛多久
JUC集合: ConcurrentHashMap详解

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/761511.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【电源拓扑】PFC

为什么开关电源中都有PFC电路 PFC电路就是功率矫正电路&#xff0c;目的是为了防止杂波对电网产生冲击 AC220V通过整流桥之后电压和电流的波形分析 PFC电路为什么选择是Boost升压电路 PFC电路为什么要把电压升高到400V 为了解决输入电压低于滤波电容电压这个矛盾&#xff0…

LDM-XRNY-102溜槽堵塞开关 JOSEF约瑟 接点容量:5A/380V

工作原理 当物料在溜槽中造成堵塞时&#xff0c;堆积的物料会给溜槽侧壁一个压力&#xff0c;从而推动LDM-XRNY-102溜槽堵塞开关的活动门向外或向内推移&#xff08;根据具体设计而定&#xff09;。 当活动门偏转一个设定的角度时&#xff0c;其控制开关会动作&#xff0c;发出…

基于Python的自动化测试框架-Pytest总结-第一弹基础

Pytest总结第一弹基础 入门知识点安装pytest运行pytest测试用例发现规则执行方式命令行执行参数 配置发现规则 如何编写测试Case基础案例断言语句的使用pytest.fail() 和 Exceptions自定义断言函数异常测试测试类形式 pytest的Fixture使用Fixture入门案例使用fixture的Setup、T…

[A133]全志u-boot中的I2C驱动分析

[A133]全志u-boot中的I2C驱动分析 hongxi.zhu 2024-6-27 一、IIC标准读写时序 IIC是高位(MSB)先传输 二、代码流程 2.1主机写数据 brandy/brandy-2.0/u-boot-2018/drivers/i2c/sunxi_i2c.c static int sunxi_i2c_write(struct i2c_adapter *adap, uint8_t chip,uint32_t addr…

深入解析 androidx.databinding.BaseObservable

在现代 Android 开发中&#xff0c;数据绑定 (Data Binding) 是一个重要的技术&#xff0c;它简化了 UI 和数据之间的交互。在数据绑定框架中&#xff0c;androidx.databinding.BaseObservable 是一个关键类&#xff0c;用于实现可观察的数据模型。本文将详细介绍 BaseObservab…

Centos7安装Minio笔记

一、Minio概述 Minio是一款开源的对象存储服务器&#xff0c;可以运行在多种操作系统上&#xff0c;包括Linux、Windows和MacOS等。提供一种简单、可扩展、高可用的对象存储解决方案&#xff0c;支持多种数据格式&#xff0c;包括对象、块和文件等。Minio是一款强大、灵活、可…

基于若依(ruoyi-vue)的周报管理系统

喂wangyinlon 填报人页面 审批人 审批不通过,填报人需要重新填写.

智慧校园新气象:校园气象站

在数字化、智能化的浪潮下&#xff0c;传统校园正在迎来一场革命性的变革。在这场变革中&#xff0c;校园气象站以其独特的功能和魅力&#xff0c;成为推动校园气象科普教育、提升校园品质的重要力量。 一、校园气象站&#xff1a;智慧校园的“气象眼” 校园气象站&#xff0c…

宠物医院管理系统-计算机毕业设计源码07221

目 录 1 绪论 1.1 选题背景和意义 1.2国内外研究现状 1.3论文结构与章节安排 2 宠物医院管理系统系统分析 2.1 可行性分析 2.1.1技术可行性分析 2.1.2 操作可行性分析 2.1.3 法律可行性分析 2.2 系统功能分析 2.2.1 功能性分析 2.2.2 非功能性分析 2.3 系统用例分…

Ansible 最佳实践:现代 IT 运维的利器

Ansible 最佳实践&#xff1a;现代 IT 运维的利器 Ansible 是一种开源的 IT 自动化工具&#xff0c;通过 SSH 协议实现远程节点和管理节点之间的通信&#xff0c;适用于配置管理、应用程序部署、任务自动化等多个场景。本文将介绍 Ansible 的基本架构、主要功能以及最佳实践&a…

为什么80%的码农都做不了架构师?

文章目录 一、技术广度和深度的要求1.1 技术广度1.2 技术深度 二、全局视角和系统思维2.1 全局视角2.2 系统思维 三、沟通能力和团队合作3.1 沟通能力3.2 团队合作 四、业务理解和需求分析4.1 业务理解4.2 需求分析 五、持续学习和创新能力5.1 持续学习5.2 创新能力 六、总结 &…

鸿蒙:页面路由使用

页面路由使用步骤&#xff1a; 1.导入Router模块 2.使用路由功能&#xff0c;以pushUrl模式为例 3.接收参数、返回 4.此时的路由是不能使用的&#xff0c;需要到main_pages.json中进行注册

FFmpeg视频处理工具安装使用

一、前言 FFmpeg是流行的开源视频处理工具&#xff0c;用于转码、合并、编辑等。以下是安装和使用方法&#xff1a; 二、步骤 1.下载 1.1 ffmpeg下载 官网下载地址 wget https://www.ffmpeg.org/releases/ffmpeg-6.1.1.tar.xz1.2 nasm下载 https://www.nasm.us/pub/nasm/…

PHP安龙县农产品销售网站-计算机毕业设计源码13137

目 录 摘要 1 绪论 1.1 研究背景 1.2 研究意义 1.3论文结构与章节安排 2 相关技术介绍 2.1 PHP描述 2.2 MySQL数据库 2.3 Think PHP框架 3网站分析 3.1 可行性分析 3.2 网站流程分析 3.2.1 数据新增流程 3.2.2 数据删除流程 3.3 网站功能分析 3.3.1 功能性分析…

VSCode创建并运行html页面(使用Live Server插件)

目录 一、参考博客二、安装Live Server插件三、新建html页面3.1 选择文件夹3.2 新建html文件3.3 快速生成html骨架 四、运行html页面 一、参考博客 https://blog.csdn.net/zhuiqiuzhuoyue583/article/details/126610162 https://blog.csdn.net/m0_74014525/article/details/13…

偏微分方程算法之抛物型方程差分格式编程示例八(紧交替方向隐格式)

目录 一、研究问题 二、C++代码 三、计算结果 一、研究问题 示例七中采用交替方向格式进行抛物型方程求解,这里继续以紧交替方向隐格式对相同的问题进行求解。 紧交替方向隐格式的原理及推导请参考: 偏微分方程算法之二维初边值问题(紧交替方向隐格式)_二维抛物方程的p…

Kafka-时间轮和延迟操作-源码流程

TimingWheel 字段&#xff1a; buckets&#xff1a;Array.tabulate[TimerTaskList]类型&#xff0c;其每一个项都对应时间轮中的一个时间格&#xff0c;用于保存 TimerTaskList的数组。在TimingWheel中&#xff0c;同一个TimerTaskList中的不同定时任务的到期时间可能 不同&a…

小型语言模型的兴起

过去几年&#xff0c;我们看到人工智能能力呈爆炸式增长&#xff0c;其中很大一部分是由大型语言模型 (LLM) 的进步推动的。GPT-3 等模型包含 1750 亿个参数&#xff0c;已经展示了生成类似人类的文本、回答问题、总结文档等能力。然而&#xff0c;虽然 LLM 的能力令人印象深刻…

海洋海事NEMA2000耐腐蚀不锈钢航空插头插座

海洋海事NEMA2000耐腐蚀不锈钢航空插头插座是为适应海洋环境中船舶使用的特殊要求而设计的。这类插头插座不仅要满足基本的电气连接功能&#xff0c;还要具备耐海水腐蚀、防水、防尘、防震等特性&#xff0c;以确保在恶劣的海上环境下仍能保持稳定的性能。 NMEA 2000插头插座的…

cesium自定义弹框

token记得换成您自己的&#xff01;&#xff01;&#xff01; 申请cesium的token 官网【Cesium: The Platform for 3D Geospatial】 pickEllipsoid在加载地形的情况下有一定误差&#xff0c;地形凹凸程度越大&#xff0c;误差越大。 pickPosition在depthTestAgainstTerrain …