Semaphore 是一個計數信號量,它的本質是一個共享鎖。信號量維護了一個信號量許可集。線程可以通過調用acquire()來獲取信號量的許可;當信號量中有可用的許可時,線程能獲取該許可;否則線程必須等待,直到有可用的許可為止。 線程可以通過release()來釋放它所持有的信號量許可(用完信號量之后必須釋放,不然其他線程可能會無法獲取信號量)。
簡單示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
package me.socketthread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreLearn { //信號量總數 private static final int SEM_MAX = 12 ; public static void main(String[] args) { Semaphore sem = new Semaphore(SEM_MAX); //創建線程池 ExecutorService threadPool = Executors.newFixedThreadPool( 3 ); //在線程池中執行任務 threadPool.execute( new MyThread(sem, 7 )); threadPool.execute( new MyThread(sem, 4 )); threadPool.execute( new MyThread(sem, 2 )); //關閉池 threadPool.shutdown(); } } class MyThread extends Thread { private volatile Semaphore sem; // 信號量 private int count; // 申請信號量的大小 MyThread(Semaphore sem, int count) { this .sem = sem; this .count = count; } public void run() { try { // 從信號量中獲取count個許可 sem.acquire(count); Thread.sleep( 2000 ); System.out.println(Thread.currentThread().getName() + " acquire count=" +count); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放給定數目的許可,將其返回到信號量。 sem.release(count); System.out.println(Thread.currentThread().getName() + " release " + count + "" ); } } } |
執行結果:
1
2
3
4
5
6
|
pool-1-thread-2 acquire count=4 pool-1-thread-1 acquire count=7 pool-1-thread-1 release 7 pool-1-thread-2 release 4 pool-1-thread-3 acquire count=2 pool-1-thread-3 release 2 |
線程1和線程2會并發執行,因為兩者的信號量和沒有超過總信號量,當前兩個線程釋放掉信號量之后線程3才能繼續執行。
源碼分析:
1、構造函數
在構造函數中會初始化信號量值,這值最終是作為鎖標志位state的值
1
|
Semaphore sem = new Semaphore( 12 ); //簡單來說就是給鎖標識位state賦值為12 |
2、Semaphore.acquire(n);簡單理解為獲取鎖資源,如果獲取不到線程阻塞
1
|
Semaphore.acquire(n); //從鎖標識位state中獲取n個信號量,簡單來說是state = state-n 此時state大于0表示可以獲取信號量,如果小于0則將線程阻塞 |
1
2
3
4
5
|
public void acquire( int permits) throws InterruptedException { if (permits < 0 ) throw new IllegalArgumentException(); //獲取鎖 sync.acquireSharedInterruptibly(permits); } |
acquireSharedInterruptibly中的操作是獲取鎖資源,如果可以獲取則將state= state-permits,否則將線程阻塞
1
2
3
4
5
6
7
|
public final void acquireSharedInterruptibly( int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) //tryAcquireShared中嘗試獲取鎖資源 doAcquireSharedInterruptibly(arg); //將線程阻塞 } |
tryAcquireShared中的操作是嘗試獲取信號量值,簡單來說就是state=state-acquires ,如果此時小于0則返回負值,否則返回大于新值,再判斷是否將當線程線程阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
protected int tryAcquireShared( int acquires) { for (;;) { if (hasQueuedPredecessors()) return - 1 ; //獲取state值 int available = getState(); //從state中獲取信號量 int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) //如果信號量小于0則直接返回,表示無法獲取信號量,否則將state值修改為新值 return remaining; } } |
doAcquireSharedInterruptibly中的操作簡單來說是將當前線程添加到FIFO隊列中并將當前線程阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
/會將線程添加到FIFO隊列中,并阻塞 private void doAcquireSharedInterruptibly( int arg) throws InterruptedException { //將線程添加到FIFO隊列中 final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; // help GC failed = false ; return ; } } //parkAndCheckInterrupt完成線程的阻塞操作 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } |
3、Semaphore.release(int permits),這個函數的實現操作是將state = state+permits并喚起處于FIFO隊列中的阻塞線程。
1
2
3
4
5
|
public void release( int permits) { if (permits < 0 ) throw new IllegalArgumentException(); //state = state+permits,并將FIFO隊列中的阻塞線程喚起 sync.releaseShared(permits); } |
releaseShared中的操作是將state = state+permits,并將FIFO隊列中的阻塞線程喚起。
1
2
3
4
5
6
7
8
9
|
public final boolean releaseShared( int arg) { //tryReleaseShared將state設置為state = state+arg if (tryReleaseShared(arg)) { //喚起FIFO隊列中的阻塞線程 doReleaseShared(); return true ; } return false ; } |
tryReleaseShared將state設置為state = state+arg
1
2
3
4
5
6
7
8
9
10
11
|
protected final boolean tryReleaseShared( int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error( "Maximum permit count exceeded" ); //將state值設置為state=state+releases if (compareAndSetState(current, next)) return true ; } } |
doReleaseShared()喚起FIFO隊列中的阻塞線程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; // loop to recheck cases //完成阻塞線程的喚起操作 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; // loop on failed CAS } if (h == head) // loop if head changed break ; } } |
總結:Semaphore簡單來說設置了一個信號量池state,當線程執行時會從state中獲取值,如果可以獲取則線程執行,并且在執行后將獲取的資源返回到信號量池中,并喚起其他阻塞線程;如果信號量池中的資源無法滿足某個線程的需求則將此線程阻塞。
Semaphore源碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; //設置鎖標識位state的初始值 Sync( int permits) { setState(permits); } //獲取鎖標識位state的值,如果state值大于其需要的值則表示鎖可以獲取 final int getPermits() { return getState(); } //獲取state值減去acquires后的值,如果大于等于0則表示鎖可以獲取 final int nonfairTryAcquireShared( int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //釋放鎖 protected final boolean tryReleaseShared( int releases) { for (;;) { int current = getState(); //將state值加上release值 int next = current + releases; if (next < current) // overflow throw new Error( "Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } //將state的值減去reductions final void reducePermits( int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error( "Permit count underflow" ); if (compareAndSetState(current, next)) return ; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0 )) return current; } } } //非公平鎖 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync( int permits) { super (permits); } protected int tryAcquireShared( int acquires) { return nonfairTryAcquireShared(acquires); } } //公平鎖 static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync( int permits) { super (permits); } protected int tryAcquireShared( int acquires) { for (;;) { if (hasQueuedPredecessors()) return - 1 ; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } //設置信號量 public Semaphore( int permits) { sync = new NonfairSync(permits); } public Semaphore( int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } //獲取鎖 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly( 1 ); } public void acquireUninterruptibly() { sync.acquireShared( 1 ); } public boolean tryAcquire() { return sync.nonfairTryAcquireShared( 1 ) >= 0 ; } public boolean tryAcquire( long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos( 1 , unit.toNanos(timeout)); } public void release() { sync.releaseShared( 1 ); } //獲取permits值鎖 public void acquire( int permits) throws InterruptedException { if (permits < 0 ) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public void acquireUninterruptibly( int permits) { if (permits < 0 ) throw new IllegalArgumentException(); sync.acquireShared(permits); } public boolean tryAcquire( int permits) { if (permits < 0 ) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0 ; } public boolean tryAcquire( int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0 ) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); } //釋放 public void release( int permits) { if (permits < 0 ) throw new IllegalArgumentException(); sync.releaseShared(permits); } public int availablePermits() { return sync.getPermits(); } public int drainPermits() { return sync.drainPermits(); } protected void reducePermits( int reduction) { if (reduction < 0 ) throw new IllegalArgumentException(); sync.reducePermits(reduction); } public boolean isFair() { return sync instanceof FairSync; } public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public final int getQueueLength() { return sync.getQueueLength(); } protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } public String toString() { return super .toString() + "[Permits = " + sync.getPermits() + "]" ; } } |
總結
以上就是本文關于Java并發編程Semaphore計數信號量詳解的全部內容,希望對大家有所幫助。有什么問題,可以留言交流討論。感謝朋友們對本站的支持!
原文鏈接:http://blog.csdn.net/qq924862077/article/details/70224646