現實生活中我們經常會遇到這樣的情景,在進行某個活動前需要等待人全部都齊了才開始。例如吃飯時要等全家人都上座了才動筷子,旅游時要等全部人都到齊了才出發,比賽時要等運動員都上場后才開始。在juc包中為我們提供了一個同步工具類能夠很好的模擬這類場景,它就是cyclicbarrier類。利用cyclicbarrier類可以實現一組線程相互等待,當所有線程都到達某個屏障點后再進行后續的操作。下圖演示了這一過程。
在cyclicbarrier類的內部有一個計數器,每個線程在到達屏障點的時候都會調用await方法將自己阻塞,此時計數器會減1,當計數器減為0的時候所有因調用await方法而被阻塞的線程將被喚醒。這就是實現一組線程相互等待的原理,下面我們先看看cyclicbarrier有哪些成員變量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
//同步操作鎖 private final reentrantlock lock = new reentrantlock(); //線程攔截器 private final condition trip = lock.newcondition(); //每次攔截的線程數 private final int parties; //換代前執行的任務 private final runnable barriercommand; //表示柵欄的當前代 private generation generation = new generation(); //計數器 private int count; //靜態內部類generation private static class generation { boolean broken = false ; } |
上面貼出了cyclicbarrier所有的成員變量,可以看到cyclicbarrier內部是通過條件隊列trip來對線程進行阻塞的,并且其內部維護了兩個int型的變量parties和count,parties表示每次攔截的線程數,該值在構造時進行賦值。count是內部計數器,它的初始值和parties相同,以后隨著每次await方法的調用而減1,直到減為0就將所有線程喚醒。cyclicbarrier有一個靜態內部類generation,該類的對象代表柵欄的當前代,就像玩游戲時代表的本局游戲,利用它可以實現循環等待。barriercommand表示換代前執行的任務,當count減為0時表示本局游戲結束,需要轉到下一局。在轉到下一局游戲之前會將所有阻塞的線程喚醒,在喚醒所有線程之前你可以通過指定barriercommand來執行自己的任務。接下來我們看看它的構造器。
1
2
3
4
5
6
7
8
9
10
11
12
|
//構造器1 public cyclicbarrier( int parties, runnable barrieraction) { if (parties <= 0 ) throw new illegalargumentexception(); this .parties = parties; this .count = parties; this .barriercommand = barrieraction; } //構造器2 public cyclicbarrier( int parties) { this (parties, null ); } |
cyclicbarrier有兩個構造器,其中構造器1是它的核心構造器,在這里你可以指定本局游戲的參與者數量(要攔截的線程數)以及本局結束時要執行的任務,還可以看到計數器count的初始值被設置為parties。cyclicbarrier類最主要的功能就是使先到達屏障點的線程阻塞并等待后面的線程,其中它提供了兩種等待的方法,分別是定時等待和非定時等待。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
//非定時等待 public int await() throws interruptedexception, brokenbarrierexception { try { return dowait( false , 0l); } catch (timeoutexception toe) { throw new error(toe); } } //定時等待 public int await( long timeout, timeunit unit) throws interruptedexception, brokenbarrierexception, timeoutexception { return dowait( true , unit.tonanos(timeout)); } |
可以看到不管是定時等待還是非定時等待,它們都調用了dowait方法,只不過是傳入的參數不同而已。下面我們就來看看dowait方法都做了些什么。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
//核心等待方法 private int dowait( boolean timed, long nanos) throws interruptedexception, brokenbarrierexception, timeoutexception { final reentrantlock lock = this .lock; lock.lock(); try { final generation g = generation; //檢查當前柵欄是否被打翻 if (g.broken) { throw new brokenbarrierexception(); } //檢查當前線程是否被中斷 if (thread.interrupted()) { //如果當前線程被中斷會做以下三件事 //1.打翻當前柵欄 //2.喚醒攔截的所有線程 //3.拋出中斷異常 breakbarrier(); throw new interruptedexception(); } //每次都將計數器的值減1 int index = --count; //計數器的值減為0則需喚醒所有線程并轉換到下一代 if (index == 0 ) { boolean ranaction = false ; try { //喚醒所有線程前先執行指定的任務 final runnable command = barriercommand; if (command != null ) { command.run(); } ranaction = true ; //喚醒所有線程并轉到下一代 nextgeneration(); return 0 ; } finally { //確保在任務未成功執行時能將所有線程喚醒 if (!ranaction) { breakbarrier(); } } } //如果計數器不為0則執行此循環 for (;;) { try { //根據傳入的參數來決定是定時等待還是非定時等待 if (!timed) { trip.await(); } else if (nanos > 0l) { nanos = trip.awaitnanos(nanos); } } catch (interruptedexception ie) { //若當前線程在等待期間被中斷則打翻柵欄喚醒其他線程 if (g == generation && ! g.broken) { breakbarrier(); throw ie; } else { //若在捕獲中斷異常前已經完成在柵欄上的等待, 則直接調用中斷操作 thread.currentthread().interrupt(); } } //如果線程因為打翻柵欄操作而被喚醒則拋出異常 if (g.broken) { throw new brokenbarrierexception(); } //如果線程因為換代操作而被喚醒則返回計數器的值 if (g != generation) { return index; } //如果線程因為時間到了而被喚醒則打翻柵欄并拋出異常 if (timed && nanos <= 0l) { breakbarrier(); throw new timeoutexception(); } } } finally { lock.unlock(); } } |
上面貼出的代碼中注釋都比較詳細,我們只挑一些重要的來講。可以看到在dowait方法中每次都將count減1,減完后立馬進行判斷看看是否等于0,如果等于0的話就會先去執行之前指定好的任務,執行完之后再調用nextgeneration方法將柵欄轉到下一代,在該方法中會將所有線程喚醒,將計數器的值重新設為parties,最后會重新設置柵欄代次,在執行完nextgeneration方法之后就意味著游戲進入下一局。如果計數器此時還不等于0的話就進入for循環,根據參數來決定是調用trip.awaitnanos(nanos)還是trip.await()方法,這兩方法對應著定時和非定時等待。如果在等待過程中當前線程被中斷就會執行breakbarrier方法,該方法叫做打破柵欄,意味著游戲在中途被掐斷,設置generation的broken狀態為true并喚醒所有線程。同時這也說明在等待過程中有一個線程被中斷整盤游戲就結束,所有之前被阻塞的線程都會被喚醒。線程醒來后會執行下面三個判斷,看看是否因為調用breakbarrier方法而被喚醒,如果是則拋出異常;看看是否是正常的換代操作而被喚醒,如果是則返回計數器的值;看看是否因為超時而被喚醒,如果是的話就調用breakbarrier打破柵欄并拋出異常。這里還需要注意的是,如果其中有一個線程因為等待超時而退出,那么整盤游戲也會結束,其他線程都會被喚醒。下面貼出nextgeneration方法和breakbarrier方法的具體代碼。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
//切換柵欄到下一代 private void nextgeneration() { //喚醒條件隊列所有線程 trip.signalall(); //設置計數器的值為需要攔截的線程數 count = parties; //重新設置柵欄代次 generation = new generation(); } //打翻當前柵欄 private void breakbarrier() { //將當前柵欄狀態設置為打翻 generation.broken = true ; //設置計數器的值為需要攔截的線程數 count = parties; //喚醒所有線程 trip.signalall(); } |
上面我們已經通過源碼將cyclicbarrier的原理基本都講清楚了,下面我們就通過一個賽馬的例子來深入掌握它的使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
class horse implements runnable { private static int counter = 0 ; private final int id = counter++; private int strides = 0 ; private static random rand = new random( 47 ); private static cyclicbarrier barrier; public horse(cyclicbarrier b) { barrier = b; } @override public void run() { try { while (!thread.interrupted()) { synchronized ( this ) { //賽馬每次隨機跑幾步 strides += rand.nextint( 3 ); } barrier.await(); } } catch (exception e) { e.printstacktrace(); } } public string tracks() { stringbuilder s = new stringbuilder(); for ( int i = 0 ; i < getstrides(); i++) { s.append( "*" ); } s.append(id); return s.tostring(); } public synchronized int getstrides() { return strides; } public string tostring() { return "horse " + id + " " ; } } public class horserace implements runnable { private static final int finish_line = 75 ; private static list<horse> horses = new arraylist<horse>(); private static executorservice exec = executors.newcachedthreadpool(); @override public void run() { stringbuilder s = new stringbuilder(); //打印賽道邊界 for ( int i = 0 ; i < finish_line; i++) { s.append( "=" ); } system.out.println(s); //打印賽馬軌跡 for (horse horse : horses) { system.out.println(horse.tracks()); } //判斷是否結束 for (horse horse : horses) { if (horse.getstrides() >= finish_line) { system.out.println(horse + "won!" ); exec.shutdownnow(); return ; } } //休息指定時間再到下一輪 try { timeunit.milliseconds.sleep( 200 ); } catch (interruptedexception e) { system.out.println( "barrier-action sleep interrupted" ); } } public static void main(string[] args) { cyclicbarrier barrier = new cyclicbarrier( 7 , new horserace()); for ( int i = 0 ; i < 7 ; i++) { horse horse = new horse(barrier); horses.add(horse); exec.execute(horse); } } } |
該賽馬程序主要是通過在控制臺不停的打印各賽馬的當前軌跡,以此達到動態顯示的效果。整場比賽有多個輪次,每一輪次各個賽馬都會隨機走上幾步然后調用await方法進行等待,當所有賽馬走完一輪的時候將會執行任務將所有賽馬的當前軌跡打印到控制臺上。這樣每一輪下來各賽馬的軌跡都在不停的增長,當其中某個賽馬的軌跡最先增長到指定的值的時候將會結束整場比賽,該賽馬成為整場比賽的勝利者!程序的運行結果如下:
至此我們難免會將cyclicbarrier與countdownlatch進行一番比較。這兩個類都可以實現一組線程在到達某個條件之前進行等待,它們內部都有一個計數器,當計數器的值不斷的減為0的時候所有阻塞的線程將會被喚醒。有區別的是cyclicbarrier的計數器由自己控制,而countdownlatch的計數器則由使用者來控制,在cyclicbarrier中線程調用await方法不僅會將自己阻塞還會將計數器減1,而在countdownlatch中線程調用await方法只是將自己阻塞而不會減少計數器的值。另外,countdownlatch只能攔截一輪,而cyclicbarrier可以實現循環攔截。一般來說用cyclicbarrier可以實現countdownlatch的功能,而反之則不能,例如上面的賽馬程序就只能使用cyclicbarrier來實現。總之,這兩個類的異同點大致如此,至于何時使用cyclicbarrier,何時使用countdownlatch,還需要讀者自己去拿捏。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:http://www.cnblogs.com/liuyun1995/p/8529360.html