免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
并發(fā)編程面試必備之ConcurrentHashMap源碼解析

  ConcurrentHashMap在我的面試生涯中,10次有8次是會被問到的,記得剛畢業(yè)那會,被問到ConcurrentHashMap源碼的無助與苦澀,無奈只能網(wǎng)上找了一些教程,背一背,才算是蒙混過關,當然其實這種法子是不推崇的,而且不能真正理解源碼真諦,遇到高手還是很容易就問出來的,那么我們這篇就一起來了解下ConcurrentHashMap如何成為一個理(meng)解(hun)源(guo)碼(guan)的高手,建議對Map不是很了解的可以看看我前面寫的一篇HashMap源碼解析。其實網(wǎng)上找了很多資料,尤其是并發(fā)擴容這一塊,沒找到什么有用的資料,沒辦法,只能自己慢慢啃了蠻長時間的,所以這塊也會詳細寫出來。

  ConcurrentHashMap的數(shù)據(jù)結構和HashMap基本一致,這里就不重新畫了,copy一下之前畫的圖:

                                                             

     

 

 

那么問題來了,既然兩個差不多,為啥又要搞個ConcurrentHashMap呢,這就要說到兩者之間的區(qū)別到底在哪里:

既生HashMap何生ConcurrentHashMap:

  HashMap和ConcurrentHashMap的區(qū)別說起來很多,但在我看來,最本質的區(qū)別就是HashMap是線程不安全的,而ConcurrentHashMap是線程安全的,這時候可能有人要說話了,那HashTable不也是線程安全的嗎,如果只是為了線程安全,HashTable不就行了嗎,還非要搞一個ConcurrentHashMap,看看jdk1.8這玩意代碼都多少行了。

 既生HashTable何生ConcurrentHashMap:

   HashTable這里我就不贅述了,大體上就是HashMap加了個synchronized,區(qū)別有但不是很大,但ConcurrentHashMap從1.7的segment分段鎖,到1.8的node節(jié)點鎖,那代碼復雜度那么高,doug lea圖啥呢,這里我可以先給出個不標準答案: 性能,jdk代碼風格死扣的話,你會發(fā)現(xiàn)這幫人就是變態(tài),他可以多寫十行你看不懂的代碼,就為了提升幾毫秒的,更別說ConcurrentHashMap這么多的了。

  廢話不多說,還是先看代碼吧:

初始化不聊了,put時會先去初始化,我們直接看put方法:

put:

public V put(K key, V value) {
        return putVal(key, value, false);
    }
final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException(); //非空校驗
        int hash = spread(key.hashCode()); //再hash
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {  //死循環(huán)
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)  //表示當前map為空,則需初始化操作
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //表示當前下標對應沒有值
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))  //直接cas設值(其實和hashMap差不多),cas失敗則下次循環(huán)繼續(xù)
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)  //ForwardingNode節(jié)點會進入
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {    //看到?jīng)]有,這里用的是synchronized,大神也覺得synchronized的性能至少不比ReentrantLock差吧
                    if (tabAt(tab, i) == f) {  //這里校驗是為了防止加鎖前被其他線程修改了
                        if (fh >= 0) {   
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) { //節(jié)點存在,直接修改
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {  //節(jié)點不存在,加載鏈表尾部
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {  //紅黑樹操作
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {   //節(jié)點數(shù)
                    if (binCount >= TREEIFY_THRESHOLD) // >=8,鏈表轉紅黑樹
                        treeifyBin(tab, i);
                    if (oldVal != null)  //節(jié)點存在,直接返回舊值
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);  
        return null;
    }

put直接調用了putVal方法,putVal方法在非空校驗,再hash計算(為了更均勻分布)之后,開始自旋,做正事了,這里面寫了幾個if else :

1.判斷map是否為空,為空初始化

2.定位到下標對應的節(jié)點,為空則說明當前節(jié)點鏈表未生成,new 一個cas塞進去

3.判斷是否是ForwardingNode節(jié)點,是則需要協(xié)助擴容

4.都不是則正常塞值,判斷是鏈表還是紅黑樹,走不同新增流程

接下來判斷是否需要轉紅黑樹,最后返回,節(jié)點存在則返回舊值,否則返回null,addCount方法我們后面會詳細介紹,其實就是count加一,并判斷是否需要擴容。

我們首先來看一下初始化的邏輯:

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)   //說明此時正在初始化,讓出cpu資源
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //標識此時正在初始化
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //未設值則賦默認值16
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);  //默認0.75倍擴容
                    }
                } finally {
                    sizeCtl = sc; //擴容大小賦值
                }
                break;
            }
        }
        return tab;
    }

其實看著注釋對照代碼還是很簡單的,就是初始化一下node數(shù)組大小,及擴容因子等。

 再看協(xié)助擴容:

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {  //這里就是判斷當前是否已經(jīng)在擴容中
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {  //sizectl+1
                    transfer(tab, nextTab);  //擴容
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

并發(fā)擴容:

這里主要做了一個校驗是否需要協(xié)助擴容,每多一個線程協(xié)助,sizectl+1,接下來主要看看擴容方法,這也是ConcurrentHashMap的點睛之筆:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating 說明是首個線程擴容
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //擴容兩倍新數(shù)組
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {  //這個for循環(huán)主要做并發(fā)擴容時,多線程遷移節(jié)點控制
            Node<K,V> f; int fh;
            while (advance) {  //這個自旋主要是用于控制未被分配處理的桶(鏈表)的個數(shù)
                int nextIndex, nextBound;
                if (--i >= bound || finishing)  //每次處理都減一,條件成立說明直接去擴容(只是控制下標)或已經(jīng)結束,當前線程退出擴容
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {  //成立說明已經(jīng)被分配完
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {  //cas修改下一個桶的位置,成功則更新下一個桶的位置
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {  //判斷是否結束
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)    //查不到cas查詢,這個對性能的要求已經(jīng)到令人發(fā)指的地步了吧
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)     //
                advance = true; // already processed
            else {
                synchronized (f) {  //鎖下面才是真正的節(jié)點遷移,可以發(fā)現(xiàn)這里鎖的是一個node節(jié)點
                    if (tabAt(tab, i) == f) {  //鏈表遷移
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)  //這里的計算和hashMap查不多,ln是低位節(jié)點
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            
                            //這里cas分別在高低位鏈表中,并將當前舊tab更新為forward節(jié)點
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {  //紅黑樹遷移
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

1.首個線程擴容,new 2倍大小Node數(shù)組

2.自旋并發(fā)擴容:

這里代碼寫的比較復雜,我也是花了挺長時間才理清楚這塊邏輯,我先解釋一下并發(fā)擴容的機制:

  首先一個線程過來,他是需要去看他要在哪個位置(會去讀transferIndex的值)開始遷移,一個線程負責一塊區(qū)域,多個線程這里采用的是類似跳表的機制,這里有一個擴容因子stride,最小為16,每次跳的大小就是這個擴容因子控制的。然后要去cas嘗試是否可以占用這個節(jié)點(由于沒有加鎖,所以有可能其他線程先來占了),失敗繼續(xù)跳下一個節(jié)點,直到成功,每個線程負責的區(qū)域也就是下標為初始占用的地方(bound)到bound-stride的大小,如果當前線程把自己負責的區(qū)域擴容完,發(fā)現(xiàn)原本該下一個線程負責的節(jié)點沒有被占用,則還是會繼續(xù)往下擴容,否則就繼續(xù)往下跳直到結束,后面的線程也是如此。

了解完這些我們再來看代碼,首先看while (advance)里面:

1.首先判斷--i>=bound,這個是用來控制數(shù)組下標的,每次循環(huán)-1,當大于時,說明當前線程還在自己負責的擴容范圍,繼續(xù)擴容,finishing表示擴容結束。

2.當(nextIndex = transferIndex) <= 0條件成立時,說明所有下標都有線程負責擴容了,新來的線程就不需要操心了,同時也是將當前需擴容下標賦值給nextIndex

3.cas修改下一個桶的位置(即下一個線程開始擴容的下標),成功則說明自己是第一個占領的,接下來擴容就好了,失敗則表示已經(jīng)被占了,則繼續(xù)嘗試占領下一個山頭。

再看下一個if, if (i < 0 || i >= n || i + n >= nextn) 其實就是為了判斷當前擴容是否結束,其實里面就是根據(jù)sizectl大小cas計算,然后標記finishing=true,下一輪則直接return.

再往下看:校驗(f = tabAt(tab, i)) == null,這里如果下標為空,則cas設置一個forward節(jié)點占位

再往下看:else if ((fh = f.hash) == MOVED),這個表示當前節(jié)點是已擴容節(jié)點,置為true,回到開頭的地方去循環(huán)

最后便是真正的節(jié)點遷移,這里看注釋其實就差不多了,和hashMap單節(jié)點遷移是一樣的,拆分高低位遷移。

其實這里最核心也最難理解的不是遷移,而是如何控制多線程去并發(fā)的遷移還能互不影響,這里使用了大批量的cas,只在最關鍵的地方使用了synchronized,可以說在性能方面做到了極致。

 

其實理解了上面這些,再往下看就簡單了很多,協(xié)助擴容說完了,我們再看后面其實就是走下一輪循環(huán)鏈表新增節(jié)點或紅黑樹新增節(jié)點。接下來put方法還有一個核心就是addCount,我們先看代碼:

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { //這個if就是更新baseCount值,可以不關注
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        if (check >= 0) { //主要看這里
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {  //大小超出擴容參數(shù) && 數(shù)組不為空 && 長度未超出最大值,表示需要擴容
                int rs = resizeStamp(n);
                if (sc < 0) {  //表示已經(jīng)有線程在擴容/初始化
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)   //表示其他線程正在初始化
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))  // 協(xié)助擴容
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2)) //表示之前無其他線程擴容、初始化
                    transfer(tab, null);  //直接擴容
                s = sumCount();
            }
        }
    }

你會發(fā)現(xiàn)除了將baseCount+1,就是判斷是否還要擴容,協(xié)助擴容,又聯(lián)系到上面去了。

put方法大體上說完了,哇,也太復雜了,那么接下來再看看get方法是啥樣的呢:

get:

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {    //首先判斷hash計算當前下標下存在值
            if ((eh = e.hash) == h) { //這里首先看第一個節(jié)點是否就是要找的值
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)   //表示正在擴容或紅黑樹
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {  //遍歷查詢
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

當hash小于0時,有可能為正在擴容(-1)或者紅黑樹(-2),這里主要介紹一下正在擴容時的查詢,直接看ForwardingNode.find:

Node<K,V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes
            outer: for (Node<K,V>[] tab = nextTable;;) {  //首先將nextTable賦值給tab
                Node<K,V> e; int n;
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)  說明當前節(jié)點為空
                    return null;
                for (;;) {  
                    int eh; K ek;
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))  //首節(jié)點等于要查詢key
                        return e;
                    if (eh < 0) {  
                        if (e instanceof ForwardingNode) {  //這種情況暫時還沒想到有什么情況會遇到
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else   //表示正在擴容的節(jié)點為紅黑樹
                            return e.find(h, k);
                    }
                    if ((e = e.next) == null)  //查不到返回null
                        return null;
                }
            }
        }

相比上面擴容,這些代碼確實很簡單了,全程沒用鎖,但也保證了并發(fā)及擴容時可以查到,這時候就有一個問題來了,那我如何能保證這些數(shù)據(jù)我都能實時查詢到呢,難道就不會出現(xiàn)數(shù)據(jù)在cpu緩存沒刷出來導致我查詢時有問題嗎,當然,doug lea大神肯定是會考慮到這些的,他用簡簡單單的一個volatile關鍵字完成了這一切,volatile關鍵字保證了數(shù)據(jù)的可見性,也就保證了我前面問題的并發(fā)安全,后續(xù)我也會補一章volitile的文章。

本站僅提供存儲服務,所有內容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權內容,請點擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
【死磕Java并發(fā)】—–J.U.C之Java并發(fā)容器:ConcurrentHashMap
Java 7 / 8 中的 HashMap 和 ConcurrentHashMap 全解析( 下 )...
深入解析ConcurrentHashMap:感受并發(fā)編程智慧
我就知道面試官接下來要問我 ConcurrentHashMap 底層原理了
深入淺出ConcurrentHashMap1.8
談談 ConcurrentHashMap1.7 和 1.8 的不同實現(xiàn)
更多類似文章 >>
生活服務
分享 收藏 導長圖 關注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服