抽象隊列同步器(AQS-AbstractQueuedSynchronizer)
從名字上來理解:
- 抽象:是抽象類,具體由子類實現
- 隊列:數據結構是隊列,使用隊列存儲數據
- 同步:基于它可以實現同步功能
我們就從這幾個方面來入手解讀,但首先,我們得先知道以下幾個它的特點,以便于理解
AbstractQueuedSynchronizer特點
1.AQS可以實現獨占鎖和共享鎖。
2.獨占鎖exclusive是一個悲觀鎖。保證只有一個線程經過一個阻塞點,只有一個線程可以獲得鎖。
3.共享鎖shared是一個樂觀鎖。可以允許多個線程阻塞點,可以多個線程同時獲取到鎖。它允許一個資源可以被多個讀操作,或者被一個寫操作訪問,但是兩個操作不能同時訪問。
4.AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0無鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操作,可以確保對state的操作是安全的。
5.AQS是通過一個CLH隊列實現的(CLH鎖即Craig, Landin, and Hagersten (CLH) locks,CLH鎖是一個自旋鎖,能確保無饑餓性,提供先來先服務的公平性。CLH鎖也是一種基于鏈表的可擴展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅的狀態,如果發現前驅釋放了鎖就結束自旋。)
抽象
我們來扒一扒源碼可以看到它繼承于AbstractOwnableSynchronizer它是一個抽象類.
- public abstract class AbstractQueuedSynchronizer
- extends AbstractOwnableSynchronizer
- implements java.io.Serializable
AQS內部使用了一個volatile的變量state來作為資源的標識。同時定義了幾個獲取和改變state的protected方法,子類可以覆蓋這些方法來實現自己的邏輯.
可以看到類中為我們提供了幾個protected級別的方法,它們分別是:
- //創建一個隊列同步器實例,初始state是0
- protected AbstractQueuedSynchronizer() { }
- //返回同步狀態的當前值。
- protected final int getState() {
- return state;
- }
- //設置同步狀態的值
- protected final void setState(int newState) {
- state = newState;
- }
- //獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- protected boolean tryAcquire(int arg) {
- throw new UnsupportedOperationException();
- }
- //獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- protected boolean tryRelease(int arg) {
- throw new UnsupportedOperationException();
- }
- //共享方式。嘗試獲取資源。負數表示失??;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源
- protected int tryAcquireShared(int arg) {
- throw new UnsupportedOperationException();
- }
- //共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。
- protected boolean tryReleaseShared(int arg) {
- throw new UnsupportedOperationException();
- }
這些方法雖然都是protected方法,但是它們并沒有在AQS具體實現,而是直接拋出異常,AQS實現了一系列主要的邏輯 由此可知,AQS是一個抽象的用于構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用廣泛的同步器,比如我們提到的ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。
我們自己也能利用AQS非常輕松容易地構造出自定義的同步器,只要子類實現它的幾個protected方法就可以了.
隊列
AQS類本身實現的是具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等)。它內部使用了一個先進先出(FIFO)的雙端隊列(CLH),并使用了兩個指針head和tail用于標識隊列的頭部和尾部。其數據結構如圖:
隊列并不是直接儲存線程,而是儲存擁有線程的Node節點。
我們來看看Node的結構:
- static final class Node {
- // 標記一個結點(對應的線程)在共享模式下等待
- static final Node SHARED = new Node();
- // 標記一個結點(對應的線程)在獨占模式下等待
- static final Node EXCLUSIVE = null;
- // waitStatus的值,表示該結點(對應的線程)已被取消
- static final int CANCELLED = 1;
- // waitStatus的值,表示后繼結點(對應的線程)需要被喚醒
- static final int SIGNAL = -1;
- // waitStatus的值,表示該結點(對應的線程)在等待某一條件
- static final int CONDITION = -2;
- //waitStatus的值,表示有資源可用,新head結點需要繼續喚醒后繼結點
- //(共享模式下,多線程并發釋放資源,而head喚醒其后繼結點后,
- //需要把多出來的資源留給后面的結點;設置新的head結點時,會繼續喚醒其后繼結點)
- static final int PROPAGATE = -3;
- // 等待狀態,取值范圍,-3,-2,-1,0,1
- volatile int waitStatus;
- volatile Node prev; // 前驅結點
- volatile Node next; // 后繼結點
- volatile Thread thread; // 結點對應的線程
- Node nextWaiter; // 等待隊列里下一個等待條件的結點
- // 判斷共享模式的方法
- final boolean isShared() {
- return nextWaiter == SHARED;
- }
- Node(Thread thread, Node mode) { // Used by addWaiter
- this.nextWaiter = mode;
- this.thread = thread;
- }
- // 其它方法忽略,可以參考具體的源碼
- }
- // AQS里面的addWaiter私有方法
- private Node addWaiter(Node mode) {
- // 使用了Node的這個構造函數
- Node node = new Node(Thread.currentThread(), mode);
- // 其它代碼省略
- }
過Node我們可以實現兩個隊列,一是通過prev和next實現CLH隊列(線程同步隊列,雙向隊列),二是nextWaiter實現Condition條件上的等待線程隊列(單向隊列),這個Condition主要用在ReentrantLock類中
同步
兩種同步方式:
- 獨占模式(Exclusive):資源是獨占的,一次只能一個線程獲取。如ReentrantLock。
- 共享模式(Share):同時可以被多個線程獲取,具體的資源個數可以通過參數指定。如Semaphore/CountDownLatch。
同時實現兩種模式的同步類,如ReadWriteLock
獲取資源
獲取資源的入口是acquire(int arg)方法。arg是要獲取的資源的個數,在獨占模式下始終為1。
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
首先調用tryAcquire(arg)嘗試去獲取資源。前面提到了這個方法是在子類具體實現的如果獲取資源失敗,就通過addWaiter(Node.EXCLUSIVE)方法把這個線程插入到等待隊列中。其中傳入的參數代表要插入的Node是獨占式的。這個方法的具體實現:
- private Node addWaiter(Node mode) {
- // 生成該線程對應的Node節點
- Node node = new Node(Thread.currentThread(), mode);
- // 將Node插入隊列中
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- // 使用CAS嘗試,如果成功就返回
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- // 如果等待隊列為空或者上述CAS失敗,再自旋CAS插入
- enq(node);
- return node;
- }
- //AQS中會存在多個線程同時爭奪資源的情況,
- //因此肯定會出現多個線程同時插入節點的操作,
- //在這里是通過CAS自旋的方式保證了操作的線程安全性。
- // 自旋CAS插入等待隊列
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
若設置成功就代表自己獲取到了鎖,返回true。狀態為0設置1的動作在外部就有做過一次,內部再一次做只是提升概率,而且這樣的操作相對鎖來講不占開銷。如果狀態不是0,則判定當前線程是否為排它鎖的Owner,如果是Owner則嘗試將狀態增加acquires(也就是增加1),如果這個狀態值越界,則會拋出異常提示,若沒有越界,將狀態設置進去后返回true(實現了類似于偏向的功能,可重入,但是無需進一步征用)。如果狀態不是0,且自身不是owner,則返回false。
現在通過addWaiter方法,已經把一個Node放到等待隊列尾部了。而處于等待隊列的結點是從頭結點一個一個去獲取資源的。具體的實現我們來看看acquireQueued方法:
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- // 自旋
- for (;;) {
- final Node p = node.predecessor();
- // 如果node的前驅結點p是head,表示node是第二個結點,就可以嘗試去獲取資源了
- if (p == head && tryAcquire(arg)) {
- // 拿到資源后,將head指向該結點。
- // 所以head所指的結點,就是當前獲取到資源的那個結點或null。
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- // 如果自己可以休息了,就進入waiting狀態,直到被unpark()
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
這里parkAndCheckInterrupt方法內部使用到了LockSupport.park(this),順便簡單介紹一下park。LockSupport類是Java 6 引入的一個類,提供了基本的線程同步原語。LockSupport實際上是調用了Unsafe類里的函數,歸結到Unsafe里,只有兩個函數:park(boolean isAbsolute, long time):阻塞當前線程 unpark(Thread jthread):使給定的線程停止阻塞
所以結點進入等待隊列后,是調用park使它進入阻塞狀態的。只有頭結點的線程是處于活躍狀態的。
acquire方法 獲取資源的流程:
當然,獲取資源的方法除了acquire外,還有以下三個:
- acquireInterruptibly:申請可中斷的資源(獨占模式)
- acquireShared:申請共享模式的資源
- acquireSharedInterruptibly:申請可中斷的資源(共享模式)
可中斷的意思是,在線程中斷時可能會拋出InterruptedException
釋放資源
釋放資源相比于獲取資源來說,會簡單許多。在AQS中只有一小段實現。
源碼:
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
tryRelease方法 這個動作可以認為就是一個設置鎖狀態的操作,而且是將狀態減掉傳入的參數值(參數是1),如果結果狀態為0,就將排它鎖的Owner設置為null,以使得其它的線程有機會進行執行。
在排它鎖中,加鎖的時候狀態會增加1(當然可以自己修改這個值),在解鎖的時候減掉1,同一個鎖,在可以重入后,可能會被疊加為2、3、4這些值,只有unlock()的次數與lock()的次數對應才會將Owner線程設置為空,而且也只有這種情況下才會返回true。這一點大家寫代碼要注意,如果是在循環體中lock()或故意使用兩次以上的lock(),而最終只有一次unlock(),最終可能無法釋放鎖。導致死鎖.
- private void unparkSuccessor(Node node) {
- // 如果狀態是負數,嘗試把它設置為0
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- // 得到頭結點的后繼結點head.next
- Node s = node.next;
- // 如果這個后繼結點為空或者狀態大于0
- // 通過前面的定義我們知道,大于0只有一種可能,就是這個結點已被取消
- if (s == null || s.waitStatus > 0) {
- s = null;
- // 等待隊列中所有還有用的結點,都向前移動
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- // 如果后繼結點不為空,
- if (s != null)
- LockSupport.unpark(s.thread);
- }
方法unparkSuccessor(Node),意味著真正要釋放鎖了,它傳入的是head節點,內部首先會發生的動作是獲取head節點的next節點,如果獲取到的節點不為空,則直接通過:“LockSupport.unpark()”方法來釋放對應的被掛起的線程.
原文地址:https://mp.weixin.qq.com/s/5BHdVpLrbjaxQ0CQ2UOi9A