一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 詳解Java 信號量Semaphore

詳解Java 信號量Semaphore

2020-09-15 00:56java小新人 Java教程

這篇文章主要介紹了Java 信號量Semaphore的相關資料,幫助大家更好的理解和學習Java并發,感興趣的朋友可以了解下

  Semaphore也是一個同步器,和前面兩篇說的CountDownLatch和CyclicBarrier不同,這是遞增的,初始化的時候可以指定一個值,但是不需要知道需要同步的線程個數,只需要在同步的地方調用acquire方法時指定需要同步的線程個數;

一.簡單使用

  同步兩個子線程,只有其中兩個子線程執行完畢,主線程才會執行:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.example.demo.study;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
 
public class Study0217 {
  //創建一個信號量的實例,信號量初始值為0
  static Semaphore semaphore = new Semaphore(0);
  
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    pool.submit(()->{
      System.out.println("Thread1---start");
      //信號量加一
      semaphore.release();
    });
    
    pool.submit(()->{
      System.out.println("Thread2---start");
      //信號量加一
      semaphore.release();
    });
    pool.submit(()->{
      System.out.println("Thread3---start");
      //信號量加一
      semaphore.release();
    });
    //等待兩個子線程執行完畢就放過,必須要信號量等于2才放過
    semaphore.acquire(2);
    System.out.println("兩個子線程執行完畢");
    
    //關閉線程池,正在執行的任務繼續執行
    pool.shutdown();
 
  }
 
}

詳解Java 信號量Semaphore

這個信號量也可以復用,類似CyclicBarrier:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.example.demo.study;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
 
public class Study0217 {
  //創建一個信號量的實例,信號量初始值為0
  static Semaphore semaphore = new Semaphore(0);
  
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    pool.submit(()->{
      System.out.println("Thread1---start");
      //信號量加一
      semaphore.release();
    });
    
    pool.submit(()->{
      System.out.println("Thread2---start");
      //信號量加一
      semaphore.release();
    });
    
    //等待兩個子線程執行完畢就放過,必須要信號量等于2才放過
    semaphore.acquire(2);
    System.out.println("子線程1,2執行完畢");
    
    pool.submit(()->{
      System.out.println("Thread3---start");
      //信號量加一
      semaphore.release();
    });
    pool.submit(()->{
      System.out.println("Thread4---start");
      //信號量加一
      semaphore.release();
    });
    
    semaphore.acquire(2);
    System.out.println("子線程3,4執行完畢");
    
    //關閉線程池,正在執行的任務繼續執行
    pool.shutdown();
 
  }
 
}

詳解Java 信號量Semaphore

二.信號量原理 

  看看下面這個圖,可以知道信號量Semaphore還是根據AQS實現的,內部有個Sync工具類操作AQS,還分為公平策略和非公平策略;

詳解Java 信號量Semaphore

構造器:

?
1
2
3
4
5
6
7
8
//默認是非公平策略
public Semaphore(int permits) {
  sync = new NonfairSync(permits);
}
//可以根據第二個參數選擇是公平策略還是非公平策略
public Semaphore(int permits, boolean fair) {
  sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

acquire(int permits)方法:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void acquire(int permits) throws InterruptedException {
  if (permits < 0) throw new IllegalArgumentException();
  sync.acquireSharedInterruptibly(permits);
}
 
//AQS中的方法
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
  if (Thread.interrupted()) throw new InterruptedException();
  //這里根據子類是公平策略還是非公平策略
  if (tryAcquireShared(arg) < 0)
    //獲取失敗會進入這里,將線程放入阻塞隊列,然后再嘗試,還是失敗的話就調用park方法掛起當前線程
    doAcquireSharedInterruptibly(arg);
}
//非公平策略
protected int tryAcquireShared(int acquires) {
  return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
  //一個無限循環,獲取state剩余的信號量,因為每調用一次release()方法的話,信號量就會加一,這里將
  //最新的信號量減去傳進來的參數比較,比如有兩個線程,其中一個線程已經調用了release方法,然后調用acquire(2)方法,那么
  //這里remaining的值就是-1,返回-1,然后當前線程就會被丟到阻塞隊列中去了;如果另外一個線程也調用了release方法,
  //那么此時的remaining==0,所以在這里的if中會調用CAS將0設置到state
  //
  for (;;) {
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 || compareAndSetState(available, remaining))
      return remaining;
  }
}
//公平策略
//和上面非公平差不多,只不過這里會查看阻塞隊列中當前節點前面有沒有前驅節點,有的話直接返回-1,
//就會把當前線程丟到阻塞隊列中阻塞去了,沒有前驅節點的話,就跟非公平模式一樣的了
protected int tryAcquireShared(int acquires) {
  for (;;) {
    if (hasQueuedPredecessors())
      return -1;
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 ||compareAndSetState(available, remaining))
      return remaining;
  }
}

再看看release(int permits)方法:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//這個方法的作用就是將信號量加一
public void release(int permits) {
  if (permits < 0) throw new IllegalArgumentException();
  sync.releaseShared(permits);
}
//AQS中方法
public final boolean releaseShared(int arg) {
  //tryReleaseShared嘗試釋放資源
  if (tryReleaseShared(arg)) {
    //釋放資源成功就調用park方法喚醒喚醒AQS隊列中最前面的節點中的線程
    doReleaseShared();
    return true;
  }
  return false;
}
 
protected final boolean tryReleaseShared(int releases) {
  //一個無限循環,獲取state,然后加上傳進去的參數,如果新的state的值小于舊的state,說明已經超過了state的最大值,溢出了
  //沒有溢出的話,就用CAS更新state的值
  for (;;) {
    int current = getState();
    int next = current + releases;
    if (next < current) // overflow
      throw new Error("Maximum permit count exceeded");
    if (compareAndSetState(current, next))
      return true;
  }
}
 
private void doReleaseShared() {
  
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      //ws==Node.SIGNAL表示節點中線程需要被喚醒
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;      // loop to recheck cases
        //調用阻塞隊列中線程的unpark方法喚醒線程
        unparkSuccessor(h);
      }
      //ws == 0表示節點中線程是初始狀態
      else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;        // loop on failed CAS
    }
    
    if (h == head)          // loop if head changed
      break;
  }
}

  以最上面的例子簡單說一下,其實不是很難,首先線程1和線程2分別去調用release方法,這個方法里面會將AQS中的state加一,但是在執行這個操作之前,主線程肯定會先到acquire(2),在這個方法里面,假如默認使用非公平策略,首先獲取當前的信號量state(state的初始值是0),用當前信號量減去2,如果小于0,那么當前主線程就會丟到AQS隊列中阻塞;

  這個時候線程1的release方法執行了,于是就把信號量state加一(此時state==1),CAS更新state為一,成功的話,就調用doReleaseShared()方法喚醒AQS阻塞隊列中最先掛起的線程(這里就是因為調用acquire方法而阻塞的主線程),主線程喚醒之后又會去獲取最新的信號量,與2比較,發現還是小于0,于是又會阻塞;

  線程2此時的release方法執行完成,重復線程一的操作,主線程喚醒之后(此時state==2),又去獲取最新的信號量發現是2,減去acquire方法的參數2等于0,于是就用CAS更新state的值,然后acquire方法也就執行完畢,主線程繼續執行后面的代碼;

  其實信號量還是很有意思的,記得在項目里,有人利用信號量實現了一個故障隔離,什么時候我可以把整理之后的代碼貼出來分享一下,還是很有意思的,就跟springcloud的熔斷機制差不多,場景是:比如你在service的一個方法調用第三方的接口,你不知道調不調得通,而且你不希望每次前端過來都會去調用,比如當調用失敗的次數超過100次,那么五分鐘之后才會再去實際調用這個第三方服務!這五分鐘內前調用這個服務,就會觸發我們這個故障隔離的機制,向前端返回一個特定的錯誤碼和錯誤信息!

以上就是詳解Java 信號量Semaphore的詳細內容,更多關于Java 信號量Semaphore的資料請關注服務器之家其它相關文章!

原文鏈接:https://www.cnblogs.com/wyq1995/p/12319707.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 成人在线播放视频 | 国产一区二区三区四区波多野结衣 | 娇妻终于接受了3p的调教 | 手机免费在线视频 | 国产东北三老头伦一肥婆 | 欧美灰丝袜丝交nylons | 国自产精品手机在线视频 | 丝袜老师好湿好紧我要进去了 | 亚洲欧美日韩高清 | 98pao强力打造高清免费 | 亚洲精品有码在线观看 | 日本不卡视频免费 | 海绵宝宝第二季全集免费观看 | 高清视频大片免费观看 | 午夜精品久久久久久久99 | 短篇艳妇系列 | 亚洲人和日本人hd | 5566中文字幕亚洲精品 | 高清欧美不卡一区二区三区 | 丝袜捆绑调教丨vk | 97综合 | 任我鲁精品视频精品 | 人妖欧美一区二区三区四区 | 千金在线观看 | 亚洲国产精品高清在线 | futa文| 牛牛在线观看 | 日产精品卡一卡2卡三卡乱码工厂 | 欧美一级视频在线高清观看 | 91丝袜足控免费网站xx | 胸奶好大好紧好湿好爽 | 国产亚洲玖玖玖在线观看 | 日本高清视频网站 | 91噜噜噜在线观看 | 韩国三级 720p | 久久足恋网 | 欧美18-19 | 肉车各种play文r | 大jjjj免费看视频 | 白白国产永久免费视频 | 青青久久精品国产免费看 |