前言:
在 Java并發內存模型詳情了解到多進程(線程)讀取共享資源的時候存在競爭條件。
計算機中通過設計同步器來協調進程(線程)之間執行順序。同步器作用就像登機安檢人員一樣可以協調旅客按順序通過。
在Java中,同步器可以理解為一個對象,它根據自身狀態協調線程的執行順序。比如鎖(Lock),信號量(Semaphore),屏障(CyclicBarrier),阻塞隊列(Blocking Queue)。
這些同步器在功能設計上有所不同,但是內部實現上有共通的地方。
1、同步器
同步器的設計一般包含幾個方面:狀態變量設計(同步器內部狀態),訪問條件設定,狀態更新,等待方式,通知策略。
訪問條件是控制線程是否能執行(訪問共享對象)的條件,它往往與狀態變量緊密相關。而通知策略是線程釋放鎖定狀態后通知其它等待線程的方式,一般有以下幾種情況:
- 通知所有等待的線程。
- 通知1個隨機的N個等待線程。
- 通知1個特定的N個等待線程
看下面例子,通過鎖方式的同步器
public class Lock{ // 狀態變量 isLocked private boolean isLocked = false; public synchronized void lock() throws InterruptedException{ // 訪問條件 當isLocked=false 時獲得訪問權限否則等待 while(isLocked){ // 阻塞等待 wait(); } //狀態更新 線程獲得訪問權限 isLocked = true; } public synchronized void unlock(){ //狀態更新 線程釋放訪問權限 isLocked = false; // 通知策略 object.notify | object.notifyAll notify(); } }
我們用計數信號量控制同時執行操作活動數。這里模擬一個連接池。
public class PoolSemaphore { // 狀態變量 actives 計數器 private int actives = 0; private int max; public PoolSemaphore(int max) { this.max = max; } public synchronized void acquire() throws InterruptedException { //訪問條件 激活數小于最大限制時,獲得訪問權限否則等待 while (this.actives == max) wait(); //狀態更新 線程獲得訪問權限 this.actives++; // 通知策略 object.notify | object.notifyAll this.notify(); } public synchronized void release() throws InterruptedException { //訪問條件 激活數不為0時,獲得訪問權限否則等待 while (this.actives == 0) wait(); //狀態更新 線程獲得訪問權限 this.actives--; // 通知策略 object.notify | object.notifyAll this.notify(); } }
1.1 原子指令
同步器設計里面,最重要的操作邏輯是“如果滿足條件,以更新狀態變量來標志線程獲得或釋放訪問權限”,該操作應具備原子性。
比如test-and-set 計算機原子指令,意思是進行條件判斷滿足則設置新值。
function Lock(boolean *lock) { while (test_and_set(lock) == 1); }
另外還有很多原子指令 fetch-and-add compare-and-swap,注意這些指令需硬件支持才有效。
同步操作中,利用計算機原子指令,可以避開鎖,提升效率。java中沒有 test-and-set 的支持,不過 java.util.concurrent.atomic 給我們提供了很多原子類API,里面支持了 getAndSet 和compareAndSet 操作。
看下面例子,主要在區別是等待方式不一樣,上面是通過wait()阻塞等待,下面是無阻塞循環。
public class Lock{ // 狀態變量 isLocked private AtomicBoolean isLocked = new AtomicBoolean(false); public void lock() throws InterruptedException{ // 等待方式 變為自旋等待 while(!isLocked.compareAndSet(false, true)); //狀態更新 線程獲得訪問權限 isLocked.set(true); } public synchronized void unlock(){ //狀態更新 線程釋放訪問權限 isLocked.set(false); } }
1.2 關于阻塞擴展說明
阻塞意味著需要將進程或線程狀態進行轉存,以便還原后恢復執行。這種操作是昂貴繁重,而線程基于進程之上相對比較輕量。線程的阻塞在不同編程平臺實現方式也有所不同,像Java是基于JVM運行,所以它由JVM完成實現。
在《Java Concurrency in Practice》中,作者提到
競爭性同步可能需要OS活動,這增加了成本。當爭用鎖時,未獲取鎖的線程必須阻塞。 JVM可以通過旋轉等待(反復嘗試獲取鎖直到成功)來實現阻塞,也可以通過操作系統掛起阻塞的線程來實現阻塞。哪種效率更高取決于上下文切換開銷與鎖定可用之前的時間之間的關系。對于短暫的等待,最好使用自旋等待;對于長時間的等待,最好使用暫停。一些JVM基于對過去等待時間的分析數據來自適應地在這兩者之間進行選擇,但是大多數JVM只是掛起線程等待鎖定。
從上面可以看出JVM實現阻塞兩種方式
- 旋轉等待(spin-waiting),簡單理解是不暫停執行,以循環的方式等待,適合短時間場景。
- 通過操作系統掛起線程。
JVM中通過 -XX: +UseSpinning 開啟旋轉等待, -XX: PreBlockSpi =10指定最大旋轉次數。
2、AQS
AQS是AbstractQueuedSynchronizer簡稱。本節對AQS只做簡單闡述,并不全面。
java.util.concurrent包中的 ReentrantLock,CountDownLatch,Semaphore,CyclicBarrier等都是基于是AQS同步器實現。
狀態變量 是用 int state 來表示,狀態的獲取與更新通過以下API操作。
int getState() void setState(int newState) boolean compareAndSetState(int expect, int update)
該狀態值在不同API中有不同表示意義。比如ReentrantLock中表示持有鎖的線程獲取鎖的次數,Semaphore表示剩余許可數。
關于等待方式和通知策略的設計
AQS通過維護一個FIFO同步隊列(Sync queue)來進行同步管理。當多線程爭用共享資源時被阻塞入隊。而線程阻塞與喚醒是通過 LockSupport.park/unpark API實現。
它定義了兩種資源共享方式。
- Exclusive(獨占,只有一個線程能執行,如ReentrantLock)
- Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)
每個節點包含waitStatus(節點狀態),prev(前繼),next(后繼),thread(入隊時線程),nextWaiter(condition隊列的后繼節點)
waitStatus 有以下取值:
- CANCELLED(1) 表示線程已取消。當發生超時或中斷,節點狀態變為取消,之后狀態不再改變。
- SIGNAL(-1) 表示后繼節點等待前繼的喚醒。后繼節點入隊時,會將前繼狀態更新為SIGNAL。
- CONDITION(-2) 表示線程在Condition queue 里面等待。當其他線程調用了Condition.signal()方法后,CONDITION狀態的節點將從 Condition queue 轉移到 Sync queue,等待獲取鎖。
- PROPAGATE(-3) 在共享模式下,當前節點釋放后,確保有效通知后繼節點。
- (0) 節點加入隊列時的默認狀態。
AQS 幾個關鍵 API
- tryAcquire(int) 獨占方式下,嘗試去獲取資源。成功返回true,否則false。
- tryRelease(int) 獨占方式下,嘗試釋放資源,成功返回true,否則false。
- tryAcquireShared(int) 共享方式下,嘗試獲取資源。返回負數為失敗,零和正數為成功并表示剩余資源。
- tryReleaseShared(int) 共享方式下,嘗試釋放資源,如果釋放后允許喚醒后續等待節點返回true,否則false。
- isHeldExclusively() 判斷線程是否正在獨占資源。
2.1 acquire(int arg)
public final void acquire(int arg) { if ( // 嘗試直接去獲取資源,如果成功則直接返回 !tryAcquire(arg) && //線程阻塞在同步隊列等待獲取資源。等待過程中被中斷,則返回true,否則false acquireQueued( // 標記該線程為獨占方式,并加入同步隊列尾部。 addWaiter(Node.EXCLUSIVE), arg) ) selfInterrupt(); }
2.2 release(int arg)
public final boolean release(int arg) { // 嘗試釋放資源 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 喚醒下一個線程(后繼節點) unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { .... Node s = node.next; // 找到后繼節點 if (s == null || s.waitStatus > 0) {//無后繼或節點已取消 s = null; // 找到有效的等待節點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); // 喚醒線程 }
總結:
文章記錄并發編程中同步器設計的一些共性特征。并簡單介紹了Java中的AQS。
到此這篇關于淺談Java并發之同步器設計的文章就介紹到這了,更多相關Java并發之同步器設計內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://www.onlythinking.com/2020/06/09/java%E5%B9%B6%E5%8F%91%E4%B9%8Bsynchronizer%E8%A7%A3%E6%9E%90/