ConcurrentHashMap在我的面試生涯中,10次有8次是會被問到的,記得剛畢業(yè)那會,被問到ConcurrentHashMap源碼的無助與苦澀,無奈只能網(wǎng)上找了一些教程,背一背,才算是蒙混過關,當然其實這種法子是不推崇的,而且不能真正理解源碼真諦,遇到高手還是很容易就問出來的,那么我們這篇就一起來了解下ConcurrentHashMap如何成為一個理(meng)解(hun)源(guo)碼(guan)的高手,建議對Map不是很了解的可以看看我前面寫的一篇HashMap源碼解析。其實網(wǎng)上找了很多資料,尤其是并發(fā)擴容這一塊,沒找到什么有用的資料,沒辦法,只能自己慢慢啃了蠻長時間的,所以這塊也會詳細寫出來。
ConcurrentHashMap的數(shù)據(jù)結構和HashMap基本一致,這里就不重新畫了,copy一下之前畫的圖:
那么問題來了,既然兩個差不多,為啥又要搞個ConcurrentHashMap呢,這就要說到兩者之間的區(qū)別到底在哪里:
既生HashMap何生ConcurrentHashMap:
HashMap和ConcurrentHashMap的區(qū)別說起來很多,但在我看來,最本質的區(qū)別就是HashMap是線程不安全的,而ConcurrentHashMap是線程安全的,這時候可能有人要說話了,那HashTable不也是線程安全的嗎,如果只是為了線程安全,HashTable不就行了嗎,還非要搞一個ConcurrentHashMap,看看jdk1.8這玩意代碼都多少行了。
既生HashTable何生ConcurrentHashMap:
HashTable這里我就不贅述了,大體上就是HashMap加了個synchronized,區(qū)別有但不是很大,但ConcurrentHashMap從1.7的segment分段鎖,到1.8的node節(jié)點鎖,那代碼復雜度那么高,doug lea圖啥呢,這里我可以先給出個不標準答案: 性能,jdk代碼風格死扣的話,你會發(fā)現(xiàn)這幫人就是變態(tài),他可以多寫十行你看不懂的代碼,就為了提升幾毫秒的,更別說ConcurrentHashMap這么多的了。
廢話不多說,還是先看代碼吧:
初始化不聊了,put時會先去初始化,我們直接看put方法:
put:
public V put(K key, V value) { return putVal(key, value, false); }
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //非空校驗 int hash = spread(key.hashCode()); //再hash int binCount = 0; for (Node<K,V>[] tab = table;;) { //死循環(huán) Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //表示當前map為空,則需初始化操作 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //表示當前下標對應沒有值 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) //直接cas設值(其實和hashMap差不多),cas失敗則下次循環(huán)繼續(xù) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) //ForwardingNode節(jié)點會進入 tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { //看到?jīng)]有,這里用的是synchronized,大神也覺得synchronized的性能至少不比ReentrantLock差吧 if (tabAt(tab, i) == f) { //這里校驗是為了防止加鎖前被其他線程修改了 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { //節(jié)點存在,直接修改 oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { //節(jié)點不存在,加載鏈表尾部 pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { //紅黑樹操作 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //節(jié)點數(shù) if (binCount >= TREEIFY_THRESHOLD) // >=8,鏈表轉紅黑樹 treeifyBin(tab, i); if (oldVal != null) //節(jié)點存在,直接返回舊值 return oldVal; break; } } } addCount(1L, binCount); return null; }
put直接調用了putVal方法,putVal方法在非空校驗,再hash計算(為了更均勻分布)之后,開始自旋,做正事了,這里面寫了幾個if else :
1.判斷map是否為空,為空初始化
2.定位到下標對應的節(jié)點,為空則說明當前節(jié)點鏈表未生成,new 一個cas塞進去
3.判斷是否是ForwardingNode節(jié)點,是則需要協(xié)助擴容
4.都不是則正常塞值,判斷是鏈表還是紅黑樹,走不同新增流程
接下來判斷是否需要轉紅黑樹,最后返回,節(jié)點存在則返回舊值,否則返回null,addCount方法我們后面會詳細介紹,其實就是count加一,并判斷是否需要擴容。
我們首先來看一下初始化的邏輯:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) //說明此時正在初始化,讓出cpu資源 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //標識此時正在初始化 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //未設值則賦默認值16 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); //默認0.75倍擴容 } } finally { sizeCtl = sc; //擴容大小賦值 } break; } } return tab; }
其實看著注釋對照代碼還是很簡單的,就是初始化一下node數(shù)組大小,及擴容因子等。
再看協(xié)助擴容:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { //這里就是判斷當前是否已經(jīng)在擴容中 int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //sizectl+1 transfer(tab, nextTab); //擴容 break; } } return nextTab; } return table; }
并發(fā)擴容:
這里主要做了一個校驗是否需要協(xié)助擴容,每多一個線程協(xié)助,sizectl+1,接下來主要看看擴容方法,這也是ConcurrentHashMap的點睛之筆:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating 說明是首個線程擴容 try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //擴容兩倍新數(shù)組 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { //這個for循環(huán)主要做并發(fā)擴容時,多線程遷移節(jié)點控制 Node<K,V> f; int fh; while (advance) { //這個自旋主要是用于控制未被分配處理的桶(鏈表)的個數(shù) int nextIndex, nextBound; if (--i >= bound || finishing) //每次處理都減一,條件成立說明直接去擴容(只是控制下標)或已經(jīng)結束,當前線程退出擴容 advance = false; else if ((nextIndex = transferIndex) <= 0) { //成立說明已經(jīng)被分配完 i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { //cas修改下一個桶的位置,成功則更新下一個桶的位置 bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { //判斷是否結束 int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) //查不到cas查詢,這個對性能的要求已經(jīng)到令人發(fā)指的地步了吧 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) // advance = true; // already processed else { synchronized (f) { //鎖下面才是真正的節(jié)點遷移,可以發(fā)現(xiàn)這里鎖的是一個node節(jié)點 if (tabAt(tab, i) == f) { //鏈表遷移 Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) //這里的計算和hashMap查不多,ln是低位節(jié)點 ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //這里cas分別在高低位鏈表中,并將當前舊tab更新為forward節(jié)點 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { //紅黑樹遷移 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
1.首個線程擴容,new 2倍大小Node數(shù)組
2.自旋并發(fā)擴容:
這里代碼寫的比較復雜,我也是花了挺長時間才理清楚這塊邏輯,我先解釋一下并發(fā)擴容的機制:
首先一個線程過來,他是需要去看他要在哪個位置(會去讀transferIndex的值)開始遷移,一個線程負責一塊區(qū)域,多個線程這里采用的是類似跳表的機制,這里有一個擴容因子stride,最小為16,每次跳的大小就是這個擴容因子控制的。然后要去cas嘗試是否可以占用這個節(jié)點(由于沒有加鎖,所以有可能其他線程先來占了),失敗繼續(xù)跳下一個節(jié)點,直到成功,每個線程負責的區(qū)域也就是下標為初始占用的地方(bound)到bound-stride的大小,如果當前線程把自己負責的區(qū)域擴容完,發(fā)現(xiàn)原本該下一個線程負責的節(jié)點沒有被占用,則還是會繼續(xù)往下擴容,否則就繼續(xù)往下跳直到結束,后面的線程也是如此。
了解完這些我們再來看代碼,首先看while (advance)里面:
1.首先判斷--i>=bound,這個是用來控制數(shù)組下標的,每次循環(huán)-1,當大于時,說明當前線程還在自己負責的擴容范圍,繼續(xù)擴容,finishing表示擴容結束。
2.當(nextIndex = transferIndex) <= 0條件成立時,說明所有下標都有線程負責擴容了,新來的線程就不需要操心了,同時也是將當前需擴容下標賦值給nextIndex
3.cas修改下一個桶的位置(即下一個線程開始擴容的下標),成功則說明自己是第一個占領的,接下來擴容就好了,失敗則表示已經(jīng)被占了,則繼續(xù)嘗試占領下一個山頭。
再看下一個if, if (i < 0 || i >= n || i + n >= nextn) 其實就是為了判斷當前擴容是否結束,其實里面就是根據(jù)sizectl大小cas計算,然后標記finishing=true,下一輪則直接return.
再往下看:校驗(f = tabAt(tab, i)) == null,這里如果下標為空,則cas設置一個forward節(jié)點占位
再往下看:else if ((fh = f.hash) == MOVED),這個表示當前節(jié)點是已擴容節(jié)點,置為true,回到開頭的地方去循環(huán)
最后便是真正的節(jié)點遷移,這里看注釋其實就差不多了,和hashMap單節(jié)點遷移是一樣的,拆分高低位遷移。
其實這里最核心也最難理解的不是遷移,而是如何控制多線程去并發(fā)的遷移還能互不影響,這里使用了大批量的cas,只在最關鍵的地方使用了synchronized,可以說在性能方面做到了極致。
其實理解了上面這些,再往下看就簡單了很多,協(xié)助擴容說完了,我們再看后面其實就是走下一輪循環(huán)鏈表新增節(jié)點或紅黑樹新增節(jié)點。接下來put方法還有一個核心就是addCount,我們先看代碼:
private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { //這個if就是更新baseCount值,可以不關注 CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { //主要看這里 Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //大小超出擴容參數(shù) && 數(shù)組不為空 && 長度未超出最大值,表示需要擴容 int rs = resizeStamp(n); if (sc < 0) { //表示已經(jīng)有線程在擴容/初始化 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) //表示其他線程正在初始化 break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 協(xié)助擴容 transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //表示之前無其他線程擴容、初始化 transfer(tab, null); //直接擴容 s = sumCount(); } } }
你會發(fā)現(xiàn)除了將baseCount+1,就是判斷是否還要擴容,協(xié)助擴容,又聯(lián)系到上面去了。
put方法大體上說完了,哇,也太復雜了,那么接下來再看看get方法是啥樣的呢:
get:
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //首先判斷hash計算當前下標下存在值 if ((eh = e.hash) == h) { //這里首先看第一個節(jié)點是否就是要找的值 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) //表示正在擴容或紅黑樹 return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { //遍歷查詢 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
當hash小于0時,有可能為正在擴容(-1)或者紅黑樹(-2),這里主要介紹一下正在擴容時的查詢,直接看ForwardingNode.find:
Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { //首先將nextTable賦值給tab Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) 說明當前節(jié)點為空 return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) //首節(jié)點等于要查詢key return e; if (eh < 0) { if (e instanceof ForwardingNode) { //這種情況暫時還沒想到有什么情況會遇到 tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else //表示正在擴容的節(jié)點為紅黑樹 return e.find(h, k); } if ((e = e.next) == null) //查不到返回null return null; } } }
相比上面擴容,這些代碼確實很簡單了,全程沒用鎖,但也保證了并發(fā)及擴容時可以查到,這時候就有一個問題來了,那我如何能保證這些數(shù)據(jù)我都能實時查詢到呢,難道就不會出現(xiàn)數(shù)據(jù)在cpu緩存沒刷出來導致我查詢時有問題嗎,當然,doug lea大神肯定是會考慮到這些的,他用簡簡單單的一個volatile關鍵字完成了這一切,volatile關鍵字保證了數(shù)據(jù)的可見性,也就保證了我前面問題的并發(fā)安全,后續(xù)我也會補一章volitile的文章。