ConcurrentHashMap是如何实现线程安全的你知道吗
作者:桐花思雨 时间:2023-11-28 23:14:25
1. 前言
我们知道,在日常开发中使用的 HashMap 是线程不安全的,而线程安全类 HashTable 和 SynchronizedMap 只是简单的在方法上加锁实现了线程安全,效率低下,所以在线程安全的环境下我们通常会使用 ConcurrentHashMap,那么 ConcurrentHashMap 又是如何实现线程安全的呢?
2. ConcurrentHashMap 是如何实现线程安全的
针对这个问题,可以从以下几个方面来阅读源码予以解答
2.1. 初始化数据结构时的线程安全
在 JDK 1.8 中,初始化 ConcurrentHashMap 的时候这个 Node[] 数组是还未初始化的,会等到第一次 put() 方法调用时才初始化
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 判断Node数组为空
if (tab == null || (n = tab.length) == 0)
// 初始化Node数组
tab = initTable();
......
}
此时会有并发问题的,如果多个线程同时调用 initTable() 初始化 Node[] 数组怎么办?
看看 Doug Lea 大师是如何处理的
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 每次循环都获取最新的Node[]数组引用
while ((tab = table) == null || tab.length == 0) {
// sizeCtl是一个标记位,若为-1,代表有线程在进行初始化工作了
if ((sc = sizeCtl) < 0)
// 让出CPU时间片
Thread.yield();
// 此时,代表没有线程在进行初始化工作,CAS操作,将本实例的sizeCtl变量设置为-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 如果CAS操作成功了,代表本线程将负责初始化工作
try {
// 再检查一遍数组是否为空
if ((tab = table) == null || tab.length == 0) {
// 在初始化ConcurrentHashMap时,sizeCtl代表数组大小,默认16
// 所以此时n默认为16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 将其赋值给table变量
table = tab = nt;
// 通过位运算,n减去n二进制右移2位,相当于乘以0.75
// 例如16经过运算为12,与乘0.75一样,只不过位运算更快
sc = n - (n >>> 2);
}
} finally {
// 将计算后的sc(12)直接赋值给sizeCtl,表示达到12长度就扩容
// 由于这里只会有一个线程在执行,直接赋值即可,没有线程安全问题,只需要保证可见性
sizeCtl = sc;
}
break;
}
}
return tab;
}
table 变量使用了 volatile 来保证每次获取到的都是最新写入的值
transient volatile Node<K,V>[] table;
ConcurrentHashMap 源码中 sizeCtl 变量注释如下
// 表初始化和调整控件大小。如果为负值,则表正在初始化或调整大小:-1用于初始化,否则-(1+活动调整大小线程的数量)
// 否则,当table为null时,将保留创建时使用的初始表大小,默认值为0。初始化后,保存下一个要调整表大小的元素计数值
private transient volatile int sizeCtl;
在 ConcurrentHashMap 初始化时,初始化 sizeCtl
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
2.1.1. 总结
就算有多个线程同时进行 put 操作,在初始化 Node[] 数组时,使用了 CAS 操作来决定到底是哪个线程有资格进行初始化,其他线程只能等待。
用到的并发技巧如下:
volatile 修饰 sizeCtl 变量:它是一个标记位,用来告诉其他线程这个坑位有没有线程在进行初始化工作,其线程间的可见性由 volatile 保证
CAS 操作:CAS 操作保证了设置 sizeCtl 标记位的原子性,保证了在多线程同时进行初始化 Node[] 数组时,只有一个线程能成功
2.2. put 操作时的线程安全
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// K,V 都不能为空
if (key == null || value == null) throw new NullPointerException();
// 取得 key 的 hash 值
int hash = spread(key.hashCode());
// 用来计算在这个节点总共有多少个元素,用来控制扩容或者转换为树
int binCount = 0;
// 数组的遍历,自旋插入结点,直到成功
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 当Node[]数组为空时,进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// Unsafe类volatile的方式取出hashCode散列后通过与运算得出的Node[]数组下标值对应的Node对象
// 此时 Node 位置若为 null,则表示还没有线程在此 Node 位置进行插入操作,说明本次操作是第一次
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果这个位置没有元素的话,则通过 CAS 的方式插入数据
if (casTabAt(tab, i, null,
// 创建一个 Node 添加到数组中,null 表示的是下一个节点为空
new Node<K,V>(hash, key, value, null)))
// 插入成功,退出循环
break;
}
// 如果检测到某个节点的 hash 值是 MOVED,则表示正在进行数组扩容
else if ((fh = f.hash) == MOVED)
// 帮助扩容
tab = helpTransfer(tab, f);
// 此时,说明已经有线程对Node[]进行了插入操作,后面的插入很有可能会发生Hash冲突
else {
V oldVal = null;
// ----------------synchronized----------------
synchronized (f) {
// 二次确认此Node对象还是原来的那一个
if (tabAt(tab, i) == f) {
// ----------------table[i]是链表结点----------------
if (fh >= 0) {
// 记录结点数,超过阈值后,需要转为红黑树,提高查找效率
binCount = 1;
// 遍历这个链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 要存的元素的 hash 值和 key 跟要存储的位置的节点的相同的时候,替换掉该节点的 value 即可
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 到了链表的最末端,将新值放到链表的最末端
Node<K,V> pred = e;
// 如果不是同样的 hash,同样的 key 的时候,则判断该节点的下一个节点是否为空
if ((e = e.next) == null) {
// ----------------“尾插法”插入新结点----------------
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// ----------------table[i]是红黑树结点----------------
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 调用putTreeVal方法,将该元素添加到树中去
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为tree
if (binCount >= TREEIFY_THRESHOLD)
// 链表 -> 红黑树 转换
treeifyBin(tab, i);
// 表明本次put操作只是替换了旧值,不用更改计数值
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);// 计数值加1
return null;
}
值得关注的是 tabAt(tab, i) 方法,其使用 Unsafe 类 volatile 的操作 volatile 式地查看值,保证每次获取到的值都是 最新 的
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
虽然上面的 table 变量加了 volatile,但也只能保证其引用的可见性,并不能确保其数组中的对象是否是最新的,所以需要 Unsafe 类 volatile 式地拿到最新的 Node
2.2.1. 总结
put() 方法的核心思想:由于其减小了锁的粒度,若 Hash 完美不冲突的情况下,可同时支持 n 个线程同时 put 操作,n 为 Node 数组大小,在默认大小 16 下,可以支持最大同时 16 个线程无竞争同时操作且线程安全
当 Hash 冲突严重时,Node 链表越来越长,将导致严重的锁竞争,此时会进行扩容,将 Node 进行再散列,下面会介绍扩容的线程安全性。
总结一下用到的并发技巧
减小锁粒度:将 Node 链表的头节点作为锁,若在默认大小 16 情况下,将有 16 把锁,大大减小了锁竞争(上下文切换),就像开头所说,将串行的部分最大化缩小,在理想情况下线程的 put 操作都为并行操作。同时直接锁住头节点,保证了线程安全
使用了 volatile 修饰 table 变量,并使用 Unsafe 的 getObjectVolatile() 方法拿到最新的 Node
CAS 操作:如果上述拿到的最新的 Node 为 null,则说明还没有任何线程在此 Node 位置进行插入操作,说明本次操作是第一次
synchronized 同步锁:如果此时拿到的最新的 Node 不为 null,则说明已经有线程在此 Node 位置进行了插入操作,此时就产生了 hash 冲突;此时的 synchronized 同步锁就起到了关键作用,防止在多线程的情况下发生数据覆盖(线程不安全),接着在 synchronized 同步锁的管理下按照相应的规则执行操作
当 hash 值相同并 key 值也相同时,则替换掉原 value
否则,将数据插入链表或红黑树相应的节点
2.3. get 操作时的线程安全
对于 get 操作其实没有线程安全的问题,只有可见性的问题,只需要确保 get 的数据是线程之间可见的即可
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 重新计算key的hash值
int h = spread(key.hashCode());
// table不能为null,且table[i]不能为空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 检查头结点,table[i]就是待查找的项,直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash值<0, 说明遇到特殊结点(非链表结点), 调用find()方法查找
else if (eh < 0)
// 调用节点对象的find方法查找值
return (p = e.find(h, key)) != null ? p.val : null;
// 按链表方式查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
在 get 操作中除了增加了迁移的判断以外,基本与 HashMap 的 get 操作无异,这里不多赘述,值得一提的是这里使用了 tabAt() 方法 Unsafe 类 volatile 的方式去获取 Node[] 数组中的 Node,保证获得到的 Node 是最新的
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
2.4. 扩容操作时的线程安全
在扩容时,ConcurrentHashMap 支持多线程并发扩容,在扩容过程中同时支持 get 查数据,若有线程 put 数据,还会帮助一起扩容,这种无阻塞算法,将并行最大化的设计,堪称一绝
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride可理解成“步长”,即“数据迁移”时,每个线程要负责旧table中的多少个桶,根据几核的CPU决定“步长”
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// 本线程分到的迁移量,假设为16(默认也为16)
stride = MIN_TRANSFER_STRIDE;
// 说明第一次扩容
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
// 创建新table数组,扩大一倍为 32,n还为16
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
// 将表示容量的sizeCtl 设置为最大值,然后返回
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 表示当前线程要进行数据迁移的桶区间
transferIndex = n;
}
int nextn = nextTab.length;
// 在get或者put时若遇到此 Node,则可以知道当前Node正在迁移
// ForwardingNode结点,当旧table的某个桶中的所有结点都迁移完后,用该结点占据这个桶
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 标识一个桶的迁移工作是否完成,advance == true 表示可以进行下一个位置的迁移
boolean advance = true;
// 最后一个数据迁移的线程将该值置为true,并进行本轮扩容的收尾工作
boolean finishing = false;
// i标识桶索引, bound标识边界
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 每一次自旋前的预处理,主要是为了定位本轮处理的桶区间
// 正常情况下,预处理完成后:i == transferIndex-1:右边界
// bound == transferIndex-stride:左边界
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 当前是处理最后一个tranfer任务的线程或出现扩容冲突
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {// 所有桶迁移均已完成
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 扩容线程数减1,表示当前线程已完成自己的transfer任务
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断当前线程是否是本轮扩容中的最后一个线程,如果不是,则直接退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
/**
* 最后一个数据迁移线程要重新检查一次旧table中的所有桶,看是否都被正确迁移到新table了:
* ①正常情况下,重新检查时,旧table的所有桶都应该是ForwardingNode;
* ②特殊情况下,比如扩容冲突(多个线程申请到了同一个transfer任务),此时当前线程领取的任务会作废,那么最后检查时,
* 还要处理因为作废而没有被迁移的桶,把它们正确迁移到新table中
*/
i = n;
}
}
// 旧桶本身为null,不用迁移,直接尝试放一个ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 该旧桶已经迁移完成,直接跳过
else if ((fh = f.hash) == MOVED)
advance = true;
// 该旧桶未迁移完成,进行数据迁移
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 桶的hash>0,说明是链表迁移
if (fh >= 0) {
/**
* 下面的过程会将旧桶中的链表分成两部分:ln链和hn链
* ln链会插入到新table的槽i中,hn链会插入到新table的槽i+n中
*/
int runBit = fh & n;
// lastRun指向最后一个相邻runBit不同的结点
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 以lastRun所指向的结点为分界,将链表拆成2个子链表ln、hn
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); // ln链表存入新桶的索引i位置
setTabAt(nextTab, i + n, hn); // hn链表存入新桶的索引i+n位置
setTabAt(tab, i, fwd); // 设置ForwardingNode占位
advance = true;// 表示当前旧桶的结点已迁移完毕
}
else if (f instanceof TreeBin) {
/**
* 下面的过程会先以链表方式遍历,复制所有结点,然后根据高低位组装成两个链表;
* 然后看下是否需要进行红黑树转换,最后放到新table对应的桶中
*/
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 判断是否需要进行 红黑树 <-> 链表 的转换
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd); // 设置ForwardingNode占位
advance = true; // 表示当前旧桶的结点已迁移完毕
}
}
}
}
}
}
2.4.1. 扩容时的 get 操作
假设 Node下标为 16 的 Node 节点正在迁移扩容,突然有一个线程进来调用 get() 方法,正好 key 又散列到下标为 16 的节点,此时怎么办?
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 重新计算key的hash值
int h = spread(key.hashCode());
// table不能为null,且table[i]不能为空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 检查头结点,table[i]就是待查找的项,直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash值<0, 说明遇到特殊结点(非链表结点), 调用find()方法查找
else if (eh < 0)
// 调用节点对象的find方法查找值
return (p = e.find(h, key)) != null ? p.val : null;
// 按链表方式查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
在 get() 操作的源码中,会判断 Node 中的 hash 是否小于 0(eh < 0),是否还记得我们的占位 Node,其 hash 为 MOVED,为常量值 -1,所以此时判断线程正在迁移,委托给内部类 ForwardingNode 占位 Node 去查找值
// //内部类ForwardingNode 中 find() 方法
Node<K,V> find(int h, Object k) {
// 这里的查找,是去新Node数组中查找的
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
到这里之所以占位 Node 需要保存新 Node[] 数组的引用也是因为这个,它可以支持在迁移的过程中照样不阻塞地查找值,可谓是精妙绝伦的设计
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
2.4.2. 多线程协助扩容
在 put 操作时,假设正在迁移扩容,正好有一个线程进来,想要 put 值到迁移的 Node上,怎么办?
在 put() 方法中调用了 helpTransfer() 方法
// put() 方法中的代码片段,帮助扩容
tab = helpTransfer(tab, f);
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// sizeCtl加 1,表示多一个线程进来协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
此方法涉及大量复杂的位运算,这里只是简单的说几句,此时 sizeCtl 变量用来表示 ConcurrentHashMap 正在扩容,当其准备扩容时,会将 sizeCtl 设置为一个负数
2.4.3. 总结
ConcurrentHashMap 运用各类 CAS 操作,将扩容操作的并发性能实现最大化,在扩容过程中,
就算有线程调用 get 查询方法,也可以安全的查询数据
若有线程进行 put 操作,还会协助扩容
利用 sizeCtl 标记位和各种 volatile 变量进行 CAS 操作达到多线程之间的通信、协助,在迁移扩容过程中只锁一个 Node 节点,即保证了线程安全,又提高了并发性能
3. 什么情况下 ConcurrentHashMap 会进行扩容操作
在 put 值时,发现 Node 为占位 Node(ForwardingNode)时,会协助扩容
// 在 put() 方法中的代码片段
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
在 put 值时,检测到单链表长度大于 8 时
final V putVal(K key, V value, boolean onlyIfAbsent) {
......
if (binCount != 0) {
// TREEIFY_THRESHOLD=8,当链表长度大于8时
if (binCount >= TREEIFY_THRESHOLD)
// 调用treeifyBin方法
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
......
}
treeifyBin() 方法会将单链表转换为红黑树,增加查找效率,但在这之前,会检查数组长度,若小于 64,则会优先做扩容操作
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// MIN_TREEIFY_CAPACITY = 64,若数组长度小于64,则先扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 扩容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
// 转换为红黑树的操作
......
}
}
}
}
在每次 put 值之后,都会调用 addCount() 方法,检测 Node[] 数组大小是否达到阈值
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 统计元素个数的操作
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 元素个数达到阈值,进行扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
// 发现sizeCtl为负数,证明有线程正在迁移扩容
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 不为负数,则为第一个迁移的线程
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
来源:https://blog.csdn.net/weixin_38192427/article/details/122537790