一、前言
CountDownLatch維護了一個計數(shù)器(還是是state字段),調(diào)用countDown方法會將計數(shù)器減1,調(diào)用await方法會阻塞線程直到計數(shù)器變?yōu)?。可以用于實現(xiàn)一個線程等待所有子線程任務(wù)完成之后再繼續(xù)執(zhí)行的邏輯,也可以實現(xiàn)類似簡易CyclicBarrier的功能,達到讓多個線程等待同時開始執(zhí)行某一段邏輯目的。
二、使用
- 一個線程等待其它線程執(zhí)行完再繼續(xù)執(zhí)行
1
2
3
4
5
6
7
8
9
10
11
|
...... CountDownLatch cdl = new CountDownLatch( 10 ); ExecutorService es = Executors.newFixedThreadPool( 10 ); for ( int i = 0 ; i < 10 ; i++) { es.execute(() -> { //do something cdl.countDown(); }); } cdl.await(); ...... |
- 實現(xiàn)類似CyclicBarrier的功能,先await,再countDown
1
2
3
4
5
6
7
8
9
10
11
12
|
...... CountDownLatch cdl = new CountDownLatch( 1 ); ExecutorService es = Executors.newFixedThreadPool( 10 ); for ( int i = 0 ; i < 10 ; i++) { es.execute(() -> { cdl.await(); //do something }); } Thread.sleep(10000L); cdl.countDown(); ...... |
三、源碼分析
CountDownLatch的結(jié)構(gòu)和ReentrantLock、Semaphore的結(jié)構(gòu)類似,也是使用的內(nèi)部類Sync繼承AQS的方式,并且重寫了tryAcquireShared和tryReleaseShared方法。
還是首先來看構(gòu)造函數(shù):
1
2
3
4
|
public CountDownLatch( int count) { if (count < 0 ) throw new IllegalArgumentException( "count < 0" ); this .sync = new Sync(count); } |
需要傳入一個大于0的count,代表CountDownLatch計數(shù)器的初始值,通過Sync的構(gòu)造函數(shù)最終賦值給父類AQS的state字段。可一個看到這個state字段用法多多,在ReentrantLock中使用0和1來標(biāo)識鎖的狀態(tài),Semaphore中用來標(biāo)識信號量,此處又用來表示計數(shù)器。
CountDownLatch要通過await方法完成阻塞,先來看看這個方法是如何實現(xiàn)的:
1
2
3
|
public void await() throws InterruptedException { sync.acquireSharedInterruptibly( 1 ); } |
調(diào)用的是sync的acquireSharedInterruptibly方法,該方法定義在AQS中,Semaphore也調(diào)用的這個方法:
1
2
3
4
5
6
7
|
public final void acquireSharedInterruptibly( int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } |
這個方法的邏輯前面在解析SemaPhore的時候細(xì)說過了,這里不再贅述,主要就是兩個方法的調(diào)用,先通過tryAcquireShared方法嘗試獲取"許可",返回值代表此次獲取后的剩余量,如果大于等于0表示獲取成功,否則表示失敗。如果失敗,那么就會進入doAcquireSharedInterruptibly方法執(zhí)行入隊阻塞的邏輯。這里我們主要到CountDownLatch中看看tryAcquireShared方法的實現(xiàn):
1
2
3
|
protected int tryAcquireShared( int acquires) { return (getState() == 0 ) ? 1 : - 1 ; } |
和Semaphore的實現(xiàn)中每次將state減去requires不同,這里直接判斷state是否為0,如果為0那么返回1,表示獲取"許可"成功;如果不為0,表示失敗,則需要入隊阻塞。從這個tryAcquireShared方法就能看出CountDownLatch的邏輯了:等到state變?yōu)榱?,那么所有線程都能獲取運行許可。
那么我們接下來來到countDown方法:
1
2
3
|
public void countDown() { sync.releaseShared( 1 ); } |
調(diào)用的是sync的releaseShared方法,該方法定義在父類AQS中,Semaphore使用的也是這個方法:
1
2
3
4
5
6
7
8
|
public final boolean releaseShared( int arg) { if (tryReleaseShared(arg)) { //當(dāng)state從非 doReleaseShared(); return true ; } return false ; } |
前面提到了CountDownLatch也重寫了tryReleaseShared方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
protected boolean tryReleaseShared( int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0 ) //如果state等于0了直接返回false //保證在并發(fā)情況下,最多只會有一個線程返回true //也包括調(diào)用countDown的次數(shù)超過state的初始值 return false ; int nextc = c- 1 ; if (compareAndSetState(c, nextc)) //如果返回true,表示state從非0變?yōu)榱? //那么后續(xù)需要喚醒阻塞線程 return nextc == 0 ; } } |
Semaphore在釋放信號量的時候,是將獲取的許可歸還到state中,但是CountDownLatch沒有獲取許可的邏輯(獲取許可的時候是判斷state是否等于0),所以在countDown的時候沒有釋放的邏輯,就是將state減1,然后根據(jù)state減1之后的值是否為0判斷release是否成功,如果state本來大于0,經(jīng)過減1之后變?yōu)榱?,那么返回true。tryReleaseShared方法的返回值決定了后續(xù)需不需要調(diào)用doReleaseShared方法喚醒阻塞線程。
這里有個邏輯:如果state已經(jīng)為0,那么返回false。這個主要應(yīng)對兩種情況:
- 調(diào)用countDown的次數(shù)超過了state的初始值多
- 線程并發(fā)調(diào)用的時候保證只有一個線程去完成阻塞線程的喚醒操作
可以看到CountDownLatch沒有鎖的概念,countDown方法可以被一個線程重復(fù)調(diào)用,只需要對state做reduce操作,而不用關(guān)心是誰做的reduce。如果tryReleaseShared返回true,那么表示需要在后面進入doReleaseShared方法,該方法和Semaphore中調(diào)用的方法是同一個,主要是喚醒阻塞線程或者設(shè)置PROPAGAGE狀態(tài),這里也不再贅述~
阻塞線程被喚醒之后,會在doAcquireSharedInterruptibly方法中繼續(xù)循環(huán),雖然和Semaphore調(diào)用的是同樣的方法,但是這里有不一樣的地方,所以還是提一句。我們首先回到doAcquireSharedInterruptibly方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
private void doAcquireSharedInterruptibly( int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { //如果head.next被unpark喚醒,說明此時state==0 //那么tryAcquireShared會返回1 int r = tryAcquireShared(arg); //r==1 if (r >= 0 ) { //node節(jié)點被喚醒后,還會繼續(xù)喚醒node.next //這樣依次傳遞,因為在這里的r一定為1 setHeadAndPropagate(node, r); p.next = null ; // help GC failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } |
當(dāng)head.next線程被unpark喚醒后,會進入tryAcquireShared方法判斷,由于此時state已經(jīng)為0(只有當(dāng)state變?yōu)?時,才會unpark喚醒線程),而前面提到了在CountDownLatch重寫的tryAcquireShared中,如果state==0,那么會返回1,所以會進入setHeadAndPropagate方法:
1
2
3
4
5
6
7
8
9
10
|
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } |
該方法在Semaphore中詳細(xì)介紹過,這里我們就站在CountDownLatch的角度來看看。其實很簡單了,注意此時該方法的propagate參數(shù)值是1,那么就會進入到下面的if邏輯里,繼續(xù)喚醒下一個node。當(dāng)下一個node對應(yīng)的線程被喚醒后,同樣會進入setHeadAndPropagate方法,propagage同樣為1,那么繼續(xù)喚醒下一個node,就這樣依次將整個CLH隊列的節(jié)點都喚醒。
四、總結(jié)
如果單獨把CountDownLatch拿出來看其實是很復(fù)雜的,只是CountDownLatch(包括Semaphore和ReentrantLock)都高度共用了AQS提供的一些方法,而這些方法在前面介紹Semaphore和ReentrantLock的時候已經(jīng)詳細(xì)分析過,所以到本文分析CountDownLatch的時候,只需要關(guān)注它內(nèi)部類Sync重寫的兩個方法:tryAcquireShared和tryReleaseShared,也就是"獲取許可"和"釋放許可"的邏輯。
CountDownLatch在await的邏輯里,如果當(dāng)前state的值大于0,那么會進入CLH隊列進行阻塞等待unpark喚醒(或者中斷喚醒);在countDown的邏輯里,就是簡單的將state-1,如果一個線程把state從1減為0,那么該線程就會負(fù)責(zé)喚醒head.next節(jié)點,head.next節(jié)點被喚醒后,又會在setHeadAndPropagate方法中喚醒next.next節(jié)點,這樣依次喚醒所有CLH隊列中的阻塞節(jié)點。當(dāng)然,如果線程被中斷喚醒,那么也會進入cancelAcquire中進行無效節(jié)點的移除邏輯。
到此這篇關(guān)于Java并發(fā)編程之CountDownLatch源碼解析的文章就介紹到這了,更多相關(guān)Java中CountDownLatch源碼解析內(nèi)容請搜索服務(wù)器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務(wù)器之家!
原文鏈接:https://blog.csdn.net/huangzhilin2015/article/details/115725200