condition介紹
上篇文章講了reentrantlock的加鎖和釋放鎖的使用,這篇文章是對reentrantlock的補充。reentrantlock#newcondition()可以創建condition,在reentrantlock加鎖過程中可以利用condition阻塞當前線程并臨時釋放鎖,待另外線程獲取到鎖并在邏輯后通知阻塞線程"激活"。condition常用在基于異步通信的同步機制實現中,比如dubbo中的請求和獲取應答結果的實現。
常用方法
condition中主要的方法有2個
- (1)await()方法可以阻塞當前線程,并釋放鎖。
- (2)在獲取鎖后可以調用signal()通知被await()阻塞的線程"激活"。
這里的await(),signal()必須在reentrantlock#lock()和reentrantlock#unlock()之間調用。
condition實現分析
condition的實現也是利用abstractqueuedsynchronizer隊列來實現,await()在被調用后先將當前線程加入到等待隊列中,然后釋放鎖,最后阻塞當前線程。signal()在被調用后會先獲取等待隊列中第一個節點,并將這個節點轉化成reentrantlock中的節點并加入到同步阻塞隊列的結尾,這樣此節點的上個節點線程釋放鎖后會激活此節點線程取來獲取鎖。
await()方法源碼分析
await()源碼如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public final void await() throws interruptedexception { //判斷是否當前線程是否被中斷中斷則拋出中斷異常 if (thread.interrupted()) throw new interruptedexception(); //加入等待隊列 node node = addconditionwaiter(); //釋放當前線程鎖 int savedstate = fullyrelease(node); int interruptmode = 0 ; //判斷是否在同步阻塞隊列,如果不在一直循環到被加入 while (!isonsyncqueue(node)) { //阻塞當前線程 locksupport.park( this ); //判斷是否被中斷 if ((interruptmode = checkinterruptwhilewaiting(node)) != 0 ) break ; } //獲取鎖,如果獲取中被中斷則設置中斷狀態 if (acquirequeued(node, savedstate) && interruptmode != throw_ie) interruptmode = reinterrupt; //清除等待隊列中被"激活"的節點 if (node.nextwaiter != null ) // clean up if cancelled unlinkcancelledwaiters(); //如果當前線程被中斷,處理中斷邏輯 if (interruptmode != 0 ) reportinterruptafterwait(interruptmode); } |
主要分以下幾步
- (1)先判斷是否當前線程是否被中斷中斷則拋出中斷異常如果未中斷調用addconditionwaiter()加入等待隊列
- (2)調用fullyrelease(node)釋放鎖使同步阻塞隊列的下個節點線程能獲取鎖。
- (3)調用isonsyncqueue(node)判斷是否在同步阻塞隊列,這里的加入同步阻塞隊列操作是在另一個線程調用signal()后加入,如果不在同步阻塞隊列會進行阻塞直到被激活。
- (4)如果被激活然后調用checkinterruptwhilewaiting(node)判斷是否被中斷并獲取中斷模式。
- (5)繼續調用isonsyncqueue(node)判斷是否在同步阻塞隊列。
- (6)是則調用acquirequeued(node, savedstate) 獲取鎖,這里如果獲取不到也會被阻塞,獲取不到原因是在第一次調用isonsyncqueue(node)前,可能另一個線程已經調用signal()后加入到同步阻塞隊列,然后調用acquirequeued(node, savedstate) 獲取不到鎖并阻塞。acquirequeued(node, savedstate)也會返回當前線程是否被中斷,如果被中斷設置中斷模式。
- (7)在激活后調用unlinkcancelledwaiters()清理等待隊列的已經被激活的節點。
- (8)最后判斷當前線程是否被中斷,如果被中斷則對中斷線程做處理。
下面來看下addconditionwaiter()實現
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
private node addconditionwaiter() { //獲取等待隊列尾部節點 node t = lastwaiter; //如果尾部狀態不為condition,如果已經被"激活",清理之,然后重新獲取尾部節點 if (t != null && t.waitstatus != node.condition) { unlinkcancelledwaiters(); t = lastwaiter; } //創建以當前線程為基礎的節點,并將節點模式設置成condition node node = new node(thread.currentthread(), node.condition); //如果尾節點不存在,說明隊列為空,將頭節點設置成當前節點 if (t == null ) firstwaiter = node; //如果尾節點存在,將此節點設置成尾節點的下個節點 else t.nextwaiter = node; //將尾節點設置成當前節點 lastwaiter = node; return node; } |
addconditionwaiter()的邏輯很簡單,就是創建以當前線程為基礎的節點并把節點加入等待隊列的尾部待其他線程處理。
下面來看下fullyrelease(node node)實現
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
final int fullyrelease(node node) { boolean failed = true ; try { //獲取阻塞隊列中當前線程節點的鎖狀態值 int savedstate = getstate(); //釋放當前線程節點鎖 if (release(savedstate)) { failed = false ; return savedstate; } else { throw new illegalmonitorstateexception(); } } finally { //釋放失敗講節點等待狀態設置成關閉 if (failed) node.waitstatus = node.cancelled; } } |
調用getstate()先獲取阻塞隊列中當前線程節點的鎖狀態值,這個值可能大于1表示多次重入,然后調用release(savedstate)釋放所有鎖,如果釋放成功返回鎖狀態值。
下面來看下isonsyncqueue(node node)實現
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
final boolean isonsyncqueue(node node) { //判斷當前節點是否是condition或者前置節點是否為空如果為空直接返回false if (node.waitstatus == node.condition || node.prev == null ) return false ; //如果下個節點存在,則在同步阻塞隊列中返回true if (node.next != null ) // if has successor, it must be on queue return true ; //遍歷查找當前節點是否在同步阻塞隊列中 return findnodefromtail(node); } private boolean findnodefromtail(node node) { node t = tail; for (;;) { if (t == node) return true ; if (t == null ) return false ; t = t.prev; } } |
此方法的功能是查找當前節點是否在同步阻塞隊列中,方法先是快速判斷,判斷不了再進行遍歷查找。
- (1)第一步先判斷次節點是否condition狀態或者前置節點是否存在,如果是表明不在隊列中返回false,阻塞隊列中的狀態一般是0或者signal狀態而且如果當前如果當前節點在隊列阻塞中且未被激活前置節點一定不為空。
- (2)第二步判斷節點的下個節點是否存在,如果存在則表明當前當前節點已加入到阻塞隊列中。
- (3)如果以上2點都沒法判斷,也有可能剛剛加入到同步阻塞隊列中,所以調用findnodefromtail(node node)做最后的遍歷查找。查找從隊列尾部開始查,從尾部開始查的原因是可能剛剛加入到同步阻塞隊列中,從尾部能快速定位。
下面看下checkinterruptwhilewaiting(node node)實現
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private int checkinterruptwhilewaiting(node node) { return thread.interrupted() ? (transferaftercancelledwait(node) ? throw_ie : reinterrupt) : 0 ; } final boolean transferaftercancelledwait(node node) { if (compareandsetwaitstatus(node, node.condition, 0 )) { enq(node); return true ; } while (!isonsyncqueue(node)) thread.yield(); return false ; } |
此方法在線程被激活后被調用,主要功能就是判斷被激活的線程是否被中斷。此方法會返回2種中斷狀態throw_ie和reinterrupt,throw_ie是調用signal()前被中斷返回,reinterrupt在調用signal()后被中斷返回。 此方法先判斷是否被標記中斷,是的話再調用transferaftercancelledwait(node)取判斷是那種中斷狀態,transferaftercancelledwait(node)方法分2步
- (1)用cas方式將節點狀態改錯等待狀態改成condition,并加入到同步阻塞隊列中返回true
- (2)如果不能加入到同步阻塞隊列就自旋一直等待加入
如果使用await()方法上面2步其實是沒什么作用其最后一定會返回false,因為await()被激活只能調用 signal()方法,而signal()方法肯定已經將節點加入到同步阻塞隊列中。所以以上邏輯是給await(long time, timeunit unit)等帶超時激活方法用的。
acquirequeued(node, savedstate)方法再上一章節已經講過這邊就不重復了,下面分析下unlinkcancelledwaiters()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
private void unlinkcancelledwaiters() { //獲取等待隊列頭節點 node t = firstwaiter; node trail = null ; while (t != null ) { //獲取下個節點 node next = t.nextwaiter; //如果狀態不為condition說明已經加入阻塞隊列需要清理掉 if (t.waitstatus != node.condition) { t.nextwaiter = null ; if (trail == null ) firstwaiter = next; else //獲取下個節點 trail.nextwaiter = next; if (next == null ) lastwaiter = trail; } else trail = t; t = next; } } |
此方法就是從頭開始查找狀態不為condition的節點并清理,狀態不為condition節點說明此節點已經加入到阻塞隊列,已經不需要維護。
下面來看下reportinterruptafterwait(interruptmode)方法
1
2
3
4
5
6
7
8
9
|
private void reportinterruptafterwait( int interruptmode) throws interruptedexception { //如果是throw_ie模式直接拋出異常 if (interruptmode == throw_ie) throw new interruptedexception(); //如果是reinterrupt模式標記線程中斷由上層處理中斷 else if (interruptmode == reinterrupt) selfinterrupt(); } |
此方法處理中斷邏輯。如果是throw_ie模式直接拋出異常,如果是reinterrupt模式標記線程中斷由上層處理中斷。
signal()方法源碼分析
signal()源碼如下
1
2
3
4
5
6
7
8
9
|
public final void signal() { //是否當前線程持有鎖 if (!isheldexclusively()) throw new illegalmonitorstateexception(); node first = firstwaiter; //通知"激活"頭節點線程 if (first != null ) dosignal(first); } |
先調用isheldexclusively()判斷鎖是否被當前線程持有,然后檢查等待隊列是否為空,不為空就是可以取第一個節點調用dosignal(first)去"激活",這里激活不是真正的激活而只是將節點加入到同步阻塞隊列尾部,所以上下文中帶""的激活都是這種解釋。
下面看下isheldexclusively()實現
1
2
3
|
protected final boolean isheldexclusively() { return getexclusiveownerthread() == thread.currentthread(); } |
實現就是比較下當前線程和持有鎖的線程是否同一個
下面看下dosignal(first)的實現
1
2
3
4
5
6
7
8
9
10
11
|
private void dosignal(node first) { do { //頭指頭后移一位,如果后面的節點為空,則將尾指頭也指向空,說明隊列為空了 if ( (firstwaiter = first.nextwaiter) == null ) lastwaiter = null ; //清空頭節點的下個節點 first.nextwaiter = null ; //如果"激活"失敗者取下個繼續,直到成功或者遍歷完 } while (!transferforsignal(first) && (first = firstwaiter) != null ); } |
此方法就是取當前頭節點一直去嘗試"激活",直到成功或者遍歷完。
下面來看下transferforsignal(first)方法
1
2
3
4
5
6
7
8
9
10
11
12
|
final boolean transferforsignal(node node) { //將condition狀態設置成0 if (!compareandsetwaitstatus(node, node.condition, 0 )) return false ; //加入到同步阻塞隊列 node p = enq(node); int ws = p.waitstatus; //狀態異常直接激活 if (ws > 0 || !compareandsetwaitstatus(p, ws, node.signal)) locksupport.unpark(node.thread); return true ; } |
(1)此方法先先將condition狀態設置成0,因為如果是condition狀態加入到同步阻塞隊列,激活的時候是不識別的。
(2)加入到同步阻塞隊列的尾部。所以同步阻塞隊列中前面如果有多個在排隊,調用unlock()不會馬上激活此節點。
(3)狀態異常直接調用unpark激活,這邊按理說如果狀態異常情況下激活,await()在調用unlock()被激活后會進行相應的異常處理,但看await()代碼沒有處理則是正常執行。
這個方法主要就是把節點加入到同步阻塞隊列的,真正的激活則是調用unlock()去處理。
以上所述是小編給大家介紹的java并發編程之condition源碼分析詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!
原文鏈接:https://my.oschina.net/u/945573/blog/2995600