一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - JUC解析-AQS抽象隊列同步器

JUC解析-AQS抽象隊列同步器

2021-04-14 01:28java寶典iTengyu Java教程

本文主要為大家介紹JUC解析-AQS抽象隊列同步器,我們就從這幾個方面來入手解讀,但首先,我們得先知道以下幾個它的特點,以便于理解

 JUC解析-AQS抽象隊列同步器

抽象隊列同步器(AQS-AbstractQueuedSynchronizer)

從名字上來理解:

  • 抽象:是抽象類,具體由子類實現
  • 隊列:數據結構是隊列,使用隊列存儲數據
  • 同步:基于它可以實現同步功能

我們就從這幾個方面來入手解讀,但首先,我們得先知道以下幾個它的特點,以便于理解

AbstractQueuedSynchronizer特點

1.AQS可以實現獨占鎖和共享鎖。

2.獨占鎖exclusive是一個悲觀鎖。保證只有一個線程經過一個阻塞點,只有一個線程可以獲得鎖。

3.共享鎖shared是一個樂觀鎖。可以允許多個線程阻塞點,可以多個線程同時獲取到鎖。它允許一個資源可以被多個讀操作,或者被一個寫操作訪問,但是兩個操作不能同時訪問。

4.AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0無鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操作,可以確保對state的操作是安全的。

5.AQS是通過一個CLH隊列實現的(CLH鎖即Craig, Landin, and Hagersten (CLH) locks,CLH鎖是一個自旋鎖,能確保無饑餓性,提供先來先服務的公平性。CLH鎖也是一種基于鏈表的可擴展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅的狀態,如果發現前驅釋放了鎖就結束自旋。)

抽象

我們來扒一扒源碼可以看到它繼承于AbstractOwnableSynchronizer它是一個抽象類.

  1. public abstract class AbstractQueuedSynchronizer 
  2.     extends AbstractOwnableSynchronizer 
  3.     implements java.io.Serializable 

AQS內部使用了一個volatile的變量state來作為資源的標識。同時定義了幾個獲取和改變state的protected方法,子類可以覆蓋這些方法來實現自己的邏輯.

可以看到類中為我們提供了幾個protected級別的方法,它們分別是:

  1. //創建一個隊列同步器實例,初始state是0 
  2. protected AbstractQueuedSynchronizer() { } 
  3.  
  4. //返回同步狀態的當前值。 
  5. protected final int getState() { 
  6.         return state; 
  7.  
  8. //設置同步狀態的值 
  9. protected final void setState(int newState) { 
  10.         state = newState; 
  11.     } 
  12.  
  13. //獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。 
  14. protected boolean tryAcquire(int arg) { 
  15.         throw new UnsupportedOperationException(); 
  16.     }     
  17.      
  18.      
  19. //獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。 
  20. protected boolean tryRelease(int arg) { 
  21.         throw new UnsupportedOperationException(); 
  22.     } 
  23.      
  24.      
  25. //共享方式。嘗試獲取資源。負數表示失??;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源 
  26. protected int tryAcquireShared(int arg) { 
  27.         throw new UnsupportedOperationException(); 
  28.     }     
  29.      
  30.      
  31. //共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。 
  32. protected boolean tryReleaseShared(int arg) { 
  33.         throw new UnsupportedOperationException(); 
  34.     } 
  35.      

這些方法雖然都是protected方法,但是它們并沒有在AQS具體實現,而是直接拋出異常,AQS實現了一系列主要的邏輯 由此可知,AQS是一個抽象的用于構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用廣泛的同步器,比如我們提到的ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。

我們自己也能利用AQS非常輕松容易地構造出自定義的同步器,只要子類實現它的幾個protected方法就可以了.

隊列

AQS類本身實現的是具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等)。它內部使用了一個先進先出(FIFO)的雙端隊列(CLH),并使用了兩個指針head和tail用于標識隊列的頭部和尾部。其數據結構如圖:

JUC解析-AQS抽象隊列同步器

隊列并不是直接儲存線程,而是儲存擁有線程的Node節點。

我們來看看Node的結構:

  1. static final class Node { 
  2.     // 標記一個結點(對應的線程)在共享模式下等待 
  3.     static final Node SHARED = new Node(); 
  4.     // 標記一個結點(對應的線程)在獨占模式下等待 
  5.     static final Node EXCLUSIVE = null;  
  6.  
  7.     // waitStatus的值,表示該結點(對應的線程)已被取消 
  8.     static final int CANCELLED = 1;  
  9.     // waitStatus的值,表示后繼結點(對應的線程)需要被喚醒 
  10.     static final int SIGNAL = -1; 
  11.     // waitStatus的值,表示該結點(對應的線程)在等待某一條件 
  12.     static final int CONDITION = -2; 
  13.      
  14.     //waitStatus的值,表示有資源可用,新head結點需要繼續喚醒后繼結點 
  15.     //(共享模式下,多線程并發釋放資源,而head喚醒其后繼結點后, 
  16.     //需要把多出來的資源留給后面的結點;設置新的head結點時,會繼續喚醒其后繼結點) 
  17.     static final int PROPAGATE = -3; 
  18.  
  19.     // 等待狀態,取值范圍,-3,-2,-1,0,1 
  20.     volatile int waitStatus; 
  21.     volatile Node prev; // 前驅結點 
  22.     volatile Node next; // 后繼結點 
  23.     volatile Thread thread; // 結點對應的線程 
  24.     Node nextWaiter; // 等待隊列里下一個等待條件的結點 
  25.  
  26.  
  27.     // 判斷共享模式的方法 
  28.     final boolean isShared() { 
  29.         return nextWaiter == SHARED; 
  30.     } 
  31.  
  32.     Node(Thread thread, Node mode) {     // Used by addWaiter 
  33.         this.nextWaiter = mode; 
  34.         this.thread = thread; 
  35.     } 
  36.  
  37.     // 其它方法忽略,可以參考具體的源碼 
  38.  
  39. // AQS里面的addWaiter私有方法 
  40. private Node addWaiter(Node mode) { 
  41.     // 使用了Node的這個構造函數 
  42.     Node node = new Node(Thread.currentThread(), mode); 
  43.     // 其它代碼省略 

過Node我們可以實現兩個隊列,一是通過prev和next實現CLH隊列(線程同步隊列,雙向隊列),二是nextWaiter實現Condition條件上的等待線程隊列(單向隊列),這個Condition主要用在ReentrantLock類中

同步

兩種同步方式:

  • 獨占模式(Exclusive):資源是獨占的,一次只能一個線程獲取。如ReentrantLock。
  • 共享模式(Share):同時可以被多個線程獲取,具體的資源個數可以通過參數指定。如Semaphore/CountDownLatch。

同時實現兩種模式的同步類,如ReadWriteLock

獲取資源

獲取資源的入口是acquire(int arg)方法。arg是要獲取的資源的個數,在獨占模式下始終為1。

  1. public final void acquire(int arg) { 
  2.     if (!tryAcquire(arg) && 
  3.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
  4.         selfInterrupt(); 

首先調用tryAcquire(arg)嘗試去獲取資源。前面提到了這個方法是在子類具體實現的如果獲取資源失敗,就通過addWaiter(Node.EXCLUSIVE)方法把這個線程插入到等待隊列中。其中傳入的參數代表要插入的Node是獨占式的。這個方法的具體實現:

  1. private Node addWaiter(Node mode) { 
  2.     // 生成該線程對應的Node節點 
  3.     Node node = new Node(Thread.currentThread(), mode); 
  4.     // 將Node插入隊列中 
  5.     Node pred = tail; 
  6.     if (pred != null) { 
  7.         node.prev = pred; 
  8.         // 使用CAS嘗試,如果成功就返回 
  9.         if (compareAndSetTail(pred, node)) { 
  10.             pred.next = node; 
  11.             return node; 
  12.         } 
  13.     } 
  14.     // 如果等待隊列為空或者上述CAS失敗,再自旋CAS插入 
  15.     enq(node); 
  16.     return node; 
  17.  
  18. //AQS中會存在多個線程同時爭奪資源的情況, 
  19. //因此肯定會出現多個線程同時插入節點的操作, 
  20. //在這里是通過CAS自旋的方式保證了操作的線程安全性。 
  21.  
  22. // 自旋CAS插入等待隊列 
  23. private Node enq(final Node node) { 
  24.     for (;;) { 
  25.         Node t = tail; 
  26.         if (t == null) { // Must initialize 
  27.             if (compareAndSetHead(new Node())) 
  28.                 tail = head; 
  29.         } else { 
  30.             node.prev = t; 
  31.             if (compareAndSetTail(t, node)) { 
  32.                 t.next = node; 
  33.                 return t; 
  34.             } 
  35.         } 
  36.     } 

若設置成功就代表自己獲取到了鎖,返回true。狀態為0設置1的動作在外部就有做過一次,內部再一次做只是提升概率,而且這樣的操作相對鎖來講不占開銷。如果狀態不是0,則判定當前線程是否為排它鎖的Owner,如果是Owner則嘗試將狀態增加acquires(也就是增加1),如果這個狀態值越界,則會拋出異常提示,若沒有越界,將狀態設置進去后返回true(實現了類似于偏向的功能,可重入,但是無需進一步征用)。如果狀態不是0,且自身不是owner,則返回false。

現在通過addWaiter方法,已經把一個Node放到等待隊列尾部了。而處于等待隊列的結點是從頭結點一個一個去獲取資源的。具體的實現我們來看看acquireQueued方法:

  1. final boolean acquireQueued(final Node node, int arg) { 
  2.     boolean failed = true
  3.     try { 
  4.         boolean interrupted = false
  5.         // 自旋 
  6.         for (;;) { 
  7.             final Node p = node.predecessor(); 
  8.             // 如果node的前驅結點p是head,表示node是第二個結點,就可以嘗試去獲取資源了 
  9.             if (p == head && tryAcquire(arg)) { 
  10.                 // 拿到資源后,將head指向該結點。 
  11.                 // 所以head所指的結點,就是當前獲取到資源的那個結點或null。 
  12.                 setHead(node);  
  13.                 p.next = null; // help GC 
  14.                 failed = false
  15.                 return interrupted; 
  16.             } 
  17.             // 如果自己可以休息了,就進入waiting狀態,直到被unpark() 
  18.             if (shouldParkAfterFailedAcquire(p, node) && 
  19.                 parkAndCheckInterrupt()) 
  20.                 interrupted = true
  21.         } 
  22.     } finally { 
  23.         if (failed) 
  24.             cancelAcquire(node); 
  25.     } 

這里parkAndCheckInterrupt方法內部使用到了LockSupport.park(this),順便簡單介紹一下park。LockSupport類是Java 6 引入的一個類,提供了基本的線程同步原語。LockSupport實際上是調用了Unsafe類里的函數,歸結到Unsafe里,只有兩個函數:park(boolean isAbsolute, long time):阻塞當前線程 unpark(Thread jthread):使給定的線程停止阻塞

所以結點進入等待隊列后,是調用park使它進入阻塞狀態的。只有頭結點的線程是處于活躍狀態的。

acquire方法 獲取資源的流程:

JUC解析-AQS抽象隊列同步器

當然,獲取資源的方法除了acquire外,還有以下三個:

  • acquireInterruptibly:申請可中斷的資源(獨占模式)
  • acquireShared:申請共享模式的資源
  • acquireSharedInterruptibly:申請可中斷的資源(共享模式)

可中斷的意思是,在線程中斷時可能會拋出InterruptedException

釋放資源

釋放資源相比于獲取資源來說,會簡單許多。在AQS中只有一小段實現。

源碼:

  1. public final boolean release(int arg) { 
  2.     if (tryRelease(arg)) { 
  3.         Node h = head; 
  4.         if (h != null && h.waitStatus != 0) 
  5.             unparkSuccessor(h); 
  6.         return true
  7.     } 
  8.     return false

tryRelease方法 這個動作可以認為就是一個設置鎖狀態的操作,而且是將狀態減掉傳入的參數值(參數是1),如果結果狀態為0,就將排它鎖的Owner設置為null,以使得其它的線程有機會進行執行。

在排它鎖中,加鎖的時候狀態會增加1(當然可以自己修改這個值),在解鎖的時候減掉1,同一個鎖,在可以重入后,可能會被疊加為2、3、4這些值,只有unlock()的次數與lock()的次數對應才會將Owner線程設置為空,而且也只有這種情況下才會返回true。這一點大家寫代碼要注意,如果是在循環體中lock()或故意使用兩次以上的lock(),而最終只有一次unlock(),最終可能無法釋放鎖。導致死鎖.

  1. private void unparkSuccessor(Node node) { 
  2.     // 如果狀態是負數,嘗試把它設置為0 
  3.     int ws = node.waitStatus; 
  4.     if (ws < 0) 
  5.         compareAndSetWaitStatus(node, ws, 0); 
  6.     // 得到頭結點的后繼結點head.next 
  7.     Node s = node.next
  8.     // 如果這個后繼結點為空或者狀態大于0 
  9.     // 通過前面的定義我們知道,大于0只有一種可能,就是這個結點已被取消 
  10.     if (s == null || s.waitStatus > 0) { 
  11.         s = null
  12.         // 等待隊列中所有還有用的結點,都向前移動 
  13.         for (Node t = tail; t != null && t != node; t = t.prev) 
  14.             if (t.waitStatus <= 0) 
  15.                 s = t; 
  16.     } 
  17.     // 如果后繼結點不為空, 
  18.     if (s != null
  19.         LockSupport.unpark(s.thread); 

方法unparkSuccessor(Node),意味著真正要釋放鎖了,它傳入的是head節點,內部首先會發生的動作是獲取head節點的next節點,如果獲取到的節點不為空,則直接通過:“LockSupport.unpark()”方法來釋放對應的被掛起的線程.

原文地址:https://mp.weixin.qq.com/s/5BHdVpLrbjaxQ0CQ2UOi9A

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 特黄特色大片免费视频播放 | 褪色的憎恨 | 日本一区二区三区久久 | 俄罗斯女同和女同xx | hezyo加勒比一区二区三区 | 大胸纲手被羞羞漫画网站 | 日本一区二区三区在线 观看网站 | 国产丰满美女做爰 | 舔到喷水| 国产一区二区三区福利 | 国产婷婷成人久久av免费高清 | 欧美日韩亚洲一区二区三区在线观看 | 国产色司机在线视频免费观看 | 日韩日韩日韩手机看片自拍 | 亚洲精品无码不卡在线观看 | 国产精品成人扳一级aa毛片 | 国产精品色拉拉免费看 | 日本一区二区三区久久精品 | 欧美人交性视频在线香蕉 | 欧美a欧美1级| zoz.zzz色| 免费成年人在线视频 | 国产高清在线精品一区二区三区 | 99视频在线免费观看 | 久久中文电影 | www免费视频com | 国产精品久久久久久久久 | 日韩一区二区三区免费 | 婷婷去我也去 | 恩爱夫妇交换小说 | 波多野结衣家庭教师 | 丝瓜秋葵番茄绿巨人在线观看 | 久久成人精品免费播放 | 欧美成人精品第一区二区三区 | 免费激情小视频 | 亚洲 欧美 国产 综合首页 | 欧美视频精品一区二区三区 | 午夜片无码区在线观看 | 男人捅女人漫画 | 福利视频导航大全 | 呜呜别塞了啊抽插 |