免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
深入理解Java線程池:ThreadPoolExecutor

作者:idea buffer

http://www.ideabuffer.cn/2017/04/04/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3Java%E7%BA%BF%E7%A8%8B%E6%B1%A0%EF%BC%9AThreadPoolExecutor/

線程池介紹

在web開發(fā)中,服務(wù)器需要接受并處理請(qǐng)求,所以會(huì)為一個(gè)請(qǐng)求來分配一個(gè)線程來進(jìn)行處理。如果每次請(qǐng)求都新創(chuàng)建一個(gè)線程的話實(shí)現(xiàn)起來非常簡(jiǎn)便,但是存在一個(gè)問題:

如果并發(fā)的請(qǐng)求數(shù)量非常多,但每個(gè)線程執(zhí)行的時(shí)間很短,這樣就會(huì)頻繁的創(chuàng)建和銷毀線程,如此一來會(huì)大大降低系統(tǒng)的效率??赡艹霈F(xiàn)服務(wù)器在為每個(gè)請(qǐng)求創(chuàng)建新線程和銷毀線程上花費(fèi)的時(shí)間和消耗的系統(tǒng)資源要比處理實(shí)際的用戶請(qǐng)求的時(shí)間和資源更多。

那么有沒有一種辦法使執(zhí)行完一個(gè)任務(wù),并不被銷毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)呢?

這就是線程池的目的了。線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對(duì)多個(gè)任務(wù)重用線程,線程創(chuàng)建的開銷被分?jǐn)偟搅硕鄠€(gè)任務(wù)上。

什么時(shí)候使用線程池?
  • 單個(gè)任務(wù)處理時(shí)間比較短

  • 需要處理的任務(wù)數(shù)量很大

使用線程池的好處

引用自 http://ifeve.com/java-threadpool/ 的說明:

  • 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。

  • 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行。

  • 提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。

Java中的線程池是用ThreadPoolExecutor類來實(shí)現(xiàn)的. 本文就結(jié)合JDK 1.8對(duì)該類的源碼來分析一下這個(gè)類內(nèi)部對(duì)于線程的創(chuàng)建, 管理以及后臺(tái)任務(wù)的調(diào)度等方面的執(zhí)行原理。

先看一下線程池的類圖:

Executor框架接口

Executor框架是一個(gè)根據(jù)一組執(zhí)行策略調(diào)用,調(diào)度,執(zhí)行和控制的異步任務(wù)的框架,目的是提供一種將”任務(wù)提交”與”任務(wù)如何運(yùn)行”分離開來的機(jī)制。

J.U.C中有三個(gè)Executor接口:
  • Executor:一個(gè)運(yùn)行新任務(wù)的簡(jiǎn)單接口;

  • ExecutorService:擴(kuò)展了Executor接口。添加了一些用來管理執(zhí)行器生命周期和任務(wù)生命周期的方法;

  • ScheduledExecutorService:擴(kuò)展了ExecutorService。支持Future和定期執(zhí)行任務(wù)。

Executor接口

1
2
3
public interface Executor {
void execute(Runnable command);
}

Executor接口只有一個(gè)execute方法,用來替代通常創(chuàng)建或啟動(dòng)線程的方法。例如,使用Thread來創(chuàng)建并啟動(dòng)線程的代碼如下:

1
2
Thread t = new Thread();
t.start();

使用Executor來啟動(dòng)線程執(zhí)行任務(wù)的代碼如下:

1
2
Thread t = new Thread();
executor.execute(t);

對(duì)于不同的Executor實(shí)現(xiàn),execute()方法可能是創(chuàng)建一個(gè)新線程并立即啟動(dòng),也有可能是使用已有的工作線程來運(yùn)行傳入的任務(wù),也可能是根據(jù)設(shè)置線程池的容量或者阻塞隊(duì)列的容量來決定是否要將傳入的線程放入阻塞隊(duì)列中或者拒絕接收傳入的線程。

ExecutorService接口

ExecutorService接口繼承自Executor接口,提供了管理終止的方法,以及可為跟蹤一個(gè)或多個(gè)異步任務(wù)執(zhí)行狀況而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支持即時(shí)關(guān)閉,也就是shutDownNow()方法,則任務(wù)需要正確處理中斷。

ScheduledExecutorService接口

ScheduledExecutorService擴(kuò)展ExecutorService接口并增加了schedule方法。調(diào)用schedule方法可以在指定的延時(shí)后執(zhí)行一個(gè)Runnable或者Callable任務(wù)。ScheduledExecutorService接口還定義了按照指定時(shí)間間隔定期執(zhí)行任務(wù)的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。

ThreadPoolExecutor分析

ThreadPoolExecutor繼承自AbstractExecutorService,也是實(shí)現(xiàn)了ExecutorService接口。

幾個(gè)重要的字段

ctl是對(duì)線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個(gè)字段, 它包含兩部分的信息: 線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這里可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個(gè)1),這個(gè)常量表示workerCount的上限值,大約是5億。

下面再介紹下線程池的運(yùn)行狀態(tài). 線程池一共有五種狀態(tài), 分別是:

  1. RUNNING :能接受新提交的任務(wù),并且也能處理阻塞隊(duì)列中的任務(wù);

  2. SHUTDOWN:關(guān)閉狀態(tài),不再接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊(duì)列中已保存的任務(wù)。在線程池處于 RUNNING 狀態(tài)時(shí),調(diào)用 shutdown()方法會(huì)使線程池進(jìn)入到該狀態(tài)。(finalize() 方法在執(zhí)行過程中也會(huì)調(diào)用shutdown()方法進(jìn)入該狀態(tài));

  3. STOP:不能接受新任務(wù),也不處理隊(duì)列中的任務(wù),會(huì)中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時(shí),調(diào)用 shutdownNow() 方法會(huì)使線程池進(jìn)入到該狀態(tài);

  4. TIDYING:如果所有的任務(wù)都已終止了,workerCount (有效線程數(shù)) 為0,線程池進(jìn)入該狀態(tài)后會(huì)調(diào)用 terminated() 方法進(jìn)入TERMINATED 狀態(tài)。

  5. TERMINATED:在terminated() 方法執(zhí)行完后進(jìn)入該狀態(tài),默認(rèn)terminated()方法中什么也沒有做。
    進(jìn)入TERMINATED的條件如下:

    • 線程池不是RUNNING狀態(tài);

    • 線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài);

    • 如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空;

    • workerCount為0;

    • 設(shè)置TIDYING狀態(tài)成功。

下圖為線程池的狀態(tài)轉(zhuǎn)換過程:

ctl相關(guān)方法

這里還有幾個(gè)對(duì)ctl進(jìn)行計(jì)算的方法:

  • runStateOf:獲取運(yùn)行狀態(tài);

  • workerCountOf:獲取活動(dòng)線程數(shù);

  • ctlOf:獲取運(yùn)行狀態(tài)和活動(dòng)線程數(shù)的值。

ThreadPoolExecutor構(gòu)造方法

構(gòu)造方法中的字段含義如下:

  • corePoolSize:核心線程數(shù)量,當(dāng)有新任務(wù)在execute()方法提交時(shí),會(huì)執(zhí)行以下判斷:
    所以,任務(wù)提交時(shí),判斷的順序?yàn)?corePoolSize –> workQueue –> maximumPoolSize。
    1. 如果運(yùn)行的線程少于 corePoolSize,則創(chuàng)建新線程來處理任務(wù),即使線程池中的其他線程是空閑的;

    2. 如果線程池中的線程數(shù)量大于等于 corePoolSize 且小于 maximumPoolSize,則只有當(dāng)workQueue滿時(shí)才創(chuàng)建新的線程去處理任務(wù);

    3. 如果設(shè)置的corePoolSize 和 maximumPoolSize相同,則創(chuàng)建的線程池的大小是固定的,這時(shí)如果有新任務(wù)提交,若workQueue未滿,則將請(qǐng)求放入workQueue中,等待有空閑的線程去從workQueue中取任務(wù)并處理;

    4. 如果運(yùn)行的線程數(shù)量大于等于maximumPoolSize,這時(shí)如果workQueue已經(jīng)滿了,則通過handler所指定的策略來處理任務(wù);

  • maximumPoolSize:最大線程數(shù)量;

  • workQueue:等待隊(duì)列,當(dāng)任務(wù)提交時(shí),如果線程池中的線程數(shù)量大于等于corePoolSize的時(shí)候,把該任務(wù)封裝成一個(gè)Worker對(duì)象放入等待隊(duì)列;

  • workQueue:保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列,當(dāng)提交一個(gè)新的任務(wù)到線程池以后, 線程池會(huì)根據(jù)當(dāng)前線程池中正在運(yùn)行著的線程的數(shù)量來決定對(duì)該任務(wù)的處理方式,主要有以下幾種處理方式:

    • 如果要想降低系統(tǒng)資源的消耗(包括CPU的使用率,操作系統(tǒng)資源的消耗,上下文環(huán)境切換的開銷等), 可以設(shè)置較大的隊(duì)列容量和較小的線程池容量, 但這樣也會(huì)降低線程處理任務(wù)的吞吐量。

    • 如果提交的任務(wù)經(jīng)常發(fā)生阻塞,那么可以考慮通過調(diào)用 setMaximumPoolSize() 方法來重新設(shè)定線程池的容量。

    • 如果隊(duì)列的容量設(shè)置的較小,通常需要將線程池的容量設(shè)置大一點(diǎn),這樣CPU的使用率會(huì)相對(duì)的高一些。但如果線程池的容量設(shè)置的過大,則在提交的任務(wù)數(shù)量太多的情況下,并發(fā)量會(huì)增加,那么線程之間的調(diào)度就是一個(gè)要考慮的問題,因?yàn)檫@樣反而有可能降低處理任務(wù)的吞吐量。

    1. 直接切換:這種方式常用的隊(duì)列是SynchronousQueue,但現(xiàn)在還沒有研究過該隊(duì)列,這里暫時(shí)還沒法介紹;

    2. 使用無界隊(duì)列:一般使用基于鏈表的阻塞隊(duì)列LinkedBlockingQueue。如果使用這種方式,那么線程池中能夠創(chuàng)建的最大線程數(shù)就是corePoolSize,而maximumPoolSize就不會(huì)起作用了(后面也會(huì)說到)。當(dāng)線程池中所有的核心線程都是RUNNING狀態(tài)時(shí),這時(shí)一個(gè)新的任務(wù)提交就會(huì)放入等待隊(duì)列中。

    3. 使用有界隊(duì)列:一般使用ArrayBlockingQueue。使用該方式可以將線程池的最大線程數(shù)量限制為maximumPoolSize,這樣能夠降低資源的消耗,但同時(shí)這種方式也使得線程池對(duì)線程的調(diào)度變得更困難,因?yàn)榫€程池和隊(duì)列的容量都是有限的值,所以要想使線程池處理任務(wù)的吞吐率達(dá)到一個(gè)相對(duì)合理的范圍,又想使線程調(diào)度相對(duì)簡(jiǎn)單,并且還要盡可能的降低線程池對(duì)資源的消耗,就需要合理的設(shè)置這兩個(gè)數(shù)量。

  • keepAliveTime:線程池維護(hù)線程所允許的空閑時(shí)間。當(dāng)線程池中的線程數(shù)量大于corePoolSize的時(shí)候,如果這時(shí)沒有新的任務(wù)提交,核心線程外的線程不會(huì)立即銷毀,而是會(huì)等待,直到等待的時(shí)間超過了keepAliveTime;

  • threadFactory:它是ThreadFactory類型的變量,用來創(chuàng)建新線程。默認(rèn)使用Executors.defaultThreadFactory() 來創(chuàng)建線程。使用默認(rèn)的ThreadFactory來創(chuàng)建線程時(shí),會(huì)使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級(jí)并且是非守護(hù)線程,同時(shí)也設(shè)置了線程的名稱。

  • handler:它是RejectedExecutionHandler類型的變量,表示線程池的飽和策略。如果阻塞隊(duì)列滿了并且沒有空閑的線程,這時(shí)如果繼續(xù)提交任務(wù),就需要采取一種策略處理該任務(wù)。線程池提供了4種策略:

    1. AbortPolicy:直接拋出異常,這是默認(rèn)策略;

    2. CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);

    3. DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);

    4. DiscardPolicy:直接丟棄任務(wù);

execute方法

execute()方法用來提交任務(wù),代碼如下:

簡(jiǎn)單來說,在執(zhí)行execute()方法時(shí)如果狀態(tài)一直是RUNNING時(shí),的執(zhí)行過程如下:
  1. 如果workerCount < corePoolSize,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù);

  2. 如果workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添加到該阻塞隊(duì)列中;

  3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù);

  4. 如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊(duì)列已滿, 則根據(jù)拒絕策略來處理該任務(wù), 默認(rèn)的處理方式是直接拋異常。

這里要注意一下addWorker(null, false);,也就是創(chuàng)建一個(gè)線程,但并沒有傳入任務(wù),因?yàn)槿蝿?wù)已經(jīng)被添加到workQueue中了,所以worker在執(zhí)行的時(shí)候,會(huì)直接從workQueue中獲取任務(wù)。所以,在workerCountOf(recheck) == 0時(shí)執(zhí)行addWorker(null, false);也是為了保證線程池在RUNNING狀態(tài)下必須要有一個(gè)線程來執(zhí)行任務(wù)。

execute方法執(zhí)行流程如下:

addWorker方法

addWorker方法的主要工作是在線程池中創(chuàng)建一個(gè)新的線程并執(zhí)行,firstTask參數(shù) 用于指定新增的線程執(zhí)行的第一個(gè)任務(wù),core參數(shù)為true表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSize,false表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize,代碼如下:

注意一下這里的t.start()這個(gè)語句,啟動(dòng)時(shí)會(huì)調(diào)用Worker類中的run方法,Worker本身實(shí)現(xiàn)了Runnable接口,所以一個(gè)Worker類型的對(duì)象也是一個(gè)線程。

Worker類

線程池中的每一個(gè)線程被封裝成一個(gè)Worker對(duì)象,ThreadPool維護(hù)的其實(shí)就是一組Worker對(duì)象,看一下Worker的定義:

Worker類繼承了AQS,并實(shí)現(xiàn)了Runnable接口,注意其中的firstTask和thread屬性:firstTask用它來保存?zhèn)魅氲娜蝿?wù);thread是在調(diào)用構(gòu)造方法時(shí)通過ThreadFactory來創(chuàng)建的線程,是用來處理任務(wù)的線程。
在調(diào)用構(gòu)造方法時(shí),需要把任務(wù)傳入,這里通過getThreadFactory().newThread(this);來新建一個(gè)線程,newThread方法傳入的參數(shù)是this,因?yàn)閃orker本身繼承了Runnable接口,也就是一個(gè)線程,所以一個(gè)Worker對(duì)象在啟動(dòng)的時(shí)候會(huì)調(diào)用Worker類中的run方法。
Worker繼承了AQS,使用AQS來實(shí)現(xiàn)獨(dú)占鎖的功能。為什么不使用ReentrantLock來實(shí)現(xiàn)呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:
  1. lock方法一旦獲取了獨(dú)占鎖,表示當(dāng)前線程正在執(zhí)行任務(wù)中;

  2. 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程;

  3. 如果該線程現(xiàn)在不是獨(dú)占鎖的狀態(tài),也就是空閑的狀態(tài),說明它沒有在處理任務(wù),這時(shí)可以對(duì)該線程進(jìn)行中斷;

  4. 線程池在執(zhí)行shutdown方法或tryTerminate方法時(shí)會(huì)調(diào)用interruptIdleWorkers方法來中斷空閑的線程,interruptIdleWorkers方法會(huì)使用tryLock方法來判斷線程池中的線程是否是空閑狀態(tài);

  5. 之所以設(shè)置為不可重入,是因?yàn)槲覀儾幌M蝿?wù)在調(diào)用像setCorePoolSize這樣的線程池控制方法時(shí)重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果在任務(wù)中調(diào)用了如setCorePoolSize這類線程池控制的方法,會(huì)中斷正在運(yùn)行的線程。
所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷。

此外,在構(gòu)造方法中執(zhí)行了setState(-1);,把state變量設(shè)置為-1,為什么這么做呢?是因?yàn)锳QS中默認(rèn)的state是0,如果剛創(chuàng)建了一個(gè)Worker對(duì)象,還沒有執(zhí)行任務(wù)時(shí),這時(shí)就不應(yīng)該被中斷,看一下tryAquire方法:

tryAcquire方法是根據(jù)state是否是0來判斷的,所以,setState(-1);將state設(shè)置為-1是為了禁止在執(zhí)行任務(wù)前對(duì)線程進(jìn)行中斷。
正因?yàn)槿绱?,在runWorker方法中會(huì)先調(diào)用Worker對(duì)象的unlock方法將state設(shè)置為0.

runWorker方法

在Worker類中的run方法調(diào)用了runWorker方法來執(zhí)行任務(wù),runWorker方法的代碼如下:

這里說明一下第一個(gè)if判斷,目的是:
  • 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài);

  • 如果不是的話,則要保證當(dāng)前線程不是中斷狀態(tài);

這里要考慮在執(zhí)行該if語句期間可能也執(zhí)行了shutdownNow方法,shutdownNow方法會(huì)把狀態(tài)設(shè)置為STOP,回顧一下STOP狀態(tài):

不能接受新任務(wù),也不處理隊(duì)列中的任務(wù),會(huì)中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時(shí),調(diào)用 shutdownNow() 方法會(huì)使線程池進(jìn)入到該狀態(tài)。

STOP狀態(tài)要中斷線程池中的所有線程,而這里使用Thread.interrupted()來判斷是否中斷是為了確保在RUNNING或者SHUTDOWN狀態(tài)時(shí)線程是非中斷狀態(tài)的,因?yàn)門hread.interrupted()方法會(huì)復(fù)位中斷的狀態(tài)。

總結(jié)一下runWorker方法的執(zhí)行過程:

  1. while循環(huán)不斷地通過getTask()方法獲取任務(wù);

  2. getTask()方法從阻塞隊(duì)列中取任務(wù);

  3. 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),否則要保證當(dāng)前線程不是中斷狀態(tài);

  4. 調(diào)用task.run()執(zhí)行任務(wù);

  5. 如果task為null則跳出循環(huán),執(zhí)行processWorkerExit()方法;

  6. runWorker方法執(zhí)行完畢,也代表著Worker中的run方法執(zhí)行完畢,銷毀線程。
這里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給子類來實(shí)現(xiàn)。
completedAbruptly變量來表示在執(zhí)行任務(wù)過程中是否出現(xiàn)了異常,在processWorkerExit方法中會(huì)對(duì)該變量的值進(jìn)行判斷。

getTask方法

getTask方法用來從阻塞隊(duì)列中取任務(wù),代碼如下:

這里重要的地方是第二個(gè)if判斷,目的是控制線程池的有效線程數(shù)量。由上文中的分析可以知道,在執(zhí)行execute方法時(shí),如果當(dāng)前線程池的線程數(shù)量超過了corePoolSize且小于maximumPoolSize,并且workQueue已滿時(shí),則可以增加工作線程,但這時(shí)如果超時(shí)沒有獲取到任務(wù),也就是timedOut為true的情況,說明workQueue已經(jīng)為空了,也就說明了當(dāng)前線程池中不需要那么多線程來執(zhí)行任務(wù)了,可以把多于corePoolSize數(shù)量的線程銷毀掉,保持線程數(shù)量在corePoolSize即可。
什么時(shí)候會(huì)銷毀?當(dāng)然是runWorker方法執(zhí)行完之后,也就是Worker中的run方法執(zhí)行完,由JVM自動(dòng)回收。
getTask方法返回null時(shí),在runWorker方法中會(huì)跳出while循環(huán),然后會(huì)執(zhí)行processWorkerExit方法。

processWorkerExit方法

至此,processWorkerExit執(zhí)行完之后,工作線程被銷毀,以上就是整個(gè)工作線程的生命周期,從execute方法開始,Worker使用ThreadFactory創(chuàng)建新的工作線程,runWorker通過getTask獲取任務(wù),然后執(zhí)行任務(wù),如果getTask返回null,進(jìn)入processWorkerExit方法,整個(gè)線程結(jié)束,如圖所示:

tryTerminate方法

tryTerminate方法根據(jù)線程池狀態(tài)進(jìn)行判斷是否結(jié)束線程池,代碼如下:

interruptIdleWorkers(ONLY_ONE);的作用是因?yàn)樵趃etTask方法中執(zhí)行workQueue.take()時(shí),如果不執(zhí)行中斷會(huì)一直阻塞。在下面介紹的shutdown方法中,會(huì)中斷所有空閑的工作線程,如果在執(zhí)行shutdown時(shí)工作線程沒有空閑,然后又去調(diào)用了getTask方法,這時(shí)如果workQueue中沒有任務(wù)了,調(diào)用workQueue.take()時(shí)就會(huì)一直阻塞。所以每次在工作線程結(jié)束時(shí)調(diào)用tryTerminate方法來嘗試中斷一個(gè)空閑工作線程,避免在隊(duì)列為空時(shí)取任務(wù)一直阻塞的情況。

shutdown方法

shutdown方法要將線程池切換到SHUTDOWN狀態(tài),并調(diào)用interruptIdleWorkers方法請(qǐng)求中斷所有空閑的worker,最后調(diào)用tryTerminate嘗試結(jié)束線程池。

這里思考一個(gè)問題:在runWorker方法中,執(zhí)行任務(wù)時(shí)對(duì)Worker對(duì)象w進(jìn)行了lock操作,為什么要在執(zhí)行任務(wù)的時(shí)候?qū)γ總€(gè)工作線程都加鎖呢?
下面仔細(xì)分析一下:
  • 在getTask方法中,如果這時(shí)線程池的狀態(tài)是SHUTDOWN并且workQueue為空,那么就應(yīng)該返回null來結(jié)束這個(gè)工作線程,而使線程池進(jìn)入SHUTDOWN狀態(tài)需要調(diào)用shutdown方法;

  • shutdown方法會(huì)調(diào)用interruptIdleWorkers來中斷空閑的線程,interruptIdleWorkers持有mainLock,會(huì)遍歷workers來逐個(gè)判斷工作線程是否空閑。但getTask方法中沒有mainLock;

  • 在getTask中,如果判斷當(dāng)前線程池狀態(tài)是RUNNING,并且阻塞隊(duì)列為空,那么會(huì)調(diào)用workQueue.take()進(jìn)行阻塞;

  • 如果在判斷當(dāng)前線程池狀態(tài)是RUNNING后,這時(shí)調(diào)用了shutdown方法把狀態(tài)改為了SHUTDOWN,這時(shí)如果不進(jìn)行中斷,那么當(dāng)前的工作線程在調(diào)用了workQueue.take()后會(huì)一直阻塞而不會(huì)被銷毀,因?yàn)樵赟HUTDOWN狀態(tài)下不允許再有新的任務(wù)添加到workQueue中,這樣一來線程池永遠(yuǎn)都關(guān)閉不了了;

  • 由上可知,shutdown方法與getTask方法(從隊(duì)列中獲取任務(wù)時(shí))存在競(jìng)態(tài)條件;

  • 解決這一問題就需要用到線程的中斷,也就是為什么要用interruptIdleWorkers方法。在調(diào)用workQueue.take()時(shí),如果發(fā)現(xiàn)當(dāng)前線程在執(zhí)行之前或者執(zhí)行期間是中斷狀態(tài),則會(huì)拋出InterruptedException,解除阻塞的狀態(tài);

  • 但是要中斷工作線程,還要判斷工作線程是否是空閑的,如果工作線程正在處理任務(wù),就不應(yīng)該發(fā)生中斷;

  • 所以Worker繼承自AQS,在工作線程處理任務(wù)時(shí)會(huì)進(jìn)行l(wèi)ock,interruptIdleWorkers在進(jìn)行中斷時(shí)會(huì)使用tryLock來判斷該工作線程是否正在處理任務(wù),如果tryLock返回true,說明該工作線程當(dāng)前未執(zhí)行任務(wù),這時(shí)才可以被中斷。
下面就來分析一下interruptIdleWorkers方法。

interruptIdleWorkers方法

interruptIdleWorkers遍歷workers中所有的工作線程,若線程沒有被中斷tryLock成功,就中斷該線程。
為什么需要持有mainLock?因?yàn)閣orkers是HashSet類型的,不能保證線程安全。

shutdownNow方法

shutdownNow方法與shutdown方法類似,不同的地方在于:
  1. 設(shè)置狀態(tài)為STOP;

  2. 中斷所有工作線程,無論是否是空閑的;

  3. 取出阻塞隊(duì)列中沒有被執(zhí)行的任務(wù)并返回。
shutdownNow方法執(zhí)行完之后調(diào)用tryTerminate方法,該方法在上文已經(jīng)分析過了,目的就是使線程池的狀態(tài)設(shè)置為TERMINATED。

線程池的監(jiān)控

通過線程池提供的參數(shù)進(jìn)行監(jiān)控。線程池里有一些屬性在監(jiān)控線程池的時(shí)候可以使用
  • getTaskCount:線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務(wù)總數(shù);

  • getCompletedTaskCount:線程池已完成的任務(wù)數(shù)量,該值小于等于taskCount;

  • getLargestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個(gè)數(shù)據(jù)可以知道線程池是否滿過,也就是達(dá)到了maximumPoolSize;

  • getPoolSize:線程池當(dāng)前的線程數(shù)量;

  • getActiveCount:當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)量。
通過這些方法,可以對(duì)線程池進(jìn)行監(jiān)控,在ThreadPoolExecutor類中提供了幾個(gè)空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以擴(kuò)展這些方法在執(zhí)行前或執(zhí)行后增加一些新的操作,例如統(tǒng)計(jì)線程池的執(zhí)行任務(wù)的時(shí)間等,可以繼承自ThreadPoolExecutor來進(jìn)行擴(kuò)展。

總結(jié)

本文比較詳細(xì)的分析了線程池的工作流程,總體來說有如下幾個(gè)內(nèi)容:
  • 分析了線程的創(chuàng)建,任務(wù)的提交,狀態(tài)的轉(zhuǎn)換以及線程池的關(guān)閉;

  • 這里通過execute方法來展開線程池的工作流程,execute方法通過corePoolSize,maximumPoolSize以及阻塞隊(duì)列的大小來判斷決定傳入的任務(wù)應(yīng)該被立即執(zhí)行,還是應(yīng)該添加到阻塞隊(duì)列中,還是應(yīng)該拒絕任務(wù)。

  • 介紹了線程池關(guān)閉時(shí)的過程,也分析了shutdown方法與getTask方法存在競(jìng)態(tài)條件;

  • 在獲取任務(wù)時(shí),要通過線程池的狀態(tài)來判斷應(yīng)該結(jié)束工作線程還是阻塞線程等待新的任務(wù),也解釋了為什么關(guān)閉線程池時(shí)要中斷工作線程以及為什么每一個(gè)worker都需要lock。

在向線程池提交任務(wù)時(shí),除了execute方法,還有一個(gè)submit方法,submit方法會(huì)返回一個(gè)Future對(duì)象用于獲取返回值,有關(guān)Future和Callable請(qǐng)自行了解一下相關(guān)的文章,這里就不介紹了。

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
Java線程池核心ThreadPoolExecutor的使用和原理分析
2 w字長(zhǎng)文帶你深入理解線程池
Java并發(fā)編程:線程池的使用
深入分析線程池的實(shí)現(xiàn)原理(文末送書)
【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor
面試官:你在項(xiàng)目中用過 多線程 嗎?
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服