作者丨紅橙Darren
https://www.jianshu.com/p/0b452a6e4f4e
五個線程同時往 HashMap 中 put 數(shù)據(jù)會發(fā)生什么?
ConcurrentHashMap 是怎么保證線程安全的?
在分析 HashMap 源碼時還遺留這兩個問題,這次我們站在 Java 多線程內(nèi)存模型和 synchronized 的實現(xiàn)原理,這兩個角度來徹底分析一下。至于 JDK 1.8 的紅黑樹不是本文探討的內(nèi)容。
1. Java 多線程內(nèi)存模型
五個線程同時往 HashMap 中 put 數(shù)據(jù)會出現(xiàn)兩種現(xiàn)象,大概率會出現(xiàn)數(shù)據(jù)丟失,小概率會出現(xiàn)死循環(huán),我們不妨寫個測試代碼自己驗證一下。那為什么會出現(xiàn)這兩種現(xiàn)象,我們先來回顧一下之前的Java 多線程內(nèi)存模型。請看圖:
Java內(nèi)存模型中規(guī)定了所有的變量都存儲在主內(nèi)存中,每條線程還有自己的工作內(nèi)存,線程的工作內(nèi)存中保存了該線程使用到的變量到主內(nèi)存副本拷貝,線程對變量的所有操作(讀取、賦值)都必須在工作內(nèi)存中進(jìn)行,而不能直接讀寫主內(nèi)存中的變量。不同線程之間無法直接訪問對方工作內(nèi)存中的變量,線程間變量值的傳遞均需要在主內(nèi)存來完成,線程、主內(nèi)存和工作內(nèi)存的交互關(guān)系如上圖所示。
現(xiàn)在我們來想象一下,假設(shè)線程 1 把數(shù)據(jù)讀到了自己的工作內(nèi)存中,在 tab 角標(biāo)為 1 的鏈表頭插入了一條新的數(shù)據(jù),倘若這時還沒來得及將新增的數(shù)據(jù)刷新到主內(nèi)中。接著線程 2 就把數(shù)據(jù)讀到了自己的工作內(nèi)存中,在 tab 角標(biāo)為 1 的鏈表頭插入了一條新的數(shù)據(jù)。接著線程 1 把新增數(shù)據(jù)刷新到主內(nèi)存中,線程 2 也把數(shù)據(jù)新增數(shù)據(jù)刷新到主內(nèi)存中,那么線程 2 就會覆蓋線程 1 的新增數(shù)據(jù),從而導(dǎo)致數(shù)據(jù)丟失的情況。這里需要注意的是,只有兩個線程都是操作 tab 的同一個 index 鏈表才會導(dǎo)致數(shù)據(jù)丟失的情況,如果不是同一個 index 鏈表就不會有覆蓋和丟失這一說。
關(guān)于 HashMap 的線程不安全問題,Java 給我們提供了三種方案,第一種是 HashTable ,第二種是 Collections.synchronizedMap() ,第三種是 ConcurrentHashMap 。而第一種和第二種都是通過用 synchronized 同步方法來保證線程安全,性能上有所欠缺不推薦大家使用。ConcurrentHashMap 在 JDK 1.8 之前采用的是 Segment 分段鎖來實現(xiàn)的,而 JDK 1.8 之后則采用 synchronized 和 CAS 來實現(xiàn)。
HashTable 通過鎖住整個 put 和 get 方法來實現(xiàn)線程安全并不是很合理,因為一個線程在 put 的時候,另外一個線程不能再 put 和 get 必須進(jìn)入等待狀態(tài)。同理一個線程在 get 的時候,另外一個線程也不能再 get 和 put 。上面通過分析只有兩個線程都是操作 tab 的同一個 index 鏈表才會導(dǎo)致數(shù)據(jù)丟失的情況,如果不是同一個 index 鏈表就不會有覆蓋和丟失這一說。因此也沒必要鎖住整個方法,只需要鎖住每個 tab 的 index 鏈即可。
ConcurrentHashMap 在 JDK 1.8 之前采用的是 Segment 繼承自 ReentrantLock 來鎖住 tab 的 index 鏈,而 JDK 1.8 之后則采用 synchronized 來實現(xiàn),這兩者又有什么區(qū)別?我們首先看下 synchronized 的底層是怎么實現(xiàn)線程安全的。Java中的每一個對象都可以作為鎖。具體表現(xiàn)有以下3種形式。
// 1.對于普通同步方法,鎖是當(dāng)前實例對象。this
public synchronized void method(){
}
// 2.對于靜態(tài)同步方法,鎖是當(dāng)前類的Class對象。this.class
public static synchronized void method(){
}
// 3.對于同步方法塊,鎖是Synchonized括號里配置的對象。object
public static synchronized void method(){
synchronized(object){
}
}
我們可能會想鎖到底存在哪里呢?鎖里面會存儲什么信息呢?其實 synchronized 同步的代碼塊,虛擬機(jī)在同步代碼塊開始前會插入一條 monitorenter 指令,在代碼塊的末尾會插入一條 monitorexit 指令。而每個對象的 Mark Word 頭信息里都會存儲 Monitor 信息,也就是當(dāng)前對象的鎖信息,當(dāng)然 Mark Word 頭信息還包含對象的 hashCode 和 GC 的分代年齡,具體請看下表:
Lock 的實現(xiàn)原理和 synchronized 有些類似,都是通過線程的原子性來保證線程同步,具體的實現(xiàn)的方式大家可以去看下 ReentrantLock 的源碼實現(xiàn)。那為什么在 JDK 1.8 之后要采用 synchronized 和 CAS 來實現(xiàn)?在 JDK 1.6 為了減少獲得鎖和釋放鎖帶來的性能消耗,引入了“偏向鎖”和“輕量級鎖”,級別從低到高依次是:無鎖狀態(tài)、偏向鎖狀態(tài)、輕量級鎖狀態(tài)和重量級鎖狀態(tài),這幾個狀態(tài)會隨著競爭情況逐漸升級。鎖可以升級但不能降級,意味著偏向鎖升級成輕量級鎖后不能降級成偏向鎖。這種鎖升級卻不能降級的策略,目的是為了提高獲得鎖和釋放鎖的效率。當(dāng)線程 1 進(jìn)入同步代碼塊遇到 monitorenter 指令,首先判斷鎖的狀態(tài)發(fā)現(xiàn)是 0 ,采用 CAS 將鎖的狀態(tài)設(shè)置為 1,偏向鎖設(shè)置為 1,鎖的標(biāo)致位設(shè)置為 1 ,繼續(xù)執(zhí)行同步代碼塊里面的指令。這是若線程 2 也來到了同步代碼塊,也會遇到 monitorenter 指令,首先判斷鎖的狀態(tài)發(fā)現(xiàn)是 1 進(jìn)入等待中,等線程 1 執(zhí)行完同步代碼塊遇到 monitorenter 指令,首先會清空鎖的狀態(tài)然后喚醒線程 2 。如此反復(fù)即可保證線程安全。
偏向鎖
大多數(shù)情況下,鎖不僅不存在多線程競爭,而且總是由同一線程多次獲得,為了讓線程獲得鎖的代價更低而引入了偏向鎖。當(dāng)一個線程訪問同步塊并獲取鎖時,會在對象頭和棧幀中的鎖記錄里存儲鎖偏向的線程 ID,以后該線程在進(jìn)入和退出同步塊時不需要進(jìn)行 CAS 操作來加鎖和解鎖,只需簡單地測試一下對象頭的 Mark Word 里是否存儲著指向當(dāng)前線程的偏向鎖。如果測試成功,表示線程已經(jīng)獲得了鎖。如果測試失敗,則需要再測試一下 Mark Word 中偏向鎖的標(biāo)識是否設(shè)置成1(表示當(dāng)前是偏向鎖):如果沒有設(shè)置,則使用 CAS 競爭鎖;如果設(shè)置了,則嘗試使用CAS將對象頭的偏向鎖指向當(dāng)前線程。
輕量級鎖
線程在執(zhí)行同步塊之前,JVM 會先在當(dāng)前線程的棧楨中創(chuàng)建用于存儲鎖記錄的空間,并將對象頭中的 Mark Word 復(fù)制到鎖記錄中。然后線程嘗試使用 CAS 將對象頭中的 Mark Word 替換為指向鎖記錄的針。如果成功,當(dāng)前線程獲得鎖,如果失敗,表示其他線程競爭鎖,當(dāng)前線程便嘗試使用自旋來獲取鎖。
重量級鎖
輕量級鎖采用自旋的方式不斷的嘗試獲取鎖,如果長時間獲取不到鎖勢必會不斷消耗 CPU 的資源。所以當(dāng)線程競爭比較激烈或者線程遲遲獲取不到鎖,就會升級為重量級的鎖狀態(tài),此時線程是阻塞的,且響應(yīng)時間緩慢。
// volatile 保證可見性
transient volatile Node
// 新增元素的方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 二次 hash
int hash = spread(key.hashCode());
int binCount = 0;
for (Node
Node
// 如果 tab 為空,初始化 tab
if (tab == null || (n = tab.length) == 0){
tab = initTable();
}
// 當(dāng)前 tab 的 index 鏈表為 null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 鎖住當(dāng)前 tab 的 index 鏈表(分段鎖)
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
// ......
public V get(Object key) {
Node
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
// CAS 操作
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍歷當(dāng)前列表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
最后值得一提的是 table 和 Node 對象中的 next 和 val 都是采用 volatile 來修飾的。
本文轉(zhuǎn)載自【程序員大咖】