一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java并發編程Semaphore計數信號量詳解

Java并發編程Semaphore計數信號量詳解

2021-01-25 11:26IAMTJW Java教程

這篇文章主要介紹了Java并發編程Semaphore計數信號量詳解,具有一定參考價值,需要的朋友可以了解下。

Semaphore 是一個計數信號量,它的本質是一個共享鎖。信號量維護了一個信號量許可集。線程可以通過調用acquire()來獲取信號量的許可;當信號量中有可用的許可時,線程能獲取該許可;否則線程必須等待,直到有可用的許可為止。 線程可以通過release()來釋放它所持有的信號量許可(用完信號量之后必須釋放,不然其他線程可能會無法獲取信號量)。

簡單示例:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package me.socketthread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreLearn {
  //信號量總數
  private static final int SEM_MAX = 12;
  public static void main(String[] args) { 
    Semaphore sem = new Semaphore(SEM_MAX);
    //創建線程池
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    //在線程池中執行任務
    threadPool.execute(new MyThread(sem, 7));
    threadPool.execute(new MyThread(sem, 4));
    threadPool.execute(new MyThread(sem, 2));
    //關閉池
    threadPool.shutdown();
  }
}
  class MyThread extends Thread {
    private volatile Semaphore sem;  // 信號量
    private int count;    // 申請信號量的大小 
     
    MyThread(Semaphore sem, int count) {
      this.sem = sem;
      this.count = count;
    }
    public void run() {
      try {
       // 從信號量中獲取count個許可
        sem.acquire(count);
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName() + " acquire count="+count);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 釋放給定數目的許可,將其返回到信號量。
        sem.release(count);
        System.out.println(Thread.currentThread().getName() + " release " + count + "");
      }
    }
  }

執行結果:

?
1
2
3
4
5
6
pool-1-thread-2 acquire count=4
pool-1-thread-1 acquire count=7
pool-1-thread-1 release 7
pool-1-thread-2 release 4
pool-1-thread-3 acquire count=2
pool-1-thread-3 release 2

線程1和線程2會并發執行,因為兩者的信號量和沒有超過總信號量,當前兩個線程釋放掉信號量之后線程3才能繼續執行。

源碼分析:

1、構造函數

在構造函數中會初始化信號量值,這值最終是作為鎖標志位state的值

?
1
Semaphore sem = new Semaphore(12);//簡單來說就是給鎖標識位state賦值為12

2、Semaphore.acquire(n);簡單理解為獲取鎖資源,如果獲取不到線程阻塞

?
1
Semaphore.acquire(n);//從鎖標識位state中獲取n個信號量,簡單來說是state = state-n 此時state大于0表示可以獲取信號量,如果小于0則將線程阻塞
?
1
2
3
4
5
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    //獲取鎖
    sync.acquireSharedInterruptibly(permits);
  }

acquireSharedInterruptibly中的操作是獲取鎖資源,如果可以獲取則將state= state-permits,否則將線程阻塞

?
1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
      throws InterruptedException {
    if (Thread.interrupted())
      throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)//tryAcquireShared中嘗試獲取鎖資源
      doAcquireSharedInterruptibly(arg); //將線程阻塞
  }

tryAcquireShared中的操作是嘗試獲取信號量值,簡單來說就是state=state-acquires ,如果此時小于0則返回負值,否則返回大于新值,再判斷是否將當線程線程阻塞

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected int tryAcquireShared(int acquires) {
      for (;;) {
        if (hasQueuedPredecessors())
          return -1;
      //獲取state值
        int available = getState();
      //從state中獲取信號量
        int remaining = available - acquires;
        if (remaining < 0 ||
          compareAndSetState(available, remaining))
        //如果信號量小于0則直接返回,表示無法獲取信號量,否則將state值修改為新值
          return remaining;
      }
    }

doAcquireSharedInterruptibly中的操作簡單來說是將當前線程添加到FIFO隊列中并將當前線程阻塞。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/會將線程添加到FIFO隊列中,并阻塞 
private void doAcquireSharedInterruptibly(int arg) 
    throws InterruptedException { 
    //將線程添加到FIFO隊列中 
    final Node node = addWaiter(Node.SHARED); 
    boolean failed = true
    try
      for (;;) { 
        final Node p = node.predecessor(); 
        if (p == head) { 
          int r = tryAcquireShared(arg); 
          if (r >= 0) { 
            setHeadAndPropagate(node, r); 
            p.next = null; // help GC 
            failed = false
            return
          
        
        //parkAndCheckInterrupt完成線程的阻塞操作 
        if (shouldParkAfterFailedAcquire(p, node) && 
          parkAndCheckInterrupt()) 
          throw new InterruptedException(); 
      
    } finally
      if (failed) 
        cancelAcquire(node); 
    
  }

3、Semaphore.release(int permits),這個函數的實現操作是將state = state+permits并喚起處于FIFO隊列中的阻塞線程。

?
1
2
3
4
5
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
  //state = state+permits,并將FIFO隊列中的阻塞線程喚起
    sync.releaseShared(permits);
  }

releaseShared中的操作是將state = state+permits,并將FIFO隊列中的阻塞線程喚起。

?
1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
    //tryReleaseShared將state設置為state = state+arg
    if (tryReleaseShared(arg)) {
      //喚起FIFO隊列中的阻塞線程
      doReleaseShared();
      return true;
    }
    return false;
  }

tryReleaseShared將state設置為state = state+arg

?
1
2
3
4
5
6
7
8
9
10
11
protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
          throw new Error("Maximum permit count exceeded");
        //將state值設置為state=state+releases
        if (compareAndSetState(current, next))
          return true;
      }
    }

doReleaseShared()喚起FIFO隊列中的阻塞線程

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void doReleaseShared() { 
  
    for (;;) { 
      Node h = head; 
      if (h != null && h != tail) { 
        int ws = h.waitStatus; 
        if (ws == Node.SIGNAL) { 
          if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
            continue;      // loop to recheck cases 
          //完成阻塞線程的喚起操作 
          unparkSuccessor(h); 
        
        else if (ws == 0 && 
             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
          continue;        // loop on failed CAS 
      
      if (h == head)          // loop if head changed 
        break
    
  }

總結:Semaphore簡單來說設置了一個信號量池state,當線程執行時會從state中獲取值,如果可以獲取則線程執行,并且在執行后將獲取的資源返回到信號量池中,并喚起其他阻塞線程;如果信號量池中的資源無法滿足某個線程的需求則將此線程阻塞。

Semaphore源碼:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
public class Semaphore implements java.io.Serializable {
  private static final long serialVersionUID = -3222578661600680210L;
  private final Sync sync;
  abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    //設置鎖標識位state的初始值
    Sync(int permits) {
      setState(permits);
    }
    //獲取鎖標識位state的值,如果state值大于其需要的值則表示鎖可以獲取
    final int getPermits() {
      return getState();
    }
    //獲取state值減去acquires后的值,如果大于等于0則表示鎖可以獲取
    final int nonfairTryAcquireShared(int acquires) {
      for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
      }
    }
    //釋放鎖
    protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        //將state值加上release值
        int next = current + releases;
        if (next < current) // overflow
          throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
          return true;
      }
    }
    //將state的值減去reductions
    final void reducePermits(int reductions) {
      for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
          throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
          return;
      }
    }
    final int drainPermits() {
      for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
          return current;
      }
    }
  }
  //非公平鎖
  static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
      super(permits);
    }
    protected int tryAcquireShared(int acquires) {
      return nonfairTryAcquireShared(acquires);
    }
  }
  //公平鎖
  static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    FairSync(int permits) {
      super(permits);
    }
    protected int tryAcquireShared(int acquires) {
      for (;;) {
        if (hasQueuedPredecessors())
          return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
      }
    }
  }
  //設置信號量
  public Semaphore(int permits) {
    sync = new NonfairSync(permits);
  }
  public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  }
  //獲取鎖
  public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }
  public void acquireUninterruptibly() {
    sync.acquireShared(1);
  }
  public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
  }
  public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  }
  public void release() {
    sync.releaseShared(1);
  }
  //獲取permits值鎖
  public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
  }
  public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
  }
  public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
  }
  public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
  }
  //釋放
  public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
  }
  public int availablePermits() {
    return sync.getPermits();
  }
  public int drainPermits() {
    return sync.drainPermits();
  }
  protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
  }
  public boolean isFair() {
    return sync instanceof FairSync;
  }
  public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
  }
  public final int getQueueLength() {
    return sync.getQueueLength();
  }
  protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
  }
  public String toString() {
    return super.toString() + "[Permits = " + sync.getPermits() + "]";
  }
}

總結

以上就是本文關于Java并發編程Semaphore計數信號量詳解的全部內容,希望對大家有所幫助。有什么問題,可以留言交流討論。感謝朋友們對本站的支持!

原文鏈接:http://blog.csdn.net/qq924862077/article/details/70224646

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 国产大片视频免费观看 | 美女岳肉太深了使劲 | 国产91短视频 | bt伙计最新合集 | 欧美亚洲视频在线观看 | 欧美精品v日韩精品v国产精品 | 日产精品卡一卡2卡三卡乱码工厂 | 国产一区二区三区毛片 | 高清色黄毛片一级毛片 | 亚洲国产精品福利片在线观看 | 国产乱码一卡二卡3卡四卡 国产乱插 | 91混血大战上海双胞胎 | 国产精品一区二区三区免费 | 丝袜捆绑调教丨vk | 91免费视频国产 | 高清国产欧美一v精品 | 日本特级a禁片在线播放 | 911精品国产亚洲日本美国韩国 | 日韩亚洲欧美理论片 | 国产精品色拉拉免费看 | 91香蕉在线 | 天若有情1992国语版完整版 | 射综合网 | 5x视频在线观看 | 欧美成人午夜片一一在线观看 | 久草在在线免视频在线观看 | 国产欧美成人不卡视频 | 桃乃木香奈作品在线 | 欧洲另类一二三四区 | 处女私拍 | 2021福利视频 | 免费黄色片网站 | 女主被男主做哭失禁高h | 国产一级毛片外aaaa | 亚洲欧美日韩久久一区 | bt天堂在线最新版www | 日本一区二区精品88 | 日本韩国一区二区三区 | 欧美一区精品 | 精品99在线观看 | 亚洲第一男人网站 |