CyclicBarrier是java推出的一個(gè)并發(fā)編程工具,它用在多個(gè)線程之間協(xié)同工作。線程約定到達(dá)某個(gè)點(diǎn),到達(dá)這個(gè)點(diǎn)之后的線程都停下來,直到最后一個(gè)線程也到達(dá)了這個(gè)點(diǎn)之后,所有的線程才會(huì)得到釋放。常用的場景是:多個(gè)worker線程,每個(gè)線程都在循環(huán)地做一部分工作,并在最后用cyclicBarrier.await()設(shè)下約定點(diǎn),當(dāng)最后一個(gè)線程做完了工作也到達(dá)約定點(diǎn)后,所有線程得到釋放,開始下一輪工作。也就是下面這樣:
1 while(!done()){2 //working3 cyclicBarrier.await();4 }
CyclicBarrier還支持一個(gè)回調(diào)函數(shù),每當(dāng)一輪工作結(jié)束后,下一輪工作開始前,這個(gè)回調(diào)函數(shù)都會(huì)被調(diào)用一次。
但是,使用CyclicBarrier必須準(zhǔn)守最佳實(shí)踐的使用方法,否則,就可能達(dá)不到想要的效果。比如,下面這樣,就是一種典型的錯(cuò)誤使用方法:
private void process(CyclicBarrier cyclicBarrier) { final int n = 100; Runnable worker= new Runnable() { @Override public void run() { try { //模擬工作 Thread.sleep(3000); } catch (InterruptedException ex) { ex.printStackTrace(); } try { cyclicBarrier.await(); } catch (BrokenBarrierException | InterruptedException ex) { ex.printStackTrace(); } } System.out.println("Worker is done"); System.out.println("Thread of Worker is " Thread.currentThread().getId()); }; for (int i = 0; i < n; i ) { Thread t1 = new Thread(worker); Thread t2 = new Thread(worker); t1.start(); t2.start(); } }
在上面的代碼中,工作不在worker線程中循環(huán),而是在開啟工作的線程中循環(huán),也就是說,它會(huì)不斷地開啟新的worker線程。這會(huì)導(dǎo)致的一個(gè)問題是,上一輪的回調(diào)還沒執(zhí)行完成,下一輪的工作就已經(jīng)開始了。
那么為什么呢?下面來分析一下原因。
首先,要知道CyclicBarrier是如何做到在上一輪工作結(jié)束后下一輪工作開始前執(zhí)行回調(diào)函數(shù)的。查看jdoc穩(wěn)定,里面有這么一句話“A?CyclicBarrier?supports an optional?Runnable
?command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released.?”這是描述回調(diào)函數(shù)的,從描述中可以看到,回調(diào)函數(shù)是在最后一個(gè)線程到達(dá)約定點(diǎn)后,線程釋放前被執(zhí)行的。也就是說,回調(diào)函數(shù)的執(zhí)行時(shí)間發(fā)生在下一輪工作前,這是通過在執(zhí)行完回調(diào)函數(shù)再釋放工作線程來實(shí)現(xiàn)的。
然后,我們再來看看上面錯(cuò)誤的使用方法。在錯(cuò)誤的使用方法中,主線程的每一輪循環(huán)中都開啟了新的worker線程,這樣在回調(diào)函數(shù)結(jié)束之前,前面開啟的worker線程確實(shí)沒有得到釋放,但是,新開啟的工作線程卻完全可以執(zhí)行下一輪工作,這就是為什么在回調(diào)函數(shù)執(zhí)行完畢之前,新一輪的工作就已經(jīng)開始了的原因。并且,錯(cuò)誤方法中的每一個(gè)工作線程只執(zhí)行一輪工作就結(jié)束了,每一輪工作之間的線程互不影響,這也就失去了協(xié)作性,因此,千萬要避免寫出這種代碼。
關(guān)于CyclicBarrier使用的最佳時(shí)間,基本上就是官方示例中的用法了,如下:
1 class Solver { 2 final int N; 3 final float[][] data; 4 final CyclicBarrier barrier; 5 6 class Worker implements Runnable { 7 int myRow; 8 Worker(int row) { myRow = row; } 9 public void run() {10 while (!done()) {11 processRow(myRow);12 13 try {14 barrier.await();15 } catch (InterruptedException ex) {16 return;17 } catch (BrokenBarrierException ex) {18 return;19 }20 }21 }22 }23 24 public Solver(float[][] matrix) {25 data = matrix;26 N = matrix.length;27 barrier = new CyclicBarrier(N,28 new Runnable() {29 public void run() {30 mergeRows(...);31 }32 });33 for (int i = 0; i < N; i)34 new Thread(new Worker(i)).start();35 36 waitUntilDone();37 }38 }
最后在有一個(gè)問題是,回調(diào)函數(shù)是在哪一個(gè)線程里執(zhí)行的?
根據(jù)我的demo測試發(fā)現(xiàn),是在第一個(gè)到達(dá)的線程中執(zhí)行的。當(dāng)然,官方并沒有明確規(guī)定這一點(diǎn),也許以后會(huì)有變化吧,所以,我們也不能以來這一特征。我的demo如下:
public class Demo1 { public static main(String[] args){ Demo1 demo = new Demo1(); demo1.showInfThreadWhenDirectly(); }
private void process(CyclicBarrier cyclicBarrier) { final int n = 100; Runnable worker= new Runnable() { @Override public void run() { for (int i = 0; i < n; i ) { try { Thread.sleep(3000); } catch (InterruptedException ex) { ex.printStackTrace(); } try { int arrival_index=cyclicBarrier.await(); if(0==arrival_index){ System.out.println("first arrival Thread in this iteration is: " Thread.currentThread().getId()); } } catch (BrokenBarrierException | InterruptedException ex) { ex.printStackTrace(); } } System.out.println("Worker is done"); System.out.println("Thread of Worker is " Thread.currentThread().getId()); } }; Thread t1 = new Thread(worker); Thread t2 = new Thread(worker); t1.start(); t2.start(); } public void showInfThreadWhenDirectly(){ CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("[Directly] Thread in invert call function is" Thread.currentThread().getId())); process(cyclicBarrier); System.out.println("[Directly] main Thread is " Thread.currentThread().getId()); }}
輸出結(jié)果如下:
[Directly] main Thread is 1[Directly] Thread in invert call function is10first arrival Thread in this iteration is: 10[Directly] Thread in invert call function is10first arrival Thread in this iteration is: 10[Directly] Thread in invert call function is10first arrival Thread in this iteration is: 10[Directly] Thread in invert call function is10first arrival Thread in this iteration is: 10[Directly] Thread in invert call function is11first arrival Thread in this iteration is: 11[Directly] Thread in invert call function is10first arrival Thread in this iteration is: 10[Directly] Thread in invert call function is10first arrival Thread in this iteration is: 10[Directly] Thread in invert call function is10first arrival Thread in this iteration is: 10[Directly] Thread in invert call function is11first arrival Thread in this iteration is: 11
?
另外,官方還有一段:“
If the barrier action does not rely on the parties being suspended when it is executed, then any of the threads in the party could execute that action when it is released. To facilitate this, each invocation of?
await()
?returns the arrival index of that thread at the barrier. You can then choose which thread should execute the barrier action, for example:if (barrier.await() == 0) { // log the completion of this iteration }?”
意思是說,如果回調(diào)動(dòng)作“arrier action”不需要在所有工作線程都停止的狀態(tài)下執(zhí)行的話,那么可以隨便找一個(gè)工作線程去做這個(gè)動(dòng)作。為了支持這個(gè),CyclicBarrier 的await( )方法有一個(gè)返回值,返回的就是當(dāng)前線程是第幾個(gè)到達(dá)約定點(diǎn)(barrier)的。
來源:http://www.icode9.com/content-4-162451.html