免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
我就知道面試官接下來要問我 ConcurrentHashMap 底層原理了

文章目錄

這篇文章,我打算從以下幾個(gè)方面來講。

1)多線程下的 HashMap 有什么問題?

2)怎樣保證線程安全,為什么選用 ConcurrentHashMap?

3)ConcurrentHashMap 1.7 源碼解析

  • 底層存儲(chǔ)結(jié)構(gòu)
  • 常用變量
  • 構(gòu)造函數(shù)
  • put() 方法
  • ensureSegment() 方法
  • scanAndLockForPut() 方法
  • rehash() 擴(kuò)容機(jī)制
  • get() 獲取元素方法
  • remove() 方法
  • size() 方法是怎么統(tǒng)計(jì)元素個(gè)數(shù)的

4)ConcurrentHashMap 1.8 源碼解析

  • put()方法詳解
  • initTable()初始化表
  • addCount()方法
  • fullAddCount()方法
  • transfer()是怎樣擴(kuò)容和遷移元素的
  • helpTransfer()方法幫助遷移元素

多線程下 HashMap 有什么問題?

在上一篇文章中,已經(jīng)講解了 HashMap 1.7 死循環(huán)的成因,也正因?yàn)槿绱?,我們才說 HashMap 在多線程下是不安全的。但是,在JDK1.8 的 HashMap 改為采用尾插法,已經(jīng)不存在死循環(huán)的問題了,為什么也會(huì)線程不安全呢?

我們以 put 方法為例(1.8),

假如現(xiàn)在有兩個(gè)線程都執(zhí)行到了上圖中的劃線處。當(dāng)線程一判斷為空之后,CPU 時(shí)間片到了,被掛起。線程二也執(zhí)行到此處判斷為空,繼續(xù)執(zhí)行下一句,創(chuàng)建了一個(gè)新節(jié)點(diǎn),插入到此下標(biāo)位置。然后,線程一解掛,同樣認(rèn)為此下標(biāo)的元素為空,因此也創(chuàng)建了一個(gè)新節(jié)點(diǎn)放在此下標(biāo)處,因此造成了元素的覆蓋。

所以,可以看到不管是 JDK1.7 還是 1.8 的 HashMap 都存在線程安全的問題。那么,在多線程環(huán)境下,應(yīng)該怎樣去保證線程安全呢?

怎樣保證線程安全,為什么選用 ConcurrentHashMap?

首先,你可能想到,在多線程環(huán)境下用 Hashtable 來解決線程安全的問題。這樣確實(shí)是可以的,但是同樣的它也有缺點(diǎn),我們看下最常用的 put 方法和 get 方法。

Hashtable-put
Hatable-get

可以看到,不管是往 map 里邊添加元素還是獲取元素,都會(huì)用 synchronized 關(guān)鍵字加鎖。當(dāng)有多個(gè)元素之前存在資源競(jìng)爭(zhēng)時(shí),只能有一個(gè)線程可以獲取到鎖,操作資源。更不能忍的是,一個(gè)簡(jiǎn)單的讀取操作,互相之間又不影響,為什么也不能同時(shí)進(jìn)行呢?

所以,hashtable 的缺點(diǎn)顯而易見,它不管是 get 還是 put 操作,都是鎖住了整個(gè) table,效率低下,因此 并不適合高并發(fā)場(chǎng)景。

也許,你還會(huì)想起來一個(gè)集合工具類 Collections,生成一個(gè)SynchronizedMap。其實(shí),它和 Hashtable 差不多,同樣的原因,鎖住整張表,效率低下。

所以,思考一下,既然鎖住整張表的話,并發(fā)效率低下,那我把整張表分成 N 個(gè)部分,并使元素盡量均勻的分布到每個(gè)部分中,分別給他們加鎖,互相之間并不影響,這種方式豈不是更好 。這就是在 JDK1.7 中 ConcurrentHashMap 采用的方案,被叫做鎖分段技術(shù),每個(gè)部分就是一個(gè) Segment(段)。

但是,在JDK1.8中,完全重構(gòu)了,采用的是 Synchronized + CAS ,把鎖的粒度進(jìn)一步降低,而放棄了 Segment 分段。(此時(shí)的 Synchronized 已經(jīng)升級(jí)了,效率得到了很大提升,鎖升級(jí)可以了解一下)

ConcurrentHashMap 1.7 源碼解析

我們看下在 JDK1.7中 ConcurrentHashMap 是怎么實(shí)現(xiàn)的。墻裂建議,在本文之前了解一下多線程的基本知識(shí),如JMM內(nèi)存模型,volatile關(guān)鍵字作用,CAS和自旋,ReentranLock重入鎖。

底層存儲(chǔ)結(jié)構(gòu)

在 JDK1.7中,本質(zhì)上還是采用鏈表+數(shù)組的形式存儲(chǔ)鍵值對(duì)的。但是,為了提高并發(fā),把原來的整個(gè) table 劃分為 n 個(gè) Segment 。所以,從整體來看,它是一個(gè)由 Segment 組成的數(shù)組。然后,每個(gè) Segment 里邊是由 HashEntry 組成的數(shù)組,每個(gè) HashEntry之間又可以形成鏈表。我們可以把每個(gè) Segment 看成是一個(gè)小的 HashMap,其內(nèi)部結(jié)構(gòu)和 HashMap 是一模一樣的。

當(dāng)對(duì)某個(gè) Segment 加鎖時(shí),如圖中 Segment2,并不會(huì)影響到其他 Segment 的讀寫。每個(gè) Segment 內(nèi)部自己操作自己的數(shù)據(jù)。這樣一來,我們要做的就是盡可能的讓元素均勻的分布在不同的 Segment中。最理想的狀態(tài)是,所有執(zhí)行的線程操作的元素都是不同的 Segment,這樣就可以降低鎖的競(jìng)爭(zhēng)。

廢話了這么多,還是來看底層源碼吧,因?yàn)樗械乃枷攵荚诖a里體現(xiàn)。借用 Linus的一句話,“No BB . Show me the code ” (改編版,哈哈)

常用變量

先看下 1.7 中常用的變量和內(nèi)部類都有哪些,這有助于我們了解 ConcurrentHashMap 的整體結(jié)構(gòu)。

//默認(rèn)初始化容量,這個(gè)和 HashMap中的容量是一個(gè)概念,表示的是整個(gè) Map的容量
static final int DEFAULT_INITIAL_CAPACITY = 16;

//默認(rèn)加載因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;

//默認(rèn)的并發(fā)級(jí)別,這個(gè)參數(shù)決定了 Segment 數(shù)組的長(zhǎng)度
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//最大的容量
static final int MAXIMUM_CAPACITY = 1 << 30;

//每個(gè)Segment中table數(shù)組的最小長(zhǎng)度為2,且必須是2的n次冪。
//由于每個(gè)Segment是懶加載的,用的時(shí)候才會(huì)初始化,因此為了避免使用時(shí)立即調(diào)整大小,設(shè)定了最小容量2
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

//用于限制Segment數(shù)量的最大值,必須是2的n次冪
static final int MAX_SEGMENTS = 1 << 16// slightly conservative

//在size方法和containsValue方法,會(huì)優(yōu)先采用樂觀的方式不加鎖,直到重試次數(shù)達(dá)到2,才會(huì)對(duì)所有Segment加鎖
//這個(gè)值的設(shè)定,是為了避免無限次的重試。后邊size方法會(huì)詳講怎么實(shí)現(xiàn)樂觀機(jī)制的。
static final int RETRIES_BEFORE_LOCK = 2;

//segment掩碼值,用于根據(jù)元素的hash值定位所在的 Segment 下標(biāo)。后邊會(huì)細(xì)講
final int segmentMask;

//和 segmentMask 配合使用來定位 Segment 的數(shù)組下標(biāo),后邊講。
final int segmentShift;

// Segment 組成的數(shù)組,每一個(gè) Segment 都可以看做是一個(gè)特殊的 HashMap
final Segment<K,V>[] segments;

//Segment 對(duì)象,繼承自 ReentrantLock 可重入鎖。
//其內(nèi)部的屬性和方法和 HashMap 神似,只是多了一些拓展功能。
static final class Segment<K,Vextends ReentrantLock implements Serializable {
 
 //這是在 scanAndLockForPut 方法中用到的一個(gè)參數(shù),用于計(jì)算最大重試次數(shù)
 //獲取當(dāng)前可用的處理器的數(shù)量,若大于1,則返回64,否則返回1。
 static final int MAX_SCAN_RETRIES =
  Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

 //用于表示每個(gè)Segment中的 table,是一個(gè)用HashEntry組成的數(shù)組。
 transient volatile HashEntry<K,V>[] table;

 //Segment中的元素個(gè)數(shù),每個(gè)Segment單獨(dú)計(jì)數(shù)(下邊的幾個(gè)參數(shù)同樣的都是單獨(dú)計(jì)數(shù))
 transient int count;

 //每次 table 結(jié)構(gòu)修改時(shí),如put,remove等,此變量都會(huì)自增
 transient int modCount;

 //當(dāng)前Segment擴(kuò)容的閾值,同HashMap計(jì)算方法一樣也是容量乘以加載因子
 //需要知道的是,每個(gè)Segment都是單獨(dú)處理擴(kuò)容的,互相之間不會(huì)產(chǎn)生影響
 transient int threshold;

 //加載因子
 final float loadFactor;

 //Segment構(gòu)造函數(shù)
 Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
  this.loadFactor = lf;
  this.threshold = threshold;
  this.table = tab;
 }
 
 ...
 // put(),remove(),rehash() 方法都在此類定義
}

// HashEntry,存在于每個(gè)Segment中,它就類似于HashMap中的Node,用于存儲(chǔ)鍵值對(duì)的具體數(shù)據(jù)和維護(hù)單向鏈表的關(guān)系
static final class HashEntry<K,V{
 //每個(gè)key通過哈希運(yùn)算后的結(jié)果,用的是 Wang/Jenkins hash 的變種算法,此處不細(xì)講,感興趣的可自行查閱相關(guān)資料
 final int hash;
 final K key;
 //value和next都用 volatile 修飾,用于保證內(nèi)存可見性和禁止指令重排序
 volatile V value;
 //指向下一個(gè)節(jié)點(diǎn)
 volatile HashEntry<K,V> next;

 HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
  this.hash = hash;
  this.key = key;
  this.value = value;
  this.next = next;
 }
}

構(gòu)造函數(shù)

ConcurrentHashMap 有五種構(gòu)造函數(shù),但是最終都會(huì)調(diào)用同一個(gè)構(gòu)造函數(shù),所以只需要搞明白這一個(gè)核心的構(gòu)造函數(shù)就可以了。

PS: 文章注釋中 (1)(2)(3) 等序號(hào)都是用來方便做標(biāo)記,不是計(jì)算值

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel)
 
{
 //檢驗(yàn)參數(shù)是否合法。值得說的是,并發(fā)級(jí)別一定要大于0,否則就沒辦法實(shí)現(xiàn)分段鎖了。
 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
  throw new IllegalArgumentException();
 //并發(fā)級(jí)別不能超過最大值
 if (concurrencyLevel > MAX_SEGMENTS)
  concurrencyLevel = MAX_SEGMENTS;
 // Find power-of-two sizes best matching arguments
 //偏移量,是為了對(duì)hash值做位移操作,計(jì)算元素所在的Segment下標(biāo),put方法詳講
 int sshift = 0;
 //用于設(shè)定最終Segment數(shù)組的長(zhǎng)度,必須是2的n次冪
 int ssize = 1;
 //這里就是計(jì)算 sshift 和 ssize 值的過程  (1) 
 while (ssize < concurrencyLevel) {
  ++sshift;
  ssize <<= 1;
 }
 this.segmentShift = 32 - sshift;
 //Segment的掩碼
 this.segmentMask = ssize - 1;
 if (initialCapacity > MAXIMUM_CAPACITY)
  initialCapacity = MAXIMUM_CAPACITY;
 //c用于輔助計(jì)算cap的值   (2)
 int c = initialCapacity / ssize;
 if (c * ssize < initialCapacity)
  ++c;
 // cap 用于確定某個(gè)Segment的容量,即Segment中HashEntry數(shù)組的長(zhǎng)度
 int cap = MIN_SEGMENT_TABLE_CAPACITY;
 //(3)
 while (cap < c)
  cap <<= 1;
 // create segments and segments[0]
 //這里用 loadFactor做為加載因子,cap乘以加載因子作為擴(kuò)容閾值,創(chuàng)建長(zhǎng)度為cap的HashEntry數(shù)組,
 //三個(gè)參數(shù),創(chuàng)建一個(gè)Segment對(duì)象,保存到S0對(duì)象中。后邊在 ensureSegment 方法會(huì)用到S0作為原型對(duì)象去創(chuàng)建對(duì)應(yīng)的Segment。
 Segment<K,V> s0 =
  new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
       (HashEntry<K,V>[])new HashEntry[cap]);
 //創(chuàng)建出長(zhǎng)度為 ssize 的一個(gè) Segment數(shù)組
 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
 //把S0存到Segment數(shù)組中去。在這里,我們就可以發(fā)現(xiàn),此時(shí)只是創(chuàng)建了一個(gè)Segment數(shù)組,
 //但是并沒有把數(shù)組中的每個(gè)Segment對(duì)象創(chuàng)建出來,僅僅創(chuàng)建了一個(gè)Segment用來作為原型對(duì)象。
 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
 this.segments = ss;
}    
```須是2的n次冪
 int ssize = 1;

上邊的注釋中留了 (1)(2)(3) 三個(gè)地方還沒有細(xì)說。我們現(xiàn)在假設(shè)一組數(shù)據(jù),把涉及到的幾個(gè)變量計(jì)算出來,就能明白這些參數(shù)的含義了。

```java
//假設(shè)調(diào)用了默認(rèn)構(gòu)造,都用的是默認(rèn)參數(shù),即 initialCapacity 和 concurrencyLevel 都是16
//(1)  sshift 和 ssize 值的計(jì)算過程為,每次循環(huán),都會(huì)把 sshift 自增1,并且 ssize 左移一位,即乘以2,
//直到 ssize 的值大于等于 concurrencyLevel 的值 16。
sshfit=0,1,2,3,4
ssize=1,2,4,8,16
//可以看到,初始他們的值分別是0和1,最終結(jié)果是4和16
//sshfit是為了輔助計(jì)算segmentShift值,ssize是為了確定Segment數(shù)組長(zhǎng)度。
//(2)  此時(shí),計(jì)算c的值,
c = 16/16 = 1;
//判斷 c * 16 < 16 是否為真,真的話 c 自增1,此處為false,因此 c的值為1不變。
//(3)  此時(shí),由于c為1, cap為2 ,因此判斷 cap < c 為false,最終cap為2。
//總結(jié)一下,以上三個(gè)步驟,最終都是為了確定以下幾個(gè)關(guān)鍵參數(shù)的值,
//確定 segmentShift ,這個(gè)用于后邊計(jì)算hash值的偏移量,此處即為 32-4=28,
//確定 ssize,必須是一個(gè)大于等于 concurrencyLevel 的一個(gè)2的n次冪值
//確定 cap,必須是一個(gè)大于等于2的一個(gè)2的n次冪值
//感興趣的小伙伴,還可以用另外幾組參數(shù)來計(jì)算上邊的參數(shù)值,可以加深理解參數(shù)的含義。
//例如initialCapacity和concurrencyLevel分別傳入10和5,或者傳入33和16

put()方法

put 方法的總體流程是,

  1. 通過哈希算法計(jì)算出當(dāng)前 key 的 hash 值
  2. 通過這個(gè) hash 值找到它所對(duì)應(yīng)的 Segment 數(shù)組的下標(biāo)
  3. 再通過 hash 值計(jì)算出它在對(duì)應(yīng) Segment 的 HashEntry數(shù)組 的下標(biāo)
  4. 找到合適的位置插入元素
//這是Map的put方法
public V put(K key, V value) {
 Segment<K,V> s;
 //不支持value為空
 if (value == null)
  throw new NullPointerException();
 //通過 Wang/Jenkins 算法的一個(gè)變種算法,計(jì)算出當(dāng)前key對(duì)應(yīng)的hash值
 int hash = hash(key);
 //上邊我們計(jì)算出的 segmentShift為28,因此hash值右移28位,說明此時(shí)用的是hash的高4位,
 //然后把它和掩碼15進(jìn)行與運(yùn)算,得到的值一定是一個(gè) 0000 ~ 1111 范圍內(nèi)的值,即 0~15 。
 int j = (hash >>> segmentShift) & segmentMask;
 //這里是用Unsafe類的原子操作找到Segment數(shù)組中j下標(biāo)的 Segment 對(duì)象
 if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
   (segments, (j << SSHIFT) + SBASE)) == null//  in ensureSegment
  //初始化j下標(biāo)的Segment
  s = ensureSegment(j);
 //在此Segment中添加元素
 return s.put(key, hash, value, false);
}

上邊有一個(gè)這樣的方法, UNSAFE.getObject (segments, (j << SSHIFT) + SBASE。它是為了通過Unsafe這個(gè)類,找到 j 最新的實(shí)際值。這個(gè)計(jì)算 (j << SSHIFT) + SBASE ,在后邊非常常見,我們只需要知道它代表的是 j 的一個(gè)偏移量,通過偏移量,就可以得到 j 的實(shí)際值。可以類比,AQS 中的 CAS 操作。Unsafe中的操作,都需要一個(gè)偏移量,看下圖,

(j << SSHIFT) + SBASE 就相當(dāng)于圖中的 stateOffset偏移量。只不過圖中是 CAS 設(shè)置新值,而我們這里是取 j 的最新值。后邊很多這樣的計(jì)算方式,就不贅述了。接著看 s.put 方法,這才是最終確定元素位置的方法。

//Segment中的 put 方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
 //這里通過tryLock嘗試加鎖,如果加鎖成功,返回null,否則執(zhí)行 scanAndLockForPut方法
 //這里說明一下,tryLock 和 lock 是 ReentrantLock 中的方法,
 //區(qū)別是 tryLock 不會(huì)阻塞,搶鎖成功就返回true,失敗就立馬返回false,
 //而 lock 方法是,搶鎖成功則返回,失敗則會(huì)進(jìn)入同步隊(duì)列,阻塞等待獲取鎖。
 HashEntry<K,V> node = tryLock() ? null :
  scanAndLockForPut(key, hash, value);
 V oldValue;
 try {
  //當(dāng)前Segment的table數(shù)組
  HashEntry<K,V>[] tab = table;
  //這里就是通過hash值,與tab數(shù)組長(zhǎng)度取模,找到其所在HashEntry數(shù)組的下標(biāo)
  int index = (tab.length - 1) & hash;
  //當(dāng)前下標(biāo)位置的第一個(gè)HashEntry節(jié)點(diǎn)
  HashEntry<K,V> first = entryAt(tab, index);
  for (HashEntry<K,V> e = first;;) {
   //如果第一個(gè)節(jié)點(diǎn)不為空
   if (e != null) {
    K k;
    //并且第一個(gè)節(jié)點(diǎn),就是要插入的節(jié)點(diǎn),則替換value值,否則繼續(xù)向后查找
    if ((k = e.key) == key ||
     (e.hash == hash && key.equals(k))) {
     //替換舊值
     oldValue = e.value;
     if (!onlyIfAbsent) {
      e.value = value;
      ++modCount;
     }
     break;
    }
    e = e.next;
   }
   //說明當(dāng)前index位置不存在任何節(jié)點(diǎn),此時(shí)first為null,
   //或者當(dāng)前index存在一條鏈表,并且已經(jīng)遍歷完了還沒找到相等的key,此時(shí)first就是鏈表第一個(gè)元素
   else {
    //如果node不為空,則直接頭插
    if (node != null)
     node.setNext(first);
    //否則,創(chuàng)建一個(gè)新的node,并頭插
    else
     node = new HashEntry<K,V>(hash, key, value, first);
    int c = count + 1;
    //如果當(dāng)前Segment中的元素大于閾值,并且tab長(zhǎng)度沒有超過容量最大值,則擴(kuò)容
    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
     rehash(node);
    //否則,就把當(dāng)前node設(shè)置為index下標(biāo)位置新的頭結(jié)點(diǎn)
    else
     setEntryAt(tab, index, node);
    ++modCount;
    //更新count值
    count = c;
    //這種情況說明舊值肯定為空
    oldValue = null;
    break;
   }
  }
 } finally {
  //需要注意ReentrantLock必須手動(dòng)解鎖
  unlock();
 }
 //返回舊值
 return oldValue;
}

這里說明一下計(jì)算 Segment 數(shù)組下標(biāo)和計(jì)算 HashEntry 數(shù)組下標(biāo)的不同點(diǎn):

//下邊的hash值是通過哈希運(yùn)算后的hash值,不是hashCode
//計(jì)算 Segment 下標(biāo)
 (hash >>> segmentShift) & segmentMask 
 //計(jì)算 HashEntry 數(shù)組下標(biāo)
 (tab.length - 1) & hash

思考一下,為什么它們的算法不一樣呢?計(jì)算 Segment 數(shù)組下標(biāo)是用的 hash值高幾位(這里以高 4 位為例)和掩碼做與運(yùn)算,而計(jì)算 HashEntry 數(shù)組下標(biāo)是直接用的 hash 值和數(shù)組長(zhǎng)度減1做與運(yùn)算。

我的理解是,這是為了盡量避免當(dāng)前 hash 值計(jì)算出來的 Segment 數(shù)組下標(biāo)和計(jì)算出來的 HashEntry 數(shù)組下標(biāo)趨于相同。簡(jiǎn)單說,就是為了避免分配到同一個(gè) Segment 中的元素扎堆現(xiàn)象,即避免它們都被分配到同一條鏈表上,導(dǎo)致鏈表過長(zhǎng)。同時(shí),也是為了減少并發(fā)。下面做一個(gè)運(yùn)算,幫助理解一下(假設(shè)不用高 4 位運(yùn)算,而是正常情況都用低位做運(yùn)算)。

//我們以并發(fā)級(jí)別16,HashEntry數(shù)組容量 4 為例,則它們參與運(yùn)算的掩碼分別為 15 和 3
//hash值
0110 1101 0110 1111 0110 1110 0010 0010
//segmentMask = 15   ,標(biāo)記為 (1)
0000 0000 0000 0000 0000 0000 0000 1111
//tab.length - 1 = 3     ,標(biāo)記為 (2)
0000 0000 0000 0000 0000 0000 0000 0011
//用 hash 分別和 15 ,3 做與運(yùn)算,會(huì)發(fā)現(xiàn)得到的結(jié)果是一樣,都是十進(jìn)制 2.
//這表明,當(dāng)前 hash值被分配到下標(biāo)為 2 的 Segment 中,同時(shí),被分配到下標(biāo)為 2 的 HashEntry 數(shù)組中
//現(xiàn)在若有另外一個(gè) hash 值 h2,和第一個(gè)hash值,高位不同,但是低4位相同,
1010 1101 0110 1111 0110 1110 0010 0010
//我們會(huì)發(fā)現(xiàn),最后它也會(huì)被分配到下標(biāo)為 2 的 Segment 和 HashEntry 數(shù)組,就會(huì)和第一個(gè)元素形成鏈表。
//所以,為了避免這種扎堆現(xiàn)象,讓元素盡量均勻分配,就讓 hash 的高 4 位和 (1)處做與 運(yùn)算,而用低位和 (2)處做與運(yùn)算
//這樣計(jì)算后,它們所在的Segment下標(biāo)分別為 6(0110), 10(1010),即使它們?cè)贖ashEntry數(shù)組中的下標(biāo)都為 2(0010),也無所謂
//因?yàn)樗鼈儾⒉辉谝粋€(gè) Segment 中,也就不會(huì)在同一個(gè) HashEntry 數(shù)組中,更不會(huì)形成鏈表。
//更重要的是,它們不會(huì)有并發(fā),因?yàn)樵诟髯圆煌?nbsp;Segment 自己操作自己的加鎖解鎖,互不影響

可能有的小伙伴就會(huì)打岔了,那如果兩個(gè) hash 值,低位和高位都相同,怎么辦呢。如果是這樣,我只能說,這個(gè) hash 算法也太爛了吧。(這里的 hash 算法也會(huì)盡量避免這種情況,當(dāng)然只是減少幾率,并不能杜絕)

我有個(gè)大膽的想法,這里的高低位不同的計(jì)算方式,是不是后邊 1.8 HashMap 讓 hash 高低位做異或運(yùn)算的引子呢?不得而知。。

put 方法比較簡(jiǎn)單,只要能看懂 HashMap 中的 put 方法,這里也沒問題。主要是它調(diào)用的子方法比較復(fù)雜,下邊一個(gè)一個(gè)講解。

ensureSegment()方法

回到 Map的 put 方法,判斷 j 下標(biāo)的 Segment為空后,則需要調(diào)用此方法,初始化一個(gè) Segment 對(duì)象,以確保拿到的對(duì)象一定是不為空的,否則無法執(zhí)行s.put了。

//k為 (hash >>> segmentShift) & segmentMask 算法計(jì)算出來的值
private Segment<K,V> ensureSegment(int k) {
 final Segment<K,V>[] ss = this.segments;
 //u代表 k 的偏移量,用于通過 UNSAFE 獲取主內(nèi)存最新的實(shí)際 K 值
 long u = (k << SSHIFT) + SBASE; // raw offset
 Segment<K,V> seg;
 //從內(nèi)存中取到最新的下標(biāo)位置的 Segment 對(duì)象,判斷是否為空,(1)
 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
  //之前構(gòu)造函數(shù)說了,s0是作為一個(gè)原型對(duì)象,用于創(chuàng)建新的 Segment 對(duì)象
  Segment<K,V> proto = ss[0]; // use segment 0 as prototype
  //容量
  int cap = proto.table.length;
  //加載因子
  float lf = proto.loadFactor;
  //擴(kuò)容閾值
  int threshold = (int)(cap * lf);
  //把 Segment 對(duì)應(yīng)的 HashEntry 數(shù)組先創(chuàng)建出來
  HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
  //再次檢查 K 下標(biāo)位置的 Segment 是否為空, (2)
  if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
   == null) { // recheck
   //此處把 Segment 對(duì)象創(chuàng)建出來,并賦值給 s,
   Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
   //循環(huán)檢查 K 下標(biāo)位置的 Segment 是否為空, (3)
   //若不為空,則說明有其它線程搶先創(chuàng)建成功,并且已經(jīng)成功同步到主內(nèi)存中了,
   //則把它取出來,并返回
   while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
       == null) {
    //CAS,若當(dāng)前下標(biāo)的Segment對(duì)象為空,就把它替換為最新創(chuàng)建出來的 s 對(duì)象。
    //若成功,就跳出循環(huán),否則,就一直自旋直到成功,或者 seg 不為空(其他線程成功導(dǎo)致)。
    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
     break;
   }
  }
 }
 return seg;
}

可以發(fā)現(xiàn),我標(biāo)注了上邊 (1)(2)(3) 個(gè)地方,每次都判斷最新的Segment是否為空。可能有的小伙伴就會(huì)迷惑,為什么做這么多次判斷,我直接去自旋不就好了,反正最后都要自旋的。

我的理解是,在多線程環(huán)境下,因?yàn)椴淮_定是什么時(shí)候會(huì)有其它線程 CAS 成功,有可能發(fā)生在以上的任意時(shí)刻。所以,只要發(fā)現(xiàn)一旦內(nèi)存中的對(duì)象已經(jīng)存在了,則說明已經(jīng)有其它線程把Segment對(duì)象創(chuàng)建好,并CAS成功同步到主內(nèi)存了。此時(shí),就可以直接返回,而不需要往下執(zhí)行了。這樣做,是為了代碼執(zhí)行效率考慮。

scanAndLockForPut()方法

put 方法第一步搶鎖失敗之后,就會(huì)執(zhí)行此方法,

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
 //根據(jù)hash值定位到它對(duì)應(yīng)的HashEntry數(shù)組的下標(biāo)位置,并找到鏈表的第一個(gè)節(jié)點(diǎn)
 //注意,這個(gè)操作會(huì)從主內(nèi)存中獲取到最新的狀態(tài),以確保獲取到的first是最新值
 HashEntry<K,V> first = entryForHash(this, hash);
 HashEntry<K,V> e = first;
 HashEntry<K,V> node = null;
 //重試次數(shù),初始化為 -1
 int retries = -1// negative while locating node
 //若搶鎖失敗,就一直循環(huán),直到成功獲取到鎖。有三種情況
 while (!tryLock()) {
  HashEntry<K,V> f; // to recheck first below
  //1.若 retries 小于0,
  if (retries < 0) {
   if (e == null) {
    //若 e 節(jié)點(diǎn)和 node 都為空,則創(chuàng)建一個(gè) node 節(jié)點(diǎn)。這里只是預(yù)測(cè)性的創(chuàng)建一個(gè)node節(jié)點(diǎn)
    if (node == null// speculatively create node
     node = new HashEntry<K,V>(hash, key, value, null);
    retries = 0;
   }
   //如當(dāng)前遍歷到的 e 節(jié)點(diǎn)不為空,則判斷它的key是否等于傳進(jìn)來的key,若是則把 retries 設(shè)為0
   else if (key.equals(e.key))
    retries = 0;
   //否則,繼續(xù)向后遍歷節(jié)點(diǎn)
   else
    e = e.next;
  }
  //2.若是重試次數(shù)超過了最大嘗試次數(shù),則調(diào)用lock方法加鎖。表明不再重試,我下定決心了一定要獲取到鎖。
  //要么當(dāng)前線程可以獲取到鎖,要么獲取不到就去排隊(duì)等待獲取鎖。獲取成功后,再 break。
  else if (++retries > MAX_SCAN_RETRIES) {
   lock();
   break;
  }
  //3.若 retries 的值為偶數(shù),并且從內(nèi)存中再次獲取到最新的頭節(jié)點(diǎn),判斷若不等于first
  //則說明有其他線程修改了當(dāng)前下標(biāo)位置的頭結(jié)點(diǎn),于是需要更新頭結(jié)點(diǎn)信息。
  else if ((retries & 1) == 0 &&
     (f = entryForHash(this, hash)) != first) {
   //更新頭結(jié)點(diǎn)信息,并把重試次數(shù)重置為 -1,繼續(xù)下一次循環(huán),從最新的頭結(jié)點(diǎn)遍歷當(dāng)前鏈表。
   e = first = f; // re-traverse if entry changed
   retries = -1;
  }
 }
 return node;
}

這個(gè)方法邏輯比較復(fù)雜,會(huì)一直循環(huán)嘗試獲取鎖,若獲取成功,則返回。否則的話,每次循環(huán)時(shí),都會(huì)同時(shí)遍歷當(dāng)前鏈表。若遍歷完了一次,還沒找到和key相等的節(jié)點(diǎn),就會(huì)預(yù)先創(chuàng)建一個(gè)節(jié)點(diǎn)。注意,這里只是預(yù)測(cè)性的創(chuàng)建一個(gè)新節(jié)點(diǎn),也有可能在這之前,就已經(jīng)獲取鎖成功了。

同時(shí),當(dāng)重試次每偶數(shù)次時(shí),就會(huì)檢查一次當(dāng)前最新的頭結(jié)點(diǎn)是否被改變。因?yàn)槿粲凶兓脑挘€需要從最新的頭結(jié)點(diǎn)開始遍歷鏈表。

還有一種情況,就是循環(huán)次數(shù)達(dá)到了最大限制,則停止循環(huán),用阻塞的方式去獲取鎖。這時(shí),也就停止了遍歷鏈表的動(dòng)作,當(dāng)前線程也不會(huì)再做其他預(yù)熱(warm up)的事情。

關(guān)于為什么預(yù)測(cè)性的創(chuàng)建新節(jié)點(diǎn),源碼中原話是這樣的:

Since traversal speed doesn't matter, we might as well help warm up the associated code and accesses as well.

解釋一下就是,因?yàn)楸闅v速度無所謂,所以,我們可以預(yù)先(warm up)做一些相關(guān)聯(lián)代碼的準(zhǔn)備工作。這里相關(guān)聯(lián)代碼,指的就是循環(huán)中,在獲取鎖成功或者調(diào)用 lock 方法之前做的這些事情,當(dāng)然也包括創(chuàng)建新節(jié)點(diǎn)。

在put 方法中可以看到,有一句是判斷 node 是否為空,若創(chuàng)建了,就直接頭插。否則的話,它也會(huì)自己創(chuàng)建這個(gè)新節(jié)點(diǎn)。

scanAndLockForPut 這個(gè)方法可以確保返回時(shí),當(dāng)前線程一定是獲取到鎖的狀態(tài)。

rehash()方法

當(dāng) put 方法時(shí),發(fā)現(xiàn)元素個(gè)數(shù)超過了閾值,則會(huì)擴(kuò)容。需要注意的是,每個(gè)Segment只管它自己的擴(kuò)容,互相之間并不影響。換句話說,可以出現(xiàn)這個(gè) Segment的長(zhǎng)度為2,另一個(gè)Segment的長(zhǎng)度為4的情況(只要是2的n次冪)。


//node為創(chuàng)建的新節(jié)點(diǎn)
private void rehash(HashEntry<K,V> node) {
 //當(dāng)前Segment中的舊表
 HashEntry<K,V>[] oldTable = table;
 //舊的容量
 int oldCapacity = oldTable.length;
 //新容量為舊容量的2倍
 int newCapacity = oldCapacity << 1;
 //更新新的閾值
 threshold = (int)(newCapacity * loadFactor);
 //用新的容量創(chuàng)建一個(gè)新的 HashEntry 數(shù)組
 HashEntry<K,V>[] newTable =
  (HashEntry<K,V>[]) new HashEntry[newCapacity];
 //當(dāng)前的掩碼,用于計(jì)算節(jié)點(diǎn)在新數(shù)組中的下標(biāo)
 int sizeMask = newCapacity - 1;
 //遍歷舊表
 for (int i = 0; i < oldCapacity ; i++) {
  HashEntry<K,V> e = oldTable[i];
  //如果e不為空,說明當(dāng)前鏈表不為空
  if (e != null) {
   HashEntry<K,V> next = e.next;
   //計(jì)算hash值再新數(shù)組中的下標(biāo)位置
   int idx = e.hash & sizeMask;
   //如果e不為空,且它的下一個(gè)節(jié)點(diǎn)為空,則說明這條鏈表只有一個(gè)節(jié)點(diǎn),
   //直接把這個(gè)節(jié)點(diǎn)放到新數(shù)組的對(duì)應(yīng)下標(biāo)位置即可
   if (next == null)   //  Single node on list
    newTable[idx] = e;
   //否則,處理當(dāng)前鏈表的節(jié)點(diǎn)遷移操作
   else { // Reuse consecutive sequence at same slot
    //記錄上一次遍歷到的節(jié)點(diǎn)
    HashEntry<K,V> lastRun = e;
    //對(duì)應(yīng)上一次遍歷到的節(jié)點(diǎn)在新數(shù)組中的新下標(biāo)
    int lastIdx = idx;
    for (HashEntry<K,V> last = next;
      last != null;
      last = last.next) {
     //計(jì)算當(dāng)前遍歷到的節(jié)點(diǎn)的新下標(biāo)
     int k = last.hash & sizeMask;
     //若 k 不等于 lastIdx,則說明此次遍歷到的節(jié)點(diǎn)和上次遍歷到的節(jié)點(diǎn)不在同一個(gè)下標(biāo)位置
     //需要把 lastRun 和 lastIdx 更新為當(dāng)前遍歷到的節(jié)點(diǎn)和下標(biāo)值。
     //若相同,則不處理,繼續(xù)下一次 for 循環(huán)。
     if (k != lastIdx) {
      lastIdx = k;
      lastRun = last;
     }
    }
    //把和 lastRun 節(jié)點(diǎn)的下標(biāo)位置相同的鏈表最末尾的幾個(gè)連續(xù)的節(jié)點(diǎn)放到新數(shù)組的對(duì)應(yīng)下標(biāo)位置
    newTable[lastIdx] = lastRun;
    //再把剩余的節(jié)點(diǎn),復(fù)制到新數(shù)組
    //從舊數(shù)組的頭結(jié)點(diǎn)開始遍歷,直到 lastRun 節(jié)點(diǎn),因?yàn)?lastRun節(jié)點(diǎn)后邊的節(jié)點(diǎn)都已經(jīng)遷移完成了。
    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
     V v = p.value;
     int h = p.hash;
     int k = h & sizeMask;
     HashEntry<K,V> n = newTable[k];
     //用的是復(fù)制節(jié)點(diǎn)信息的方式,并不是把原來的節(jié)點(diǎn)直接遷移,區(qū)別于lastRun處理方式
     newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
    }
   }
  }
 }
 //所有節(jié)點(diǎn)都遷移完成之后,再處理傳進(jìn)來的新的node節(jié)點(diǎn),把它頭插到對(duì)應(yīng)的下標(biāo)位置
 int nodeIndex = node.hash & sizeMask; // add the new node
 //頭插node節(jié)點(diǎn)
 node.setNext(newTable[nodeIndex]);
 newTable[nodeIndex] = node;
 //更新當(dāng)前Segment的table信息
 table = newTable;
}

上邊的遷移過程和 lastRun 和 lastIdx 變量可能不太好理解,我畫個(gè)圖就明白了。以其中一條鏈表處理方式為例。

從頭結(jié)點(diǎn)開始向后遍歷,找到當(dāng)前鏈表的最后幾個(gè)下標(biāo)相同的連續(xù)的節(jié)點(diǎn)。如上圖,雖然開頭出現(xiàn)了有兩個(gè)節(jié)點(diǎn)的下標(biāo)都是 k2, 但是中間出現(xiàn)一個(gè)不同的下標(biāo) k1,打斷了下標(biāo)連續(xù)相同,因此從下一個(gè)k2,又重新開始算。好在后邊三個(gè)連續(xù)的節(jié)點(diǎn)下標(biāo)都是相同的,因此倒數(shù)第三個(gè)節(jié)點(diǎn)被標(biāo)記為 lastRun,且變量無變化。

從lastRun節(jié)點(diǎn)到尾結(jié)點(diǎn)的這部分就可以整體遷移到新數(shù)組的對(duì)應(yīng)下標(biāo)位置了,因?yàn)樗鼈兊南聵?biāo)都是相同的,可以這樣統(tǒng)一處理。

另外從頭結(jié)點(diǎn)到 lastRun 之前的節(jié)點(diǎn),無法統(tǒng)一處理,只能一個(gè)一個(gè)去復(fù)制了。且注意,這里不是直接遷移,而是復(fù)制節(jié)點(diǎn)到新的數(shù)組,舊的節(jié)點(diǎn)會(huì)在不久的將來,因?yàn)闆]有引用指向,被 JVM 垃圾回收處理掉。

(不知道為啥這個(gè)方法名起為 rehash,其實(shí)擴(kuò)容時(shí) hash 值并沒有重新計(jì)算,變化的只是它們所在的下標(biāo)而已。我猜測(cè),可能是,借用了 1.7 HashMap 中的說法吧。。。)

get()

put 方法搞明白了之后,其實(shí) get 方法就很好理解了。也是先定位到 Segment,然后再定位到 HashEntry 。

public V get(Object key) {
 Segment<K,V> s; // manually integrate access methods to reduce overhead
 HashEntry<K,V>[] tab;
 //計(jì)算hash值
 int h = hash(key);
 //同樣的先定位到 key 所在的Segment ,然后從主內(nèi)存中取出最新的節(jié)點(diǎn)
 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
  (tab = s.table) != null) {
  //若Segment不為空,且鏈表也不為空,則遍歷查找節(jié)點(diǎn)
  for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
    e != null; e = e.next) {
   K k;
   //找到則返回它的 value 值,否則返回 null
   if ((k = e.key) == key || (e.hash == h && key.equals(k)))
    return e.value;
  }
 }
 return null;
}

remove()

remove 方法和 put 方法類似,也不用做過多特殊的介紹,

public V remove(Object key) {
 int hash = hash(key);
 //定位到Segment
 Segment<K,V> s = segmentForHash(hash);
 //若 s為空,則返回 null,否則執(zhí)行 remove
 return s == null ? null : s.remove(key, hash, null);
}

public boolean remove(Object key, Object value) {
 int hash = hash(key);
 Segment<K,V> s;
 return value != null && (s = segmentForHash(hash)) != null &&
  s.remove(key, hash, value) != null;
}

final V remove(Object key, int hash, Object value) {
 //嘗試加鎖,若失敗,則執(zhí)行 scanAndLock ,此方法和 scanAndLockForPut 方法類似
 if (!tryLock())
  scanAndLock(key, hash);
 V oldValue = null;
 try {
  HashEntry<K,V>[] tab = table;
  int index = (tab.length - 1) & hash;
  //從主內(nèi)存中獲取對(duì)應(yīng) table 的最新的頭結(jié)點(diǎn)
  HashEntry<K,V> e = entryAt(tab, index);
  HashEntry<K,V> pred = null;
  while (e != null) {
   K k;
   HashEntry<K,V> next = e.next;
   //匹配到 key
   if ((k = e.key) == key ||
    (e.hash == hash && key.equals(k))) {
    V v = e.value;
    // value 為空,或者 value 也匹配成功
    if (value == null || value == v || value.equals(v)) {
     if (pred == null)
      setEntryAt(tab, index, next);
     else
      pred.setNext(next);
     ++modCount;
     --count;
     oldValue = v;
    }
    break;
   }
   pred = e;
   e = next;
  }
 } finally {
  unlock();
 }
 return oldValue;
}

size()

size 方法需要重點(diǎn)說明一下。愛思考的小伙伴可能就會(huì)想到,并發(fā)情況下,有可能在統(tǒng)計(jì)期間,數(shù)組元素個(gè)數(shù)不停的變化,而且,整個(gè)表還被分成了 N個(gè) Segment,怎樣統(tǒng)計(jì)才能保證結(jié)果的準(zhǔn)確性呢?我們一起來看下吧。

public int size() {
 // Try a few times to get accurate count. On failure due to
 // continuous async changes in table, resort to locking.
 //segment數(shù)組
 final Segment<K,V>[] segments = this.segments;
 //統(tǒng)計(jì)所有Segment中元素的總個(gè)數(shù)
 int size;
 //如果size大小超過32位,則標(biāo)記為溢出為true
 boolean overflow; 
 //統(tǒng)計(jì)每個(gè)Segment中的 modcount 之和
 long sum;         
 //上次記錄的 sum 值
 long last = 0L;   
 //重試次數(shù),初始化為 -1
 int retries = -1
 try {
  for (;;) {
   //如果超過重試次數(shù),則不再重試,而是把所有Segment都加鎖,再統(tǒng)計(jì) size
   if (retries++ == RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
     //強(qiáng)制加鎖
     ensureSegment(j).lock(); // force creation
   }
   sum = 0L;
   size = 0;
   overflow = false;
   //遍歷所有Segment
   for (int j = 0; j < segments.length; ++j) {
    Segment<K,V> seg = segmentAt(segments, j);
    //若當(dāng)前遍歷到的Segment不為空,則統(tǒng)計(jì)它的 modCount 和 count 元素個(gè)數(shù)
    if (seg != null) {
     //累加當(dāng)前Segment的結(jié)構(gòu)修改次數(shù),如put,remove等操作都會(huì)影響modCount
     sum += seg.modCount;
     int c = seg.count;
     //若當(dāng)前Segment的元素個(gè)數(shù) c 小于0 或者 size 加上 c 的結(jié)果小于0,則認(rèn)為溢出
     //因?yàn)槿舫^了 int 最大值,就會(huì)返回負(fù)數(shù)
     if (c < 0 || (size += c) < 0)
      overflow = true;
    }
   }
   //當(dāng)此次嘗試,統(tǒng)計(jì)的 sum 值和上次統(tǒng)計(jì)的值相同,則說明這段時(shí)間內(nèi),
   //并沒有任何一個(gè) Segment 的結(jié)構(gòu)發(fā)生改變,就可以返回最后的統(tǒng)計(jì)結(jié)果
   if (sum == last)
    break;
   //不相等,則說明有 Segment 結(jié)構(gòu)發(fā)生了改變,則記錄最新的結(jié)構(gòu)變化次數(shù)之和 sum,
   //并賦值給 last,用于下次重試的比較。
   last = sum;
  }
 } finally {
  //如果超過了指定重試次數(shù),則說明表中的所有Segment都被加鎖了,因此需要把它們都解鎖
  if (retries > RETRIES_BEFORE_LOCK) {
   for (int j = 0; j < segments.length; ++j)
    segmentAt(segments, j).unlock();
  }
 }
 //若結(jié)果溢出,則返回 int 最大值,否則正常返回 size 值 
 return overflow ? Integer.MAX_VALUE : size;
}

其實(shí)源碼中前兩行的注釋也說的非常清楚了。我們先采用樂觀的方式,認(rèn)為在統(tǒng)計(jì) size 的過程中,并沒有發(fā)生 put, remove 等會(huì)改變 Segment 結(jié)構(gòu)的操作。但是,如果發(fā)生了,就需要重試。如果重試2次都不成功(執(zhí)行三次,第一次不能叫做重試),就只能強(qiáng)制把所有 Segment 都加鎖之后,再統(tǒng)計(jì)了,以此來得到準(zhǔn)確的結(jié)果。

ConcurrentHashMap 1.8 源碼分析

需要說明的是,JDK 1.8 的 CHM(ConcurrentHashMap) 實(shí)現(xiàn),完全重構(gòu)了 1.7 。不再有 Segment 的概念,只是為了兼容 1.7 才申明了一下,并沒有用到。因此,不再使用分段鎖,而是給數(shù)組中的每一個(gè)頭節(jié)點(diǎn)(為了方便,以后都叫桶)都加鎖,鎖的粒度降低了。并且,用的是 Synchronized 鎖。

可能有的小伙伴就有疑惑了,不是都說同步鎖是重量級(jí)鎖嗎,這樣不是會(huì)影響并發(fā)效率嗎?

確實(shí)之前同步鎖是一個(gè)重量級(jí)鎖,但是在 JDK1.6 之后進(jìn)行了各種優(yōu)化之后,它已經(jīng)不再那么重了。引入了偏向鎖,輕量級(jí)鎖,以及鎖升級(jí)的概念,而且,據(jù)說在更細(xì)粒度的代碼層面上,同步鎖已經(jīng)可以媲美 Lock 鎖,甚至是趕超了。除此之外,它還有很多優(yōu)點(diǎn),這里不再展開了。感興趣的可以自行查閱同步鎖的鎖升級(jí)過程,以及它和 Lock 鎖的區(qū)別。

在 1.8 CHM 中,底層存儲(chǔ)結(jié)構(gòu)和 1.8 的 HashMap 是一樣的,都是數(shù)組+鏈表+紅黑樹。不同的就是,多了一些并發(fā)的處理。

文章開頭我們提到了,在 1.8 HashMap 中的線程安全問題,就是因?yàn)樵诙鄠€(gè)線程同時(shí)操作同一個(gè)桶的頭結(jié)點(diǎn)時(shí),會(huì)發(fā)生值的覆蓋情況。那么,順著這個(gè)思路,我們看一下在 CHM 中它是怎么避免這種情況發(fā)生的吧。

PS:由于1.8的 CHM 和 HashMap 結(jié)構(gòu)和基本屬性變量,還有初始化邏輯都差不多,只是多了一些并發(fā)情況需要用到的參數(shù)和內(nèi)部類,因此,不再單獨(dú)拎出來介紹。在方法中用到的時(shí)候,再詳細(xì)解釋。

put()方法

因此,從 put 方法開始,我們看下,它在插入新元素的時(shí)候,是怎么保證線程安全的吧。

public V put(K key, V value) {
 return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
 //可以看到,在并發(fā)情況下,key 和 value 都是不支持為空的。
 if (key == null || value == nullthrow new NullPointerException();
 //這里和1.8 HashMap 的hash 方法大同小異,只是多了一個(gè)操作,如下
 //( h ^ (h >>> 16)) & HASH_BITS;  HASH_BITS = 0x7fffffff;
 // 0x7fffffff ,二進(jìn)制為 0111 1111 1111 1111 1111 1111 1111 1111 。
 //所以,hash值除了做了高低位異或運(yùn)算,還多了一步,保證最高位的 1 個(gè) bit 位總是0。
 //這里,我并沒有明白它的意圖,僅僅是保證計(jì)算出來的hash值不超過 Integer 最大值,且不為負(fù)數(shù)嗎。
 //同 HashMap 的hash 方法對(duì)比一下,會(huì)發(fā)現(xiàn)連源碼注釋都是相同的,并沒有多說明其它的。
 //我個(gè)人認(rèn)為意義不大,因?yàn)樽詈?nbsp;hash 是為了和 capacity -1 做與運(yùn)算,而 capacity 最大值為 1<<30,
 //即 0100 0000 0000 0000 0000 0000 0000 0000 ,減1為 0011 1111 1111 1111 1111 1111 1111 1111。
 //即使 hash 最高位為 1(無所謂0),也不影響最后的結(jié)果,最高位也總會(huì)是0.
 int hash = spread(key.hashCode());
 //用來計(jì)算當(dāng)前鏈表上的元素個(gè)數(shù)
 int binCount = 0;
 for (Node<K,V>[] tab = table;;) {
  Node<K,V> f; int n, i, fh;
  //如果表為空,則說明還未初始化。
  if (tab == null || (n = tab.length) == 0)
   //初始化表,只有一個(gè)線程可以初始化成功。
   tab = initTable();
  //若表已經(jīng)初始化,則找到當(dāng)前 key 所在的桶,并且判斷是否為空
  else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
   //若當(dāng)前桶為空,則通過 CAS 原子操作,把新節(jié)點(diǎn)插入到此位置,
   //這保證了只有一個(gè)線程可以 CAS 成功,其它線程都會(huì)失敗。
   if (casTabAt(tab, i, null,
       new Node<K,V>(hash, key, value, null)))
    break;                   // no lock when adding to empty bin
  }
  //若所在桶不為空,則判斷節(jié)點(diǎn)的 hash 值是否為 MOVED(值是-1)
  else if ((fh = f.hash) == MOVED)
   //若為-1,說明當(dāng)前數(shù)組正在進(jìn)行擴(kuò)容,則需要當(dāng)前線程幫忙遷移數(shù)據(jù)
   tab = helpTransfer(tab, f);
  else {
   V oldVal = null;
   //這里用加同步鎖的方式,來保證線程安全,給桶中第一個(gè)節(jié)點(diǎn)對(duì)象加鎖
   synchronized (f) {
    //recheck 一下,保證當(dāng)前桶的第一個(gè)節(jié)點(diǎn)無變化,后邊很多這樣類似的操作,不再贅述
    if (tabAt(tab, i) == f) {
     //如果hash值大于等于0,說明是正常的鏈表結(jié)構(gòu)
     if (fh >= 0) {
      binCount = 1;
      //從頭結(jié)點(diǎn)開始遍歷,每遍歷一次,binCount計(jì)數(shù)加1
      for (Node<K,V> e = f;; ++binCount) {
       K ek;
       //如果找到了和當(dāng)前 key 相同的節(jié)點(diǎn),則用新值替換舊值
       if (e.hash == hash &&
        ((ek = e.key) == key ||
         (ek != null && key.equals(ek)))) {
        oldVal = e.val;
        if (!onlyIfAbsent)
         e.val = value;
        break;
       }
       Node<K,V> pred = e;
       //若遍歷到了尾結(jié)點(diǎn),則把新節(jié)點(diǎn)尾插進(jìn)去
       if ((e = e.next) == null) {
        pred.next = new Node<K,V>(hash, key,
                value, null);
        break;
       }
      }
     }
     //否則判斷是否是樹節(jié)點(diǎn)。這里提一下,TreeBin只是頭結(jié)點(diǎn)對(duì)TreeNode的再封裝
     else if (f instanceof TreeBin) {
      Node<K,V> p;
      binCount = 2;
      if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                value)) != null) {
       oldVal = p.val;
       if (!onlyIfAbsent)
        p.val = value;
      }
     }
    }
   }
   //注意下,這個(gè)判斷是在同步鎖外部,因?yàn)?nbsp;treeifyBin內(nèi)部也有同步鎖,并不影響
   if (binCount != 0) {
    //如果節(jié)點(diǎn)個(gè)數(shù)大于等于 8,則轉(zhuǎn)化為紅黑樹
    if (binCount >= TREEIFY_THRESHOLD)
     treeifyBin(tab, i);
    //把舊節(jié)點(diǎn)值返回
    if (oldVal != null)
     return oldVal;
    break;
   }
  }
 }
 //給元素個(gè)數(shù)加 1,并有可能會(huì)觸發(fā)擴(kuò)容,比較復(fù)雜,稍后細(xì)講
 addCount(1L, binCount);
 return null;
}

initTable()方法

先看下當(dāng)數(shù)組為空時(shí),是怎么初始化表的。

private final Node<K,V>[] initTable() {
 Node<K,V>[] tab; int sc;
 //循環(huán)判斷表是否為空,直到初始化成功為止。
 while ((tab = table) == null || tab.length == 0) {
  //sizeCtl 這個(gè)值有很多情況,默認(rèn)值為0,
  //當(dāng)為 -1 時(shí),說明有其它線程正在對(duì)表進(jìn)行初始化操作
  //當(dāng)表初始化成功后,又會(huì)把它設(shè)置為擴(kuò)容閾值
  //當(dāng)為一個(gè)小于 -1 的負(fù)數(shù),用來表示當(dāng)前有幾個(gè)線程正在幫助擴(kuò)容(后邊細(xì)講)
  if ((sc = sizeCtl) < 0)
   //若 sc 小于0,其實(shí)在這里就是-1,因?yàn)榇藭r(shí)表是空的,不會(huì)發(fā)生擴(kuò)容,sc只能為正數(shù)或者-1
   //因此,當(dāng)前線程放棄 CPU 時(shí)間片,只是自旋。
   Thread.yield(); // lost initialization race; just spin
  //通過 CAS 把 sc 的值設(shè)置為-1,表明當(dāng)前線程正在進(jìn)行表的初始化,其它失敗的線程就會(huì)自旋
  else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
   try {
    //重新檢查一下表是否為空
    if ((tab = table) == null || tab.length == 0) {
     //如果sc大于0,則為sc,否則返回默認(rèn)容量 16。
     //當(dāng)調(diào)用有參構(gòu)造創(chuàng)建 Map 時(shí),sc的值是大于0的。
     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
     @SuppressWarnings('unchecked')
     //創(chuàng)建數(shù)組
     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
     table = tab = nt;
     //n減去 1/4 n ,即為 0.75n ,表示擴(kuò)容閾值
     sc = n - (n >>> 2);
    }
   } finally {
    //更新 sizeCtl 為擴(kuò)容閾值
    sizeCtl = sc;
   }
   //若當(dāng)前線程初始化表成功,則跳出循環(huán)。其它自旋的線程因?yàn)榕袛鄶?shù)組不為空,也會(huì)停止自旋
   break;
  }
 }
 return tab;
}

addCount()方法

若 put 方法元素插入成功之后,則會(huì)調(diào)用此方法,傳入?yún)?shù)為 addCount(1L, binCount)。這個(gè)方法的目的很簡(jiǎn)單,就是把整個(gè) table 的元素個(gè)數(shù)加 1 。但是,實(shí)現(xiàn)比較難。

我們先思考一下,如果讓我們自己去實(shí)現(xiàn)這樣的統(tǒng)計(jì)元素個(gè)數(shù),怎么實(shí)現(xiàn)?

類比 1.8 的 HashMap ,我們可以搞一個(gè) size 變量來存儲(chǔ)個(gè)數(shù)統(tǒng)計(jì)。但是,這是在多線程環(huán)境下,需要考慮并發(fā)的問題。因此,可以把 size 設(shè)置為 volatile 的,保證可見性,然后通過 CAS 樂觀鎖來自增 1。

這樣雖然也可以實(shí)現(xiàn)。但是,設(shè)想一下現(xiàn)在有非常多的線程,都在同一時(shí)間操作這個(gè) size 變量,將會(huì)造成特別嚴(yán)重的競(jìng)爭(zhēng)。所以,基于此,這里做了更好的優(yōu)化。讓這些競(jìng)爭(zhēng)的線程,分散到不同的對(duì)象里邊,單獨(dú)操作它自己的數(shù)據(jù)(計(jì)數(shù)變量),用這樣的方式盡量降低競(jìng)爭(zhēng)。到最后需要統(tǒng)計(jì) size 的時(shí)候,再把所有對(duì)象里邊的計(jì)數(shù)相加就可以了。

上邊提到的 size ,在此用 baseCount 表示。分散到的對(duì)象用 CounterCell 表示,對(duì)象里邊的計(jì)數(shù)變量用 value 表示。注意這里的變量都是 volatile 修飾的。

當(dāng)需要修改元素?cái)?shù)量時(shí),線程會(huì)先去 CAS 修改 baseCount 加1,若成功即返回。若失敗,則線程被分配到某個(gè) CounterCell ,然后操作 value 加1。若成功,則返回。否則,給當(dāng)前線程重新分配一個(gè) CounterCell,再嘗試給 value 加1。(這里簡(jiǎn)略的說,實(shí)際更復(fù)雜)

CounterCell 會(huì)組成一個(gè)數(shù)組,也會(huì)涉及到擴(kuò)容問題。所以,先畫一個(gè)示意圖幫助理解一下。

//線程被分配到的格子
@sun.misc.Contended static final class CounterCell {
 //此格子內(nèi)記錄的 value 值
    volatile long value;
    CounterCell(long x) { value = x; }
}

//用來存儲(chǔ)線程和線程生成的隨機(jī)數(shù)的對(duì)應(yīng)關(guān)系
static final int getProbe() {
 return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

// x為1,check代表鏈表上的元素個(gè)數(shù)
private final void addCount(long x, int check) {
 CounterCell[] as; long b, s;
 //此處要進(jìn)入if有兩種情況
 //1.數(shù)組不為空,說明數(shù)組已經(jīng)被創(chuàng)建好了。
 //2.若數(shù)組為空,說明數(shù)組還未創(chuàng)建,很有可能競(jìng)爭(zhēng)的線程非常少,因此就直接 CAS 操作 baseCount
 //若 CAS 成功,則方法跳轉(zhuǎn)到 (2)處,若失敗,則需要考慮給當(dāng)前線程分配一個(gè)格子(指CounterCell對(duì)象)
 if ((as = counterCells) != null ||
  !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
  CounterCell a; long v; int m;
  //字面意思,是無競(jìng)爭(zhēng),這里先標(biāo)記為 true,表示還沒有產(chǎn)生線程競(jìng)爭(zhēng)
  boolean uncontended = true;
  //這里有三種情況,會(huì)進(jìn)入 fullAddCount 方法
  //1.若數(shù)組為空,進(jìn)方法 (1)
  //2.ThreadLocalRandom.getProbe() 方法會(huì)給當(dāng)前線程生成一個(gè)隨機(jī)數(shù)(可以簡(jiǎn)單的認(rèn)為也是一個(gè)hash值)
  //然后用隨機(jī)數(shù)與數(shù)組長(zhǎng)度取模,計(jì)算它所在的格子。若當(dāng)前線程所分配到的格子為空,進(jìn)方法 (1)。
  //3.若數(shù)組不為空,且線程所在格子不為空,則嘗試 CAS 修改此格子對(duì)應(yīng)的 value 值加1。
  //若修改成功,則跳轉(zhuǎn)到 (3),若失敗,則把 uncontended 值設(shè)為 fasle,說明產(chǎn)生了競(jìng)爭(zhēng),然后進(jìn)方法 (1)
  if (as == null || (m = as.length - 1) < 0 ||
   (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
   !(uncontended =
     U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
   //方法(1), 這個(gè)方法的目的是讓當(dāng)前線程一定把 1 加成功。情況更多,更復(fù)雜,稍后講。
   fullAddCount(x, uncontended);
   return;
  }
  //(3)能走到這,說明數(shù)組不為空,且修改 baseCount失敗,
  //且線程被分配到的格子不為空,且修改 value 成功。
  //但是這里沒明白為什么小于等于1,就直接返回了,這里我懷疑之前的方法漏掉了binCount=0的情況。
  //而且此處若返回了,后邊怎么判斷擴(kuò)容?(存疑)
  if (check <= 1)
   return;
  //計(jì)算總共的元素個(gè)數(shù)
  s = sumCount();
 }
 //(2)這里用于檢查是否需要擴(kuò)容(下邊這部分很多邏輯不懂的話,等后邊講完擴(kuò)容,再回來看就理解了)
 if (check >= 0) {
  Node<K,V>[] tab, nt; int n, sc;
  //若元素個(gè)數(shù)達(dá)到擴(kuò)容閾值,且tab不為空,且tab數(shù)組長(zhǎng)度小于最大容量
  while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
      (n = tab.length) < MAXIMUM_CAPACITY) {
   //這里假設(shè)數(shù)組長(zhǎng)度n就為16,這個(gè)方法返回的是一個(gè)固定值,用于當(dāng)做一個(gè)擴(kuò)容的校驗(yàn)標(biāo)識(shí)
   //可以跳轉(zhuǎn)到最后,看詳細(xì)計(jì)算過程,0000 0000 0000 0000 1000 0000 0001 1011
   int rs = resizeStamp(n);
   //若sc小于0,說明正在擴(kuò)容
   if (sc < 0) {
       //sc的結(jié)構(gòu)類似這樣,1000 0000 0001 1011 0000 0000 0000 0001
    //sc的高16位是數(shù)據(jù)校驗(yàn)標(biāo)識(shí),低16位代表當(dāng)前有幾個(gè)線程正在幫助擴(kuò)容,RESIZE_STAMP_SHIFT=16
    //因此判斷校驗(yàn)標(biāo)識(shí)是否相等,不相等則退出循環(huán)
    //sc == rs + 1,sc == rs + MAX_RESIZERS 這兩個(gè)應(yīng)該是用來判斷擴(kuò)容是否已經(jīng)完成,但是計(jì)算方法存疑
    //感興趣的可以看這個(gè)地址,應(yīng)該是一個(gè) bug ,
    // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427
    //nextTable=null 說明需要擴(kuò)容的新數(shù)組還未創(chuàng)建完成
    //transferIndex這個(gè)參數(shù)小于等于0,說明已經(jīng)不需要其它線程幫助擴(kuò)容了,
    //但是并不說明已經(jīng)擴(kuò)容完成,因?yàn)橛锌赡苓€有線程正在遷移元素。稍后擴(kuò)容細(xì)講就明白了。
    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
     sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
     transferIndex <= 0)
     break;
    //到這里說明當(dāng)前線程可以幫助擴(kuò)容,因此sc值加一,代表擴(kuò)容的線程數(shù)加1
    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
     transfer(tab, nt);
   }
   //當(dāng)sc大于0,說明sc代表擴(kuò)容閾值,因此第一次擴(kuò)容之前肯定走這個(gè)分支,用于初始化新表 nextTable
   //rs<<16
   //1000 0000 0001 1011 0000 0000 0000 0000
   //+2
   //1000 0000 0001 1011 0000 0000 0000 0010
   //這個(gè)值,轉(zhuǎn)為十進(jìn)制就是 -2145714174,用于標(biāo)識(shí),這是擴(kuò)容時(shí),初始化新表的狀態(tài),
   //擴(kuò)容時(shí),需要用到這個(gè)參數(shù)校驗(yàn)是否所有線程都全部幫助擴(kuò)容完成。
   else if (U.compareAndSwapInt(this, SIZECTL, sc,
           (rs << RESIZE_STAMP_SHIFT) + 2))
    //擴(kuò)容,第二個(gè)參數(shù)代表新表,傳入null,則說明是第一次初始化新表(nextTable)
    transfer(tab, null);
   s = sumCount();
  }
 }
}

//計(jì)算表中的元素總個(gè)數(shù)
final long sumCount() {
 CounterCell[] as = counterCells; CounterCell a;
 //baseCount,以這個(gè)值作為累加基準(zhǔn)
 long sum = baseCount;
 if (as != null) {
  //遍歷 counterCells 數(shù)組,得到每個(gè)對(duì)象中的value值
  for (int i = 0; i < as.length; ++i) {
   if ((a = as[i]) != null)
    //累加 value 值
    sum += a.value;
  }
 }
 //此時(shí)得到的就是元素總個(gè)數(shù)
 return sum;


//擴(kuò)容時(shí)的校驗(yàn)標(biāo)識(shí)
static final int resizeStamp(int n) {
 return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

//Integer.numberOfLeadingZeros方法的作用是返回 n 的最高位為1的前面的0的個(gè)數(shù)
//n=16,
0000 0000 0000 0000 0000 0000 0001 0000
//前面有27個(gè)0,即27
0000 0000 0000 0000 0000 0000 0001 1011
//RESIZE_STAMP_BITS為16,然后 1<<(16-1),即 1<<15
0000 0000 0000 0000 1000 0000 0000 0000
//它們做或運(yùn)算,得到 rs 的值
0000 0000 0000 0000 1000 0000 0001 1011

fullAddCount()方法

上邊的 addCount 方法還沒完,別忘了有可能元素個(gè)數(shù)加 1 的操作還未成功,就走到 fullAddCount 這個(gè)方法了??捶椒?,就知道了,全力增加計(jì)數(shù)值,一定要成功(奧利給)。這個(gè)方法和擴(kuò)容遷移方法是最難的,保持耐心~

//傳過來的參數(shù)分別為 1 , false
private final void fullAddCount(long x, boolean wasUncontended) {
 int h;
 //如果當(dāng)前線程的隨機(jī)數(shù)為0,則強(qiáng)制初始化一個(gè)值
 if ((h = ThreadLocalRandom.getProbe()) == 0) {
  ThreadLocalRandom.localInit();      // force initialization
  h = ThreadLocalRandom.getProbe();
  //此時(shí)把 wasUncontended 設(shè)為true,認(rèn)為無競(jìng)爭(zhēng)
  wasUncontended = true;
 }
 //用來表示比 contend(競(jìng)爭(zhēng))更嚴(yán)重的碰撞,若為true,表示可能需要擴(kuò)容,以減少碰撞沖突
 boolean collide = false;                // True if last slot nonempty
 //循環(huán)內(nèi),外層if判斷分三種情況,內(nèi)層判斷又分為六種情況
 for (;;) {
  CounterCell[] as; CounterCell a; int n; long v;
  //1. 若counterCells數(shù)組不為空。  建議先看下邊的2和3兩種情況,再回頭看這個(gè)。 
  if ((as = counterCells) != null && (n = as.length) > 0) {
   // (1) 若當(dāng)前線程所在的格子(CounterCell對(duì)象)為空
   if ((a = as[(n - 1) & h]) == null) {
    if (cellsBusy == 0) {    
     //若無鎖,則樂觀的創(chuàng)建一個(gè) CounterCell 對(duì)象。
     CounterCell r = new CounterCell(x); 
     //嘗試加鎖
     if (cellsBusy == 0 &&
      U.compareAndSwapInt(this, CELLSBUSY, 01)) {
      boolean created = false;
      //加鎖成功后,再 recheck 一下數(shù)組是否不為空,且當(dāng)前格子為空
      try {               
       CounterCell[] rs; int m, j;
       if ((rs = counterCells) != null &&
        (m = rs.length) > 0 &&
        rs[j = (m - 1) & h] == null) {
        //把新創(chuàng)建的對(duì)象賦值給當(dāng)前格子
        rs[j] = r;
        created = true;
       }
      } finally {
       //手動(dòng)釋放鎖
       cellsBusy = 0;
      }
      //若當(dāng)前格子創(chuàng)建成功,且上邊的賦值成功,則說明加1成功,退出循環(huán)
      if (created)
       break;
      //否則,繼續(xù)下次循環(huán)
      continue;           // Slot is now non-empty
     }
    }
    //若cellsBusy=1,說明有其它線程搶鎖成功?;蛘呷魮屾i的 CAS 操作失敗,都會(huì)走到這里,
    //則當(dāng)前線程需跳轉(zhuǎn)到(9)重新生成隨機(jī)數(shù),進(jìn)行下次循環(huán)判斷。
    collide = false;
   }
   /**
   *后邊這幾種情況,都是數(shù)組和當(dāng)前隨機(jī)到的格子都不為空的情況。
   *且注意每種情況,若執(zhí)行成功,且不break,continue,則都會(huì)執(zhí)行(9),重新生成隨機(jī)數(shù),進(jìn)入下次循環(huán)判斷
   */

   // (2) 到這,說明當(dāng)前方法在被調(diào)用之前已經(jīng) CAS 失敗過一次,若不明白可回頭看下 addCount 方法,
   //為了減少競(jìng)爭(zhēng),則跳轉(zhuǎn)到⑨處重新生成隨機(jī)數(shù),并把 wasUncontended 設(shè)置為true ,認(rèn)為下一次不會(huì)產(chǎn)生競(jìng)爭(zhēng)
   else if (!wasUncontended)       // CAS already known to fail
    wasUncontended = true;      // Continue after rehash
   // (3) 若 wasUncontended 為 true 無競(jìng)爭(zhēng),則嘗試一次 CAS。若成功,則結(jié)束循環(huán),若失敗則判斷后邊的 (4)(5)(6)。
   else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
    break;
   // (4) 結(jié)合 (6) 一起看,(4)(5)(6)都是 wasUncontended=true,且CAS修改value失敗的情況。
   //若數(shù)組有變化,或者數(shù)組長(zhǎng)度大于等于當(dāng)前CPU的核心數(shù),則把 collide 改為 false
   //因?yàn)閿?shù)組若有變化,說明是由擴(kuò)容引起的;長(zhǎng)度超限,則說明已經(jīng)無法擴(kuò)容,只能認(rèn)為無碰撞。
   //這里很有意思,認(rèn)真思考一下,當(dāng)擴(kuò)容超限后,則會(huì)達(dá)到一個(gè)平衡,即 (4)(5) 反復(fù)執(zhí)行,直到 (3) 中CAS成功,跳出循環(huán)。
   else if (counterCells != as || n >= NCPU)
    collide = false;            // At max size or stale
   // (5) 若數(shù)組無變化,且數(shù)組長(zhǎng)度小于CPU核心數(shù)時(shí),且 collide 為 false,就把它改為 true,說明下次循環(huán)可能需要擴(kuò)容
   else if (!collide)
    collide = true;
   // (6) 若數(shù)組無變化,且數(shù)組長(zhǎng)度小于CPU核心數(shù)時(shí),且 collide 為 true,說明沖突比較嚴(yán)重,需要擴(kuò)容了。
   else if (cellsBusy == 0 &&
      U.compareAndSwapInt(this, CELLSBUSY, 01)) {
    try {
     //recheck
     if (counterCells == as) {// Expand table unless stale
      //創(chuàng)建一個(gè)容量為原來兩倍的數(shù)組
      CounterCell[] rs = new CounterCell[n << 1];
      //轉(zhuǎn)移舊數(shù)組的值
      for (int i = 0; i < n; ++i)
       rs[i] = as[i];
      //更新數(shù)組
      counterCells = rs;
     }
    } finally {
     cellsBusy = 0;
    }
    //認(rèn)為擴(kuò)容后,下次不會(huì)產(chǎn)生沖突了,和(4)處邏輯照應(yīng)
    collide = false;
    //當(dāng)次擴(kuò)容后,就不需要重新生成隨機(jī)數(shù)了
    continue;                   // Retry with expanded table
   }
   // (9),重新生成一個(gè)隨機(jī)數(shù),進(jìn)行下一次循環(huán)判斷
   h = ThreadLocalRandom.advanceProbe(h);
  }
  //2.這里的 cellsBusy 參數(shù)非常有意思,是一個(gè)volatile的 int值,用來表示自旋鎖的標(biāo)志,
  //可以類比 AQS 中的 state 參數(shù),用來控制鎖之間的競(jìng)爭(zhēng),并且是獨(dú)占模式。簡(jiǎn)化版的AQS。
  //cellsBusy 若為0,說明無鎖,線程都可以搶鎖,若為1,表示已經(jīng)有線程拿到了鎖,則其它線程不能搶鎖。
  else if (cellsBusy == 0 && counterCells == as &&
     U.compareAndSwapInt(this, CELLSBUSY, 01)) {
   boolean init = false;
   try {    
    //這里再重新檢測(cè)下 counterCells 數(shù)組引用是否有變化
    if (counterCells == as) {
     //初始化一個(gè)長(zhǎng)度為 2 的數(shù)組
     CounterCell[] rs = new CounterCell[2];
     //根據(jù)當(dāng)前線程的隨機(jī)數(shù)值,計(jì)算下標(biāo),只有兩個(gè)結(jié)果 0 或 1,并初始化對(duì)象
     rs[h & 1] = new CounterCell(x);
     //更新數(shù)組引用
     counterCells = rs;
     //初始化成功的標(biāo)志
     init = true;
    }
   } finally {
    //別忘了,需要手動(dòng)解鎖。
    cellsBusy = 0;
   }
   //若初始化成功,則說明當(dāng)前加1的操作也已經(jīng)完成了,則退出整個(gè)循環(huán)。
   if (init)
    break;
  }
  //3.到這,說明數(shù)組為空,且 2 搶鎖失敗,則嘗試直接去修改 baseCount 的值,
  //若成功,也說明加1操作成功,則退出循環(huán)。
  else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
   break;                          // Fall back on using base
 }
}

不得不佩服 Doug Lea 大神,思維這么縝密,如果是我的話,直接一個(gè) CAS 完事。(手動(dòng)攤手~)

transfer()方法

需要說明的一點(diǎn)是,雖然我們一直在說幫助擴(kuò)容,其實(shí)更準(zhǔn)確的說應(yīng)該是幫助遷移元素。因?yàn)閿U(kuò)容的第一次初始化新表(擴(kuò)容后的新表)這個(gè)動(dòng)作,只能由一個(gè)線程完成。其他線程都是在幫助遷移元素到新數(shù)組。

這里還是先看下遷移的示意圖,幫助理解。

擴(kuò)容

為了方便,上邊以原數(shù)組長(zhǎng)度 8 為例。在元素遷移的時(shí)候,所有線程都遵循從后向前推進(jìn)的規(guī)則,即如圖A線程是第一個(gè)進(jìn)來的線程,會(huì)從下標(biāo)為7的位置,開始遷移數(shù)據(jù)。

而且當(dāng)前線程遷移時(shí)會(huì)確定一個(gè)范圍,限定它此次遷移的數(shù)據(jù)范圍,如圖 A 線程只能遷移 bound=6到 i=7 這兩個(gè)數(shù)據(jù)。

此時(shí),其它線程就不能遷移這部分?jǐn)?shù)據(jù)了,只能繼續(xù)向前推進(jìn),尋找其它可以遷移的數(shù)據(jù)范圍,且每次推進(jìn)的步長(zhǎng)為固定值 stride(此處假設(shè)為2)。如圖中 B線程發(fā)現(xiàn) A 線程正在遷移6,7的數(shù)據(jù),因此只能向前尋找,然后遷移 bound=4 到 i=5 的這兩個(gè)數(shù)據(jù)。

當(dāng)每個(gè)線程遷移完成它的范圍內(nèi)數(shù)據(jù)時(shí),都會(huì)繼續(xù)向前推進(jìn)。那什么時(shí)候是個(gè)頭呢?

這就需要維護(hù)一個(gè)全局的變量 transferIndex,來表示所有線程總共推進(jìn)到的元素下標(biāo)位置。如圖,線程 A 第一次遷移成功后又向前推進(jìn),然后遷移2,3 的數(shù)據(jù)。此時(shí),若沒有其他線程在幫助遷移,則 transferIndex 即為2。

剩余部分等待下一個(gè)線程來遷移,或者有任何的 A 和B線程已經(jīng)遷移完成,也可以推進(jìn)到這里幫助遷移。直到 transferIndex=0 。(會(huì)做一些其他校驗(yàn)來判斷是否遷移全部完成,看代碼)。

//這個(gè)類是一個(gè)標(biāo)志,用來代表當(dāng)前桶(數(shù)組中的某個(gè)下標(biāo)位置)的元素已經(jīng)全部遷移完成
static final class ForwardingNode<K,Vextends Node<K,V{
 final Node<K,V>[] nextTable;
 ForwardingNode(Node<K,V>[] tab) {
  //把當(dāng)前桶的頭結(jié)點(diǎn)的 hash 值設(shè)置為 -1,表明已經(jīng)遷移完成,
  //這個(gè)節(jié)點(diǎn)中并不存儲(chǔ)有效的數(shù)據(jù)
  super(MOVED, nullnullnull);
  this.nextTable = tab;
 }
}

//遷移數(shù)據(jù)
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
 int n = tab.length, stride;
 //根據(jù)當(dāng)前CPU核心數(shù),確定每次推進(jìn)的步長(zhǎng),最小值為16.(為了方便我們以2為例)
 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  stride = MIN_TRANSFER_STRIDE; // subdivide range
 //從 addCount 方法,只會(huì)有一個(gè)線程跳轉(zhuǎn)到這里,初始化新數(shù)組
 if (nextTab == null) {            // initiating
  try {
   @SuppressWarnings('unchecked')
   //新數(shù)組長(zhǎng)度為原數(shù)組的兩倍
   Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
   nextTab = nt;
  } catch (Throwable ex) {      // try to cope with OOME
   sizeCtl = Integer.MAX_VALUE;
   return;
  }
  //用 nextTable 指代新數(shù)組
  nextTable = nextTab;
  //這里就把推進(jìn)的下標(biāo)值初始化為原數(shù)組長(zhǎng)度(以16為例)
  transferIndex = n;
 }
 //新數(shù)組長(zhǎng)度
 int nextn = nextTab.length;
 //創(chuàng)建一個(gè)標(biāo)志類
 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
 //是否向前推進(jìn)的標(biāo)志
 boolean advance = true;
 //是否所有線程都全部遷移完成的標(biāo)志
 boolean finishing = false// to ensure sweep before committing nextTab
 //i 代表當(dāng)前線程正在遷移的桶的下標(biāo),bound代表它本次可以遷移的范圍下限
 for (int i = 0, bound = 0;;) {
  Node<K,V> f; int fh;
  //需要向前推進(jìn)
  while (advance) {
   int nextIndex, nextBound;
   //(1) 先看 (3) 。i每次自減 1,直到 bound。若超過bound范圍,或者finishing標(biāo)志為true,則不用向前推進(jìn)。
   //若未全部完成遷移,且 i 并未走到 bound,則跳轉(zhuǎn)到 (7),處理當(dāng)前桶的元素遷移。
   if (--i >= bound || finishing)
    advance = false;
   //(2) 每次執(zhí)行,都會(huì)把 transferIndex 最新的值同步給 nextIndex
   //若 transferIndex小于等于0,則說明原數(shù)組中的每個(gè)桶位置,都有線程在處理遷移了,
   //于是,需要跳出while循環(huán),并把 i設(shè)為 -1,以跳轉(zhuǎn)到④判斷在處理的線程是否已經(jīng)全部完成。
   else if ((nextIndex = transferIndex) <= 0) {
    i = -1;
    advance = false;
   }
   //(3) 第一個(gè)線程會(huì)先走到這里,確定它的數(shù)據(jù)遷移范圍。(2)處會(huì)更新 nextIndex為 transferIndex 的最新值
   //因此第一次 nextIndex=n=16,nextBound代表當(dāng)次遷移的數(shù)據(jù)范圍下限,減去步長(zhǎng)即可,
   //所以,第一次時(shí),nextIndex=16,nextBound=16-2=14。后續(xù),每次都會(huì)間隔一個(gè)步長(zhǎng)。
   else if (U.compareAndSwapInt
      (this, TRANSFERINDEX, nextIndex,
       nextBound = (nextIndex > stride ?
           nextIndex - stride : 0))) {
    //bound代表當(dāng)次數(shù)據(jù)遷移下限
    bound = nextBound;
    //第一次的i為15,因?yàn)殚L(zhǎng)度16的數(shù)組,最后一個(gè)元素的下標(biāo)為15
    i = nextIndex - 1;
    //表明不需要向前推進(jìn),只有當(dāng)把當(dāng)前范圍內(nèi)的數(shù)據(jù)全部遷移完成后,才可以向前推進(jìn)
    advance = false;
   }
  }
  //(4)
  if (i < 0 || i >= n || i + n >= nextn) {
   int sc;
   //若全部線程遷移完成
   if (finishing) {
    nextTable = null;
    //更新table為新表
    table = nextTab;
    //擴(kuò)容閾值改為原來數(shù)組長(zhǎng)度的 3/2 ,即新長(zhǎng)度的 3/4,也就是新數(shù)組長(zhǎng)度的0.75倍
    sizeCtl = (n << 1) - (n >>> 1);
    return;
   }
   //到這,說明當(dāng)前線程已經(jīng)完成了自己的所有遷移(無論參與了幾次遷移),
   //則把 sc 減1,表明參與擴(kuò)容的線程數(shù)減少 1。
   if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
    //在 addCount 方法最后,我們強(qiáng)調(diào),遷移開始時(shí),會(huì)設(shè)置 sc=(rs << RESIZE_STAMP_SHIFT) + 2
    //每當(dāng)有一個(gè)線程參與遷移,sc 就會(huì)加 1,每當(dāng)有一個(gè)線程完成遷移,sc 就會(huì)減 1。
    //因此,這里就是去校驗(yàn)當(dāng)前 sc 是否和初始值是否相等。相等,則說明全部線程遷移完成。
    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
     return;
    //只有此處,才會(huì)把finishing 設(shè)置為true。
    finishing = advance = true;
    //這里非常有意思,會(huì)把 i 從 -1 修改為16,
    //目的就是,讓 i 再?gòu)暮笙蚯皰呙枰槐閿?shù)組,檢查是否所有的桶都已被遷移完成,參看 (6)
    i = n; // recheck before commit
   }
  }
  //(5) 若i的位置元素為空,則說明當(dāng)前桶的元素已經(jīng)被遷移完成,就把頭結(jié)點(diǎn)設(shè)置為fwd標(biāo)志。
  else if ((f = tabAt(tab, i)) == null)
   advance = casTabAt(tab, i, null, fwd);
  //(6) 若當(dāng)前桶的頭結(jié)點(diǎn)是 ForwardingNode ,說明遷移完成,則向前推進(jìn) 
  else if ((fh = f.hash) == MOVED)
   advance = true// already processed
  //(7) 處理當(dāng)前桶的數(shù)據(jù)遷移。
  else {
   synchronized (f) {  //給頭結(jié)點(diǎn)加鎖
    if (tabAt(tab, i) == f) {
     Node<K,V> ln, hn;
     //若hash值大于等于0,則說明是普通鏈表節(jié)點(diǎn)
     if (fh >= 0) {
      int runBit = fh & n;
      //這里是 1.7 的 CHM 的 rehash 方法和 1.8 HashMap的 resize 方法的結(jié)合體。
      //會(huì)分成兩條鏈表,一條鏈表和原來的下標(biāo)相同,另一條鏈表是原來的下標(biāo)加數(shù)組長(zhǎng)度的位置
      //然后找到 lastRun 節(jié)點(diǎn),從它到尾結(jié)點(diǎn)整體遷移。
      //lastRun前邊的節(jié)點(diǎn)則單個(gè)遷移,但是需要注意的是,這里是頭插法。
      //另外還有一點(diǎn)和1.7不同,1.7 lastRun前邊的節(jié)點(diǎn)是復(fù)制過去的,而這里是直接遷移的,沒有復(fù)制操作。
      //所以,最后會(huì)有兩條鏈表,一條鏈表從 lastRun到尾結(jié)點(diǎn)是正序的,而lastRun之前的元素是倒序的,
      //另外一條鏈表,從頭結(jié)點(diǎn)開始就是倒敘的??聪聢D。
      Node<K,V> lastRun = f;
      for (Node<K,V> p = f.next; p != null; p = p.next) {
       int b = p.hash & n;
       if (b != runBit) {
        runBit = b;
        lastRun = p;
       }
      }
      if (runBit == 0) {
       ln = lastRun;
       hn = null;
      }
      else {
       hn = lastRun;
       ln = null;
      }
      for (Node<K,V> p = f; p != lastRun; p = p.next) {
       int ph = p.hash; K pk = p.key; V pv = p.val;
       if ((ph & n) == 0)
        ln = new Node<K,V>(ph, pk, pv, ln);
       else
        hn = new Node<K,V>(ph, pk, pv, hn);
      }
      setTabAt(nextTab, i, ln);
      setTabAt(nextTab, i + n, hn);
      setTabAt(tab, i, fwd);
      advance = true;
     }
     //樹節(jié)點(diǎn)
     else if (f instanceof TreeBin) {
      TreeBin<K,V> t = (TreeBin<K,V>)f;
      TreeNode<K,V> lo = null, loTail = null;
      TreeNode<K,V> hi = null, hiTail = null;
      int lc = 0, hc = 0;
      for (Node<K,V> e = t.first; e != null; e = e.next) {
       int h = e.hash;
       TreeNode<K,V> p = new TreeNode<K,V>
        (h, e.key, e.val, nullnull);
       if ((h & n) == 0) {
        if ((p.prev = loTail) == null)
         lo = p;
        else
         loTail.next = p;
        loTail = p;
        ++lc;
       }
       else {
        if ((p.prev = hiTail) == null)
         hi = p;
        else
         hiTail.next = p;
        hiTail = p;
        ++hc;
       }
      }
      ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
       (hc != 0) ? new TreeBin<K,V>(lo) : t;
      hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
       (lc != 0) ? new TreeBin<K,V>(hi) : t;
      setTabAt(nextTab, i, ln);
      setTabAt(nextTab, i + n, hn);
      setTabAt(tab, i, fwd);
      advance = true;
     }
    }
   }
  }
 }
}

遷移后的新數(shù)組鏈表方向示意圖,以 runBit =0 為例。

helpTransfer()方法

最后再看 put 方法中的這個(gè)方法,就比較簡(jiǎn)單了。

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
 Node<K,V>[] nextTab; int sc;
 //頭結(jié)點(diǎn)為 ForwardingNode ,并且新數(shù)組已經(jīng)初始化
 if (tab != null && (f instanceof ForwardingNode) &&
  (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
  int rs = resizeStamp(tab.length);
  while (nextTab == nextTable && table == tab &&
      (sc = sizeCtl) < 0) {
   //若校驗(yàn)標(biāo)識(shí)失敗,或者已經(jīng)擴(kuò)容完成,或推進(jìn)下標(biāo)到頭,則退出
   if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    sc == rs + MAX_RESIZERS || transferIndex <= 0)
    break;
   //當(dāng)前線程需要幫助遷移,sc值加1
   if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
    transfer(tab, nextTab);
    break;
   }
  }
  return nextTab;
 }
 return table;
}

JDK1.8 的 CHM 最主要的邏輯基本上都講完了,其它方法原理類同。1.8 的 ConcurrentHashMap 實(shí)現(xiàn)原理還是比較簡(jiǎn)單的,但是代碼實(shí)現(xiàn)比較復(fù)雜。相對(duì)于 1.7 來說,鎖的粒度降低了,效率也提高了。

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
探索 ConcurrentHashMap 高并發(fā)性的實(shí)現(xiàn)機(jī)制
面試 ConcurrentHashMap ,看這一篇就夠了!_51CTO博客_concurrenthashmap面試題
ConcurrentHashMap實(shí)現(xiàn)原理及源碼分析
HashMap?面試?我是誰?
ConcurrentHashMap
Java7/8中的HashMap和ConcurrentHashMap源碼分析,看了都說好
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服