一、容器初始化
1、源碼分析
在jdk8的ConcurrentHashMap中一共有5個構造方法,這四個構造方法中都沒有對內部的數組做初始化, 只是對一些變量的初始值做了處理
jdk8的ConcurrentHashMap的數組初始化是在第一次添加元素時完成
1
2
3
|
// 沒有維護任何變量的操作,如果調用該方法,數組長度默認是16 public ConcurrentHashMap() { } |
1
2
3
4
5
6
7
8
9
|
// 傳遞進來一個初始容量,ConcurrentHashMap會基于這個值計算一個比這個值大的2的冪次方數作為初始容量 public ConcurrentHashMap( int initialCapacity) { if (initialCapacity < 0 ) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1 )) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1 ) + 1 )); // 此處的初始容量計算結果為傳入的容量 + 傳入的容量的一半 + 1 this .sizeCtl = cap; } |
注意,調用這個方法,得到的初始容量和HashMap以及jdk7的ConcurrentHashMap不同,即使你傳遞的是一個2的冪次方數,該方法計算出來的初始容量依然是比這個值大的2的冪次方數
1
2
3
4
|
// 調用四個參數的構造 public ConcurrentHashMap( int initialCapacity, float loadFactor) { this (initialCapacity, loadFactor, 1 ); } |
1
2
3
4
5
6
7
8
9
10
11
12
|
// 計算一個大于或者等于給定的容量值,該值是2的冪次方數作為初始容量 public ConcurrentHashMap( int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0 .0f) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = ( long )( 1.0 + ( long )initialCapacity / loadFactor); int cap = (size >= ( long )MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor(( int )size); this .sizeCtl = cap; } |
1
2
3
4
5
6
|
// 基于一個Map集合,構建一個ConcurrentHashMap // 初始容量為16 public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this .sizeCtl = DEFAULT_CAPACITY; putAll(m); } |
2、sizeCtl
含義解釋
注意:以上這些構造方法中,都涉及到一個變量
sizeCtl
,這個變量是一個非常重要的變量,而且具有非常豐富的含義,它的值不同,對應的含義也不一樣,這里我們先對這個變量不同的值的含義做一下說明,后續源碼分析過程中,進一步解釋
sizeCtl
為0,代表數組未初始化, 且數組的初始容量為16
sizeCtl
為正數,如果數組未初始化,那么其記錄的是數組的初始容量,如果數組已經初始化,那么其記錄的是數組的擴容閾值
sizeCtl
為-1,表示數組正在進行初始化
sizeCtl
小于0,并且不是-1,表示數組正在擴容, -(1+n),表示此時有n個線程正在共同完成數組的擴容操作
3、其他屬性含義
代表整個哈希表
1
|
transient volatile Node<K,V>[] table; |
用于哈希表擴容,擴容完成后會被重置為null。
1
|
private transient volatile Node<K,V>[] nextTable; |
baseCount和counterCells一起保存著整個哈希表中存儲的所有的結點的個數總和。
1
2
|
private transient volatile long baseCount; private transient volatile CounterCell[] counterCells; |
二、添加安全
1、源碼分析
1.1、添加元素put/putVal方法
1
2
3
|
public V put(K key, V value) { return putVal(key, value, false ); } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
final V putVal(K key, V value, boolean onlyIfAbsent) { // 如果有空值或者空鍵,直接拋異常 if (key == null || value == null ) throw new NullPointerException(); // 基于key計算hash值,并進行一定的擾動,這里計算的hash一定是正數,因為與7FFFFFFF進行了位與運算,負數的hash值另有他用 int hash = spread(key.hashCode()); // 記錄某個桶上元素的個數,如果超過8個(并且table長度>=64),會轉成紅黑樹 int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 如果數組還未初始化,先對數組進行初始化 if (tab == null || (n = tab.length) == 0 ) tab = initTable(); // 如果hash計算得到的桶位置沒有元素,利用cas將元素添加 else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { // cas+自旋(和外側的for構成自旋循環),保證元素添加安全 if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; // no lock when adding to empty bin } // 如果hash計算得到的桶位置元素的hash值為MOVED(-1),證明正在擴容,那么協助擴容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { // hash計算的桶位置元素不為空,且當前沒有處于擴容操作,進行元素添加 V oldVal = null ; // 對當前數組的第一個結點進行加鎖,執行添加操作,這里不僅保證了線程安全而且使得鎖的粒度相對較小 synchronized (f) { if (tabAt(tab, i) == f) { // 普通鏈表節點 if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; // 鏈表的遍歷找到最后一個結點進行尾插法(如果找到相同的key則會覆蓋) if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; // 找到了最后一個結點,尾插法插入新結點在最后 if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } // 樹節點,將元素添加到紅黑樹中 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { // 鏈表長度>=8,將鏈表轉成紅黑樹 if (binCount >= TREEIFY_THRESHOLD) // (在該方法中會對table也就是數組長度進行判斷,>=64時才會進行轉樹,否則為數組擴容) treeifyBin(tab, i); // 如果是重復鍵,直接將舊值返回 if (oldVal != null ) return oldVal; break ; } } } // 添加的是新元素,維護集合長度,并判斷是否要進行擴容操作 addCount(1L, binCount); return null ; } |
通過以上源碼,我們可以看到,當需要添加元素時,會針對當前元素所對應的桶位進行加鎖操作,這樣一方面保證元素添加時,多線程的安全,同時對某個桶位加鎖不會影響其他桶位的操作,進一步提升多線程的并發效率
1.2、數組初始化,initTable方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // cas+自旋,保證線程安全,對數組進行初始化操作 while ((tab = table) == null || tab.length == 0 ) { // 如果sizeCtl的值(-1)小于0,說明此時正在初始化, 讓出cpu if ((sc = sizeCtl) < 0 ) Thread.yield(); // lost initialization race; just spin // cas修改sizeCtl的值為-1,修改成功,進行數組初始化,失敗,繼續自旋 else if (U.compareAndSwapInt( this , SIZECTL, sc, - 1 )) { try { // double checking,防止重復初始化 if ((tab = table) == null || tab.length == 0 ) { // sizeCtl為0,取默認長度16,否則去sizeCtl的值 int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; @SuppressWarnings ( "unchecked" ) // 基于初始長度,構建數組對象 Node<K,V>[] nt = (Node<K,V>[]) new Node<?,?>[n]; table = tab = nt; // 計算擴容閾值,并賦值給sc // n就是當前數組的長度,當初始化完成后,sc記錄的是下次需要擴容的閾值 // n >>> 2 就相當于 n / 4 // 所以 n - (n >>> 2) 就相當于 n - n / 4 = n * 0.75,而0.75就是默認的加載因子 sc = n - (n >>> 2 ); } } finally { //將擴容閾值,賦值給sizeCtl sizeCtl = sc; } break ; } } return tab; } |
2、圖解
2.1、put加鎖圖解
三、擴容安全
1、源碼分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // 如果是多cpu,那么每個線程劃分任務,最小任務量是16個桶位的遷移 // 如果是單cpu,則沒必要劃分 if ((stride = (NCPU > 1 ) ? (n >>> 3 ) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 如果是擴容線程,此時新數組為null if (nextTab == null ) { // initiating try { @SuppressWarnings ( "unchecked" ) // 兩倍擴容創建新數組 Node<K,V>[] nt = (Node<K,V>[]) new Node<?,?>[n << 1 ]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return ; } nextTable = nextTab; // 記錄線程開始遷移的桶位,從后往前遷移 transferIndex = n; } // 記錄新數組的末尾 int nextn = nextTab.length; // 已經遷移的桶位,會用這個節點占位(這個節點的hash值為-1——MOVED) ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true ; boolean finishing = false ; // to ensure sweep before committing nextTab for ( int i = 0 , bound = 0 ;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; // i記錄當前正在遷移桶位的索引值 // bound記錄下一次任務遷移的開始桶位 // --i >= bound 成立表示當前線程分配的遷移任務還沒有完成 if (--i >= bound || finishing) advance = false ; // 沒有元素需要遷移 -- 后續會去將擴容線程數減1,并判斷擴容是否完成 else if ((nextIndex = transferIndex) <= 0 ) { i = - 1 ; advance = false ; } // 計算下一次任務遷移的開始桶位,并將這個值賦值給transferIndex else if (U.compareAndSwapInt ( this , TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0 ))) { bound = nextBound; i = nextIndex - 1 ; advance = false ; } } // 如果沒有更多的需要遷移的桶位,就進入該if if (i < 0 || i >= n || i + n >= nextn) { int sc; //擴容結束后,保存新數組,并重新計算擴容閾值,賦值給sizeCtl if (finishing) { nextTable = null ; table = nextTab; sizeCtl = (n << 1 ) - (n >>> 1 ); return ; } // 擴容任務線程數減1 if (U.compareAndSwapInt( this , SIZECTL, sc = sizeCtl, sc - 1 )) { // 判斷當前所有擴容任務線程是否都執行完成 if ((sc - 2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return ; // 所有擴容線程都執行完,標識結束 finishing = advance = true ; i = n; // recheck before commit } } // 當前遷移的桶位沒有元素,直接在該位置添加一個fwd節點 else if ((f = tabAt(tab, i)) == null ) advance = casTabAt(tab, i, null , fwd); // 當前節點已經被遷移 else if ((fh = f.hash) == MOVED) advance = true ; // already processed else { // 當前節點需要遷移,加鎖遷移,保證多線程安全 // 此處的遷移與hashmap類似 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0 ) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null ; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0 ) { ln = lastRun; hn = null ; } else { hn = lastRun; ln = null ; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null , loTail = null ; TreeNode<K,V> hi = null , hiTail = null ; int lc = 0 , hc = 0 ; for (Node<K,V> e = t.first; e != null ; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null , null ); if ((h & n) == 0 ) { if ((p.prev = loTail) == null ) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null ) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0 ) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0 ) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } } } } } } |
2、圖解
四、多線程擴容效率改進(協助擴容)
多線程協助擴容的操作會在兩個地方被觸發:
① 當添加元素時,發現添加的元素對用的桶位為fwd節點,就會先去協助擴容,然后再添加元素
② 當添加完元素后,判斷當前元素個數達到了擴容閾值,此時發現sizeCtl的值小于0,并且新數組不為空,這個時候,會去協助擴容
每當有一個線程幫助擴容時,sc就會+1,有一個線程擴容結束時,sc就會-1,當sc重新回到(rs << RESIZE_STAMP_SHIFT) + 2這個值時,代表當前線程是最后一個擴容的線程,則擴容結束。
1、源碼分析
1.1、元素未添加,先協助擴容,擴容完后再添加元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; // no lock when adding to empty bin } // 發現此處為fwd節點,協助擴容,擴容結束后,再循環回來添加元素 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 省略代碼 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null ) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0 ) break ; if (U.compareAndSwapInt( this , SIZECTL, sc, sc + 1 )) { // 擴容,傳遞一個不是null的nextTab transfer(tab, nextTab); break ; } } return nextTab; } return table; } |
1.2、先添加元素,再協助擴容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
private final void addCount( long x, int check) { // 省略代碼 if (check >= 0 ) { Node<K,V>[] tab, nt; int n, sc; // 元素個數達到擴容閾值 while (s >= ( long )(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // sizeCtl小于0,說明正在執行擴容,那么協助擴容 if (sc < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt( this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt( this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) transfer(tab, null ); s = sumCount(); } } } |
注意:擴容的代碼都在transfer方法中,這里不再贅述
2、圖解
五、集合長度的累計方式
1、源碼分析
1.1、addCount方法
① CounterCell數組不為空,優先利用數組中的CounterCell記錄數量
② 如果數組為空,嘗試對baseCount進行累加,失敗后,會執行fullAddCount邏輯
③ 如果是添加元素操作,會繼續判斷是否需要擴容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
private final void addCount( long x, int check) { CounterCell[] as; long b, s; // 當CounterCell數組不為空,則優先利用數組中的CounterCell記錄數量 // 或者當baseCount的累加操作失敗,會利用數組中的CounterCell記錄數量 if ((as = counterCells) != null || !U.compareAndSwapLong( this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; // 標識是否有多線程競爭 boolean uncontended = true ; // 當as數組為空 // 或者當as長度為0 // 或者當前線程對應的as數組桶位的元素為空 // 或者當前線程對應的as數組桶位不為空,但是累加失敗 if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 以上任何一種情況成立,都會進入該方法,傳入的uncontended是false fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; // 計算元素個數 s = sumCount(); } if (check >= 0 ) { Node<K,V>[] tab, nt; int n, sc; // 當元素個數達到擴容閾值 // 并且數組不為空 // 并且數組長度小于限定的最大值 // 滿足以上所有條件,執行擴容 while (s >= ( long )(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // 這個是一個很大的正數 int rs = resizeStamp(n); // sc小于0,說明有線程正在擴容,那么會協助擴容 if (sc < 0 ) { // 擴容結束或者擴容線程數達到最大值或者擴容后的數組為null或者沒有更多的桶位需要轉移,結束操作 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; // 擴容線程加1,成功后,進行協助擴容操作 if (U.compareAndSwapInt( this , SIZECTL, sc, sc + 1 )) // 協助擴容,newTable不為null transfer(tab, nt); } // 沒有其他線程在進行擴容,達到擴容閾值后,給sizeCtl賦了一個很大的負數 // 1+1=2 --》 代表此時有一個線程在擴容 // rs << RESIZE_STAMP_SHIFT)是一個很大的負數 else if (U.compareAndSwapInt( this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) // 擴容,newTable為null transfer(tab, null ); s = sumCount(); } } } |
1.2、fullAddCount方法
① 當CounterCell數組不為空,優先對CounterCell數組中的CounterCell的value累加
② 當CounterCell數組為空,會去創建CounterCell數組,默認長度為2,并對數組中的CounterCell的value累加
③ 當數組為空,并且此時有別的線程正在創建數組,那么嘗試對baseCount做累加,成功即返回,否則自旋
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
private final void fullAddCount( long x, boolean wasUncontended) { int h; // 獲取當前線程的hash值 if ((h = ThreadLocalRandom.getProbe()) == 0 ) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true ; } // 標識是否有沖突,如果最后一個桶不是null,那么為true boolean collide = false ; // True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; // 數組不為空,優先對數組中CouterCell的value累加 if ((as = counterCells) != null && (n = as.length) > 0 ) { // 線程對應的桶位為null if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { // Try to attach new Cell // 創建CounterCell對象 CounterCell r = new CounterCell(x); // Optimistic create // 利用CAS修改cellBusy狀態為1,成功則將剛才創建的CounterCell對象放入數組中 if (cellsBusy == 0 && U.compareAndSwapInt( this , CELLSBUSY, 0 , 1 )) { boolean created = false ; try { // Recheck under lock CounterCell[] rs; int m, j; // 桶位為空, 將CounterCell對象放入數組 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; // 表示放入成功 created = true ; } } finally { cellsBusy = 0 ; } if (created) //成功退出循環 break ; // 桶位已經被別的線程放置了已給CounterCell對象,繼續循環 continue ; // Slot is now non-empty } } collide = false ; } // 桶位不為空,重新計算線程hash值,然后繼續循環 else if (!wasUncontended) // CAS already known to fail wasUncontended = true ; // Continue after rehash // 重新計算了hash值后,對應的桶位依然不為空,對value累加 // 成功則結束循環 // 失敗則繼續下面判斷 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break ; // 數組被別的線程改變了,或者數組長度超過了可用cpu大小,重新計算線程hash值,否則繼續下一個判斷 else if (counterCells != as || n >= NCPU) collide = false ; // At max size or stale // 當沒有沖突,修改為有沖突,并重新計算線程hash,繼續循環 else if (!collide) collide = true ; // 如果CounterCell的數組長度沒有超過cpu核數,對數組進行兩倍擴容 // 并繼續循環 else if (cellsBusy == 0 && U.compareAndSwapInt( this , CELLSBUSY, 0 , 1 )) { try { if (counterCells == as) { // Expand table unless stale CounterCell[] rs = new CounterCell[n << 1 ]; for ( int i = 0 ; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h); } // CounterCell數組為空,并且沒有線程在創建數組,修改標記,并創建數組 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt( this , CELLSBUSY, 0 , 1 )) { boolean init = false ; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[ 2 ]; rs[h & 1 ] = new CounterCell(x); counterCells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } // 數組為空,并且有別的線程在創建數組,那么嘗試對baseCount做累加,成功就退出循環,失敗就繼續循環 else if (U.compareAndSwapLong( this , BASECOUNT, v = baseCount, v + x)) break ; // Fall back on using base } } |
2、圖解
fullAddCount方法中,當as數組不為空的邏輯圖解
六、集合長度獲取
1、源碼分析
1.1、size方法
1
2
3
4
5
6
|
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > ( long )Integer.MAX_VALUE) ? Integer.MAX_VALUE : ( int )n); } |
1.2、sumCount方法
1
2
3
4
5
6
7
8
9
10
11
12
13
|
final long sumCount() { CounterCell[] as = counterCells; CounterCell a; // 獲取baseCount的值 long sum = baseCount; if (as != null ) { // 遍歷CounterCell數組,累加每一個CounterCell的value值 for ( int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; } |
注意:這個方法并不是線程安全的
七、get方法
這個就很簡單了,獲得hash值,然后判斷存在與否,遍歷鏈表即可,注意get沒有任何鎖操作!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 計算key的hash值 int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { // 表不為空并且表的長度大于0并且key所在的桶不為空 if ((eh = e.hash) h) { // 表中的元素的hash值與key的hash值相等 if ((ek = e.key) key || (ek != null && key.equals(ek))) // 鍵相等 // 返回值 return e.val; } else if (eh < 0 ) // 是個TreeBin hash = -2 // 在紅黑樹中查找,因為紅黑樹中也保存這一個鏈表順序 return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { // 對于結點hash值大于0的情況鏈表 if (e.hash h && ((ek = e.key) key || (ek != null && key.equals(ek)))) return e.val; } } return null ; } |
到此這篇關于JDK1.8中的ConcurrentHashMap源碼分析的文章就介紹到這了,更多相關JDK1.8 ConcurrentHashMap源碼內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://blog.csdn.net/weixin_42856430/article/details/115420011