以下是本篇文章的大綱
1 synchronized和lock
1.1 synchronized的局限性
1.2 lock簡介
2 aqs
3 lock()與unlock()實現原理
3.1 基礎知識
3.2 內部結構
3.3 nonfairsync
3.3.1 lock()
3.3.2 unlock()
3.3.3 小結
3.4 fairsync
4 超時機制
5 總結
1 synchronized和lock
1.1 synchronized的局限性
synchronized是java內置的關鍵字,它提供了一種獨占的加鎖方式。synchronized的獲取和釋放鎖由jvm實現,用戶不需要顯示的釋放鎖,非常方便。然而synchronized也有一定的局限性,例如:
當線程嘗試獲取鎖的時候,如果獲取不到鎖會一直阻塞。
如果獲取鎖的線程進入休眠或者阻塞,除非當前線程異常,否則其他線程嘗試獲取鎖必須一直等待。
jdk1.5之后發布,加入了doug lea實現的concurrent包。包內提供了lock類,用來提供更多擴展的加鎖功能。lock彌補了synchronized的局限,提供了更加細粒度的加鎖功能。
1.2 lock簡介
lock api如下
1
2
3
4
5
6
|
void lock(); void lockinterruptibly() throws interruptedexception; boolean trylock(); boolean trylock( long time, timeunit unit) throws interruptedexception; void unlock(); condition newcondition(); |
其中最常用的就是lock和unlock操作了。因為使用lock時,需要手動的釋放鎖,所以需要使用try..catch來包住業務代碼,并且在finally中釋放鎖。典型使用如下
1
2
3
4
5
6
7
8
9
10
11
|
private lock lock = new reentrantlock(); public void test(){ lock.lock(); try { dosomething(); } catch (exception e){ // ignored } finally { lock.unlock(); } } |
2 aqs
abstractqueuedsynchronizer簡稱aqs,是一個用于構建鎖和同步容器的框架。事實上concurrent包內許多類都是基于aqs構建,例如reentrantlock,semaphore,countdownlatch,reentrantreadwritelock,futuretask等。aqs解決了在實現同步容器時設計的大量細節問題。
aqs使用一個fifo的隊列表示排隊等待鎖的線程,隊列頭節點稱作“哨兵節點”或者“啞節點”,它不與任何線程關聯。其他的節點與等待線程關聯,每個節點維護一個等待狀態waitstatus。如圖
aqs中還有一個表示狀態的字段state,例如reentrantlocky用它表示線程重入鎖的次數,semaphore用它表示剩余的許可數量,futuretask用它表示任務的狀態。對state變量值的更新都采用cas操作保證更新操作的原子性。
abstractqueuedsynchronizer繼承了abstractownablesynchronizer,這個類只有一個變量:exclusiveownerthread,表示當前占用該鎖的線程,并且提供了相應的get,set方法。
理解aqs可以幫助我們更好的理解jcu包中的同步容器。
3 lock()與unlock()實現原理
3.1 基礎知識
reentrantlock是lock的默認實現之一。那么lock()和unlock()是怎么實現的呢?首先我們要弄清楚幾個概念
可重入鎖。可重入鎖是指同一個線程可以多次獲取同一把鎖。reentrantlock和synchronized都是可重入鎖。
可中斷鎖。可中斷鎖是指線程嘗試獲取鎖的過程中,是否可以響應中斷。synchronized是不可中斷鎖,而reentrantlock則提供了中斷功能。
公平鎖與非公平鎖。公平鎖是指多個線程同時嘗試獲取同一把鎖時,獲取鎖的順序按照線程達到的順序,而非公平鎖則允許線程“插隊”。synchronized是非公平鎖,而reentrantlock的默認實現是非公平鎖,但是也可以設置為公平鎖。
cas操作(compareandswap)。cas操作簡單的說就是比較并交換。cas 操作包含三個操作數 —— 內存位置(v)、預期原值(a)和新值(b)。如果內存位置的值與預期原值相匹配,那么處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在 cas 指令之前返回該位置的值。cas 有效地說明了“我認為位置 v 應該包含值 a;如果包含該值,則將 b 放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。” java并發包(java.util.concurrent)中大量使用了cas操作,涉及到并發的地方都調用了sun.misc.unsafe類方法進行cas操作。
3.2 內部結構
reentrantlock提供了兩個構造器,分別是
1
2
3
4
5
6
|
public reentrantlock() { sync = new nonfairsync(); } public reentrantlock( boolean fair) { sync = fair ? new fairsync() : new nonfairsync(); } |
默認構造器初始化為nonfairsync對象,即非公平鎖,而帶參數的構造器可以指定使用公平鎖和非公平鎖。由lock()和unlock的源碼可以看到,它們只是分別調用了sync對象的lock()和release(1)方法。
sync是reentrantlock的內部類,它的結構如下
可以看到sync擴展了abstractqueuedsynchronizer。
3.3 nonfairsync
我們從源代碼出發,分析非公平鎖獲取鎖和釋放鎖的過程。
3.3.1 lock()
lock()源碼如下
1
2
3
4
5
6
|
final void lock() { if (compareandsetstate( 0 , 1 )) setexclusiveownerthread(thread.currentthread()); else acquire( 1 ); } |
首先用一個cas操作,判斷state是否是0(表示當前鎖未被占用),如果是0則把它置為1,并且設置當前線程為該鎖的獨占線程,表示獲取鎖成功。當多個線程同時嘗試占用同一個鎖時,cas操作只能保證一個線程操作成功,剩下的只能乖乖的去排隊啦。
“非公平”即體現在這里,如果占用鎖的線程剛釋放鎖,state置為0,而排隊等待鎖的線程還未喚醒時,新來的線程就直接搶占了該鎖,那么就“插隊”了。
若當前有三個線程去競爭鎖,假設線程a的cas操作成功了,拿到了鎖開開心心的返回了,那么線程b和c則設置state失敗,走到了else里面。我們往下看acquire。
acquire(arg)
1
2
3
4
5
|
public final void acquire( int arg) { if (!tryacquire(arg) && acquirequeued(addwaiter(node.exclusive), arg)) selfinterrupt(); } |
代碼非常簡潔,但是背后的邏輯卻非常復雜,可見doug lea大神的編程功力。
1. 第一步。嘗試去獲取鎖。如果嘗試獲取鎖成功,方法直接返回。
tryacquire(arg)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
final boolean nonfairtryacquire( int acquires) { //獲取當前線程 final thread current = thread.currentthread(); //獲取state變量值 int c = getstate(); if (c == 0 ) { //沒有線程占用鎖 if (compareandsetstate( 0 , acquires)) { //占用鎖成功,設置獨占線程為當前線程 setexclusiveownerthread(current); return true ; } } else if (current == getexclusiveownerthread()) { //當前線程已經占用該鎖 int nextc = c + acquires; if (nextc < 0 ) // overflow throw new error( "maximum lock count exceeded" ); // 更新state值為新的重入次數 setstate(nextc); return true ; } //獲取鎖失敗 return false ; } |
非公平鎖tryacquire的流程是:檢查state字段,若為0,表示鎖未被占用,那么嘗試占用,若不為0,檢查當前鎖是否被自己占用,若被自己占用,則更新state字段,表示重入鎖的次數。如果以上兩點都沒有成功,則獲取鎖失敗,返回false。
2. 第二步,入隊。由于上文中提到線程a已經占用了鎖,所以b和c執行tryacquire失敗,并且入等待隊列。如果線程a拿著鎖死死不放,那么b和c就會被掛起。
先看下入隊的過程。
先看addwaiter(node.exclusive)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
/** * 將新節點和當前線程關聯并且入隊列 * @param mode 獨占/共享 * @return 新節點 */ private node addwaiter(node mode) { //初始化節點,設置關聯線程和模式(獨占 or 共享) node node = new node(thread.currentthread(), mode); // 獲取尾節點引用 node pred = tail; // 尾節點不為空,說明隊列已經初始化過 if (pred != null ) { node.prev = pred; // 設置新節點為尾節點 if (compareandsettail(pred, node)) { pred.next = node; return node; } } // 尾節點為空,說明隊列還未初始化,需要初始化head節點并入隊新節點 enq(node); return node; } |
b、c線程同時嘗試入隊列,由于隊列尚未初始化,tail==null,故至少會有一個線程會走到enq(node)。我們假設同時走到了enq(node)里。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/** * 初始化隊列并且入隊新節點 */ private node enq( final node node) { //開始自旋 for (;;) { node t = tail; if (t == null ) { // must initialize // 如果tail為空,則新建一個head節點,并且tail指向head if (compareandsethead( new node())) tail = head; } else { node.prev = t; // tail不為空,將新節點入隊 if (compareandsettail(t, node)) { t.next = node; return t; } } } } |
這里體現了經典的自旋+cas組合來實現非阻塞的原子操作。由于compareandsethead的實現使用了unsafe類提供的cas操作,所以只有一個線程會創建head節點成功。假設線程b成功,之后b、c開始第二輪循環,此時tail已經不為空,兩個線程都走到else里面。假設b線程compareandsettail成功,那么b就可以返回了,c由于入隊失敗還需要第三輪循環。最終所有線程都可以成功入隊。
當b、c入等待隊列后,此時aqs隊列如下:
3. 第三步,掛起。b和c相繼執行acquirequeued(final node node, int arg)。這個方法讓已經入隊的線程嘗試獲取鎖,若失敗則會被掛起。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
/** * 已經入隊的線程嘗試獲取鎖 */ final boolean acquirequeued( final node node, int arg) { boolean failed = true ; //標記是否成功獲取鎖 try { boolean interrupted = false ; //標記線程是否被中斷過 for (;;) { final node p = node.predecessor(); //獲取前驅節點 //如果前驅是head,即該結點已成老二,那么便有資格去嘗試獲取鎖 if (p == head && tryacquire(arg)) { sethead(node); // 獲取成功,將當前節點設置為head節點 p.next = null ; // 原head節點出隊,在某個時間點被gc回收 failed = false ; //獲取成功 return interrupted; //返回是否被中斷過 } // 判斷獲取失敗后是否可以掛起,若可以則掛起 if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) // 線程若被中斷,設置interrupted為true interrupted = true ; } } finally { if (failed) cancelacquire(node); } } |
code里的注釋已經很清晰的說明了acquirequeued的執行流程。假設b和c在競爭鎖的過程中a一直持有鎖,那么它們的tryacquire操作都會失敗,因此會走到第2個if語句中。我們再看下shouldparkafterfailedacquire和parkandcheckinterrupt都做了哪些事吧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
/** * 判斷當前線程獲取鎖失敗之后是否需要掛起. */ private static boolean shouldparkafterfailedacquire(node pred, node node) { //前驅節點的狀態 int ws = pred.waitstatus; if (ws == node.signal) // 前驅節點狀態為signal,返回true return true ; // 前驅節點狀態為cancelled if (ws > 0 ) { // 從隊尾向前尋找第一個狀態不為cancelled的節點 do { node.prev = pred = pred.prev; } while (pred.waitstatus > 0 ); pred.next = node; } else { // 將前驅節點的狀態設置為signal compareandsetwaitstatus(pred, ws, node.signal); } return false ; } /** * 掛起當前線程,返回線程中斷狀態并重置 */ private final boolean parkandcheckinterrupt() { locksupport.park( this ); return thread.interrupted(); } |
線程入隊后能夠掛起的前提是,它的前驅節點的狀態為signal,它的含義是“hi,前面的兄弟,如果你獲取鎖并且出隊后,記得把我喚醒!”。所以shouldparkafterfailedacquire會先判斷當前節點的前驅是否狀態符合要求,若符合則返回true,然后調用parkandcheckinterrupt,將自己掛起。如果不符合,再看前驅節點是否>0(cancelled),若是那么向前遍歷直到找到第一個符合要求的前驅,若不是則將前驅節點的狀態設置為signal。
整個流程中,如果前驅結點的狀態不是signal,那么自己就不能安心掛起,需要去找個安心的掛起點,同時可以再嘗試下看有沒有機會去嘗試競爭鎖。
最終隊列可能會如下圖所示
線程b和c都已經入隊,并且都被掛起。當線程a釋放鎖的時候,就會去喚醒線程b去獲取鎖啦。
3.3.2 unlock()
unlock相對于lock就簡單很多。源碼如下
1
2
3
4
5
6
7
8
9
10
11
12
|
public void unlock() { sync.release( 1 ); } public final boolean release( int arg) { if (tryrelease(arg)) { node h = head; if (h != null && h.waitstatus != 0 ) unparksuccessor(h); return true ; } return false ; } |
如果理解了加鎖的過程,那么解鎖看起來就容易多了。流程大致為先嘗試釋放鎖,若釋放成功,那么查看頭結點的狀態是否為signal,如果是則喚醒頭結點的下個節點關聯的線程,如果釋放失敗那么返回false表示解鎖失敗。這里我們也發現了,每次都只喚起頭結點的下一個節點關聯的線程。
最后我們再看下tryrelease的執行過程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
/** * 釋放當前線程占用的鎖 * @param releases * @return 是否釋放成功 */ protected final boolean tryrelease( int releases) { // 計算釋放后state值 int c = getstate() - releases; // 如果不是當前線程占用鎖,那么拋出異常 if (thread.currentthread() != getexclusiveownerthread()) throw new illegalmonitorstateexception(); boolean free = false ; if (c == 0 ) { // 鎖被重入次數為0,表示釋放成功 free = true ; // 清空獨占線程 setexclusiveownerthread( null ); } // 更新state值 setstate(c); return free; } |
這里入參為1。tryrelease的過程為:當前釋放鎖的線程若不持有鎖,則拋出異常。若持有鎖,計算釋放后的state值是否為0,若為0表示鎖已經被成功釋放,并且則清空獨占線程,最后更新state值,返回free。
3.3.3 小結
用一張流程圖總結一下非公平鎖的獲取鎖的過程。
3.4 fairsync
公平鎖和非公平鎖不同之處在于,公平鎖在獲取鎖的時候,不會先去檢查state狀態,而是直接執行aqcuire(1),這里不再贅述。
4 超時機制
在reetrantlock的trylock(long timeout, timeunit unit) 提供了超時獲取鎖的功能。它的語義是在指定的時間內如果獲取到鎖就返回true,獲取不到則返回false。這種機制避免了線程無限期的等待鎖釋放。那么超時的功能是怎么實現的呢?我們還是用非公平鎖為例來一探究竟。
1
2
3
4
|
public boolean trylock( long timeout, timeunit unit) throws interruptedexception { return sync.tryacquirenanos( 1 , unit.tonanos(timeout)); } |
還是調用了內部類里面的方法。我們繼續向前探究
1
2
3
4
5
6
7
|
public final boolean tryacquirenanos( int arg, long nanostimeout) throws interruptedexception { if (thread.interrupted()) throw new interruptedexception(); return tryacquire(arg) || doacquirenanos(arg, nanostimeout); } |
這里的語義是:如果線程被中斷了,那么直接拋出interruptedexception。如果未中斷,先嘗試獲取鎖,獲取成功就直接返回,獲取失敗則進入doacquirenanos。tryacquire我們已經看過,這里重點看一下doacquirenanos做了什么。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
/** * 在有限的時間內去競爭鎖 * @return 是否獲取成功 */ private boolean doacquirenanos( int arg, long nanostimeout) throws interruptedexception { // 起始時間 long lasttime = system.nanotime(); // 線程入隊 final node node = addwaiter(node.exclusive); boolean failed = true ; try { // 又是自旋! for (;;) { // 獲取前驅節點 final node p = node.predecessor(); // 如果前驅是頭節點并且占用鎖成功,則將當前節點變成頭結點 if (p == head && tryacquire(arg)) { sethead(node); p.next = null ; // help gc failed = false ; return true ; } // 如果已經超時,返回false if (nanostimeout <= 0 ) return false ; // 超時時間未到,且需要掛起 if (shouldparkafterfailedacquire(p, node) && nanostimeout > spinfortimeoutthreshold) // 阻塞當前線程直到超時時間到期 locksupport.parknanos( this , nanostimeout); long now = system.nanotime(); // 更新nanostimeout nanostimeout -= now - lasttime; lasttime = now; if (thread.interrupted()) //相應中斷 throw new interruptedexception(); } } finally { if (failed) cancelacquire(node); } } |
doacquirenanos的流程簡述為:線程先入等待隊列,然后開始自旋,嘗試獲取鎖,獲取成功就返回,失敗則在隊列里找一個安全點把自己掛起直到超時時間過期。這里為什么還需要循環呢?因為當前線程節點的前驅狀態可能不是signal,那么在當前這一輪循環中線程不會被掛起,然后更新超時時間,開始新一輪的嘗試
5 總結
reentrantlock的核心功能講解差不多落下帷幕,理解aqs,就很容易理解reentrantlock的實現原理。文中慘雜著筆者的個人理解,如有不正之處,還望指正。
以上就是本文的全部內容,希望本文的內容對大家的學習或者工作能帶來一定的幫助,同時也希望多多支持服務器之家!
原文鏈接:http://www.cnblogs.com/maypattis/p/6403682.html