Semaphore(信號量)是JUC包中比較常用到的一個類,它是AQS共享模式的一個應用,可以允許多個線程同時對共享資源進行操作,并且可以有效的控制并發數,利用它可以很好的實現流量控制。Semaphore提供了一個許可證的概念,可以把這個許可證看作公共汽車車票,只有成功獲取車票的人才能夠上車,并且車票是有一定數量的,不可能毫無限制的發下去,這樣就會導致公交車超載。所以當車票發完的時候(公交車以滿載),其他人就只能等下一趟車了。如果中途有人下車,那么他的位置將會空閑出來,因此如果這時其他人想要上車的話就又可以獲得車票了。利用Semaphore可以實現各種池,我們在本篇末尾將會動手寫一個簡易的數據庫連接池。首先我們來看一下Semaphore的構造器。
1
2
3
4
5
6
7
8
9
|
//構造器1 public Semaphore( int permits) { sync = new NonfairSync(permits); } //構造器2 public Semaphore( int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } |
Semaphore提供了兩個帶參構造器,沒有提供無參構造器。這兩個構造器都必須傳入一個初始的許可證數量,使用構造器1構造出來的信號量在獲取許可證時會采用非公平方式獲取,使用構造器2可以通過參數指定獲取許可證的方式(公平or非公平)。Semaphore主要對外提供了兩類API,獲取許可證和釋放許可證,默認的是獲取和釋放一個許可證,也可以傳入參數來同時獲取和釋放多個許可證。在本篇中我們只講每次獲取和釋放一個許可證的情況。
1.獲取許可證
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
//獲取一個許可證(響應中斷) public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly( 1 ); } //獲取一個許可證(不響應中斷) public void acquireUninterruptibly() { sync.acquireShared( 1 ); } //嘗試獲取許可證(非公平獲取) public boolean tryAcquire() { return sync.nonfairTryAcquireShared( 1 ) >= 0 ; } //嘗試獲取許可證(定時獲取) public boolean tryAcquire( long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos( 1 , unit.toNanos(timeout)); } |
上面的API是Semaphore提供的默認獲取許可證操作。每次只獲取一個許可證,這也是現實生活中較常遇到的情況。除了直接獲取還提供了嘗試獲取,直接獲取操作在失敗之后可能會阻塞線程,而嘗試獲取則不會。另外還需注意的是tryAcquire方法是使用非公平方式嘗試獲取的。在平時我們比較常用到的是acquire方法去獲取許可證。下面我們就來看看它是怎樣獲取的。可以看到acquire方法里面直接就是調用sync.acquireSharedInterruptibly(1),這個方法是AQS里面的方法,我們在講AQS源碼系列文章的時候曾經講過,現在我們再來回顧一下。
1
2
3
4
5
6
7
8
9
10
11
12
|
//以可中斷模式獲取鎖(共享模式) public final void acquireSharedInterruptibly( int arg) throws InterruptedException { //首先判斷線程是否中斷, 如果是則拋出異常 if (Thread.interrupted()) { throw new InterruptedException(); } //1.嘗試去獲取鎖 if (tryAcquireShared(arg) < 0 ) { //2. 如果獲取失敗則進人該方法 doAcquireSharedInterruptibly(arg); } } |
acquireSharedInterruptibly方法首先就是去調用tryAcquireShared方法去嘗試獲取,tryAcquireShared在AQS里面是抽象方法,FairSync和NonfairSync這兩個派生類實現了該方法的邏輯。FairSync實現的是公平獲取的邏輯,而NonfairSync實現的非公平獲取的邏輯。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
abstract static class Sync extends AbstractQueuedSynchronizer { //非公平方式嘗試獲取 final int nonfairTryAcquireShared( int acquires) { for (;;) { //獲取可用許可證 int available = getState(); //獲取剩余許可證 int remaining = available - acquires; //1.如果remaining小于0則直接返回remaining //2.如果remaining大于0則先更新同步狀態再返回remaining if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } } } //非公平同步器 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync( int permits) { super (permits); } //嘗試獲取許可證 protected int tryAcquireShared( int acquires) { return nonfairTryAcquireShared(acquires); } } //公平同步器 static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync( int permits) { super (permits); } //嘗試獲取許可證 protected int tryAcquireShared( int acquires) { for (;;) { //判斷同步隊列前面有沒有人排隊 if (hasQueuedPredecessors()) { //如果有的話就直接返回-1,表示嘗試獲取失敗 return - 1 ; } //獲取可用許可證 int available = getState(); //獲取剩余許可證 int remaining = available - acquires; //1.如果remaining小于0則直接返回remaining //2.如果remaining大于0則先更新同步狀態再返回remaining if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } } } |
這里需要注意的是NonfairSync的tryAcquireShared方法直接調用的是nonfairTryAcquireShared方法,這個方法是在父類Sync里面的。非公平獲取鎖的邏輯是先取出當前同步狀態(同步狀態表示許可證個數),將當前同步狀態減去參入的參數,如果結果不小于0的話證明還有可用的許可證,那么就直接使用CAS操作更新同步狀態的值,最后不管結果是否小于0都會返回該結果值。這里我們要了解tryAcquireShared方法返回值的含義,返回負數表示獲取失敗,零表示當前線程獲取成功但后續線程不能再獲取,正數表示當前線程獲取成功并且后續線程也能夠獲取。我們再來看acquireSharedInterruptibly方法的代碼。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
//以可中斷模式獲取鎖(共享模式) public final void acquireSharedInterruptibly( int arg) throws InterruptedException { //首先判斷線程是否中斷, 如果是則拋出異常 if (Thread.interrupted()) { throw new InterruptedException(); } //1.嘗試去獲取鎖 //負數:表示獲取失敗 //零值:表示當前線程獲取成功, 但是后繼線程不能再獲取了 //正數:表示當前線程獲取成功, 并且后繼線程同樣可以獲取成功 if (tryAcquireShared(arg) < 0 ) { //2. 如果獲取失敗則進人該方法 doAcquireSharedInterruptibly(arg); } } |
如果返回的remaining小于0的話就代表獲取失敗,因此tryAcquireShared(arg) < 0就為true,所以接下來就會調用doAcquireSharedInterruptibly方法,這個方法我們在講AQS的時候講過,它會將當前線程包裝成結點放入同步隊列尾部,并且有可能掛起線程。這也是當remaining小于0時線程會排隊阻塞的原因。而如果返回的remaining>=0的話就代表當前線程獲取成功,因此tryAcquireShared(arg) < 0就為flase,所以就不會再去調用doAcquireSharedInterruptibly方法阻塞當前線程了。以上是非公平獲取的整個邏輯,而公平獲取時僅僅是在此之前先去調用hasQueuedPredecessors方法判斷同步隊列是否有人在排隊,如果有的話就直接return -1表示獲取失敗,否則才繼續執行下面和非公平獲取一樣的步驟。
2.釋放許可證
1
2
3
4
|
//釋放一個許可證 public void release() { sync.releaseShared( 1 ); } |
調用release方法是釋放一個許可證,它的操作很簡單,就調用了AQS的releaseShared方法,我們來看看這個方法。
1
2
3
4
5
6
7
8
9
10
|
//釋放鎖的操作(共享模式) public final boolean releaseShared( int arg) { //1.嘗試去釋放鎖 if (tryReleaseShared(arg)) { //2.如果釋放成功就喚醒其他線程 doReleaseShared(); return true ; } return false ; } |
AQS的releaseShared方法首先調用tryReleaseShared方法嘗試釋放鎖,這個方法的實現邏輯在子類Sync里面。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
abstract static class Sync extends AbstractQueuedSynchronizer { ... //嘗試釋放操作 protected final boolean tryReleaseShared( int releases) { for (;;) { //獲取當前同步狀態 int current = getState(); //將當前同步狀態加上傳入的參數 int next = current + releases; //如果相加結果小于當前同步狀態的話就報錯 if (next < current) { throw new Error( "Maximum permit count exceeded" ); } //以CAS方式更新同步狀態的值, 更新成功則返回true, 否則繼續循環 if (compareAndSetState(current, next)) { return true ; } } } ... } |
可以看到tryReleaseShared方法里面采用for循環進行自旋,首先獲取同步狀態,將同步狀態加上傳入的參數,然后以CAS方式更新同步狀態,更新成功就返回true并跳出方法,否則就繼續循環直到成功為止,這就是Semaphore釋放許可證的流程。
3.動手寫個連接池
Semaphore代碼并沒有很復雜,常用的操作就是獲取和釋放一個許可證,這些操作的實現邏輯也都比較簡單,但這并不妨礙Semaphore的廣泛應用。下面我們就來利用Semaphore實現一個簡單的數據庫連接池,通過這個例子希望讀者們能更加深入的掌握Semaphore的運用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
public class ConnectPool { //連接池大小 private int size; //數據庫連接集合 private Connect[] connects; //連接狀態標志 private boolean [] connectFlag; //剩余可用連接數 private volatile int available; //信號量 private Semaphore semaphore; //構造器 public ConnectPool( int size) { this .size = size; this .available = size; semaphore = new Semaphore(size, true ); connects = new Connect[size]; connectFlag = new boolean [size]; initConnects(); } //初始化連接 private void initConnects() { //生成指定數量的數據庫連接 for ( int i = 0 ; i < this .size; i++) { connects[i] = new Connect(); } } //獲取數據庫連接 private synchronized Connect getConnect(){ for ( int i = 0 ; i < connectFlag.length; i++) { //遍歷集合找到未使用的連接 if (!connectFlag[i]) { //將連接設置為使用中 connectFlag[i] = true ; //可用連接數減1 available--; System.out.println( "【" +Thread.currentThread().getName()+ "】以獲取連接 剩余連接數:" + available); //返回連接引用 return connects[i]; } } return null ; } //獲取一個連接 public Connect openConnect() throws InterruptedException { //獲取許可證 semaphore.acquire(); //獲取數據庫連接 return getConnect(); } //釋放一個連接 public synchronized void release(Connect connect) { for ( int i = 0 ; i < this .size; i++) { if (connect == connects[i]){ //將連接設置為未使用 connectFlag[i] = false ; //可用連接數加1 available++; System.out.println( "【" +Thread.currentThread().getName()+ "】以釋放連接 剩余連接數:" + available); //釋放許可證 semaphore.release(); } } } //剩余可用連接數 public int available() { return available; } } |
測試代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public class TestThread extends Thread { private static ConnectPool pool = new ConnectPool( 3 ); @Override public void run() { try { Connect connect = pool.openConnect(); Thread.sleep( 100 ); //休息一下 pool.release(connect); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { for ( int i = 0 ; i < 10 ; i++) { new TestThread().start(); } } } |
測試結果:
我們使用一個數組來存放數據庫連接的引用,在初始化連接池的時候會調用initConnects方法創建指定數量的數據庫連接,并將它們的引用存放到數組中,此外還有一個相同大小的數組來記錄連接是否可用。每當外部線程請求獲取一個連接時,首先調用semaphore.acquire()方法獲取一個許可證,然后將連接狀態設置為使用中,最后返回該連接的引用。許可證的數量由構造時傳入的參數決定,每調用一次semaphore.acquire()方法許可證數量減1,當數量減為0時說明已經沒有連接可以使用了,這時如果其他線程再來獲取就會被阻塞。每當線程釋放一個連接的時候會調用semaphore.release()將許可證釋放,此時許可證的總量又會增加,代表可用的連接數增加了,那么之前被阻塞的線程將會醒來繼續獲取連接,這時再次獲取就能夠成功獲取連接了。測試示例中初始化了一個3個連接的連接池,我們從測試結果中可以看到,每當線程獲取一個連接剩余的連接數將會減1,等到減為0時其他線程就不能再獲取了,此時必須等待一個線程將連接釋放之后才能繼續獲取。可以看到剩余連接數總是在0到3之間變動,說明我們這次的測試是成功的。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://www.cnblogs.com/liuyun1995/p/8474026.html