一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java同步框架AbstractQueuedSynchronizer詳解

Java同步框架AbstractQueuedSynchronizer詳解

2021-01-23 12:33一字馬胡 Java教程

本篇文章主要介紹了Java同步框架AbstractQueuedSynchronizer詳解,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

AbstractQueuedSynchronizer概述

AbstractQueuedSynchronizer是java中非常重要的一個框架類,它實現了最核心的多線程同步的語義,我們只要繼承AbstractQueuedSynchronizer就可以非常方便的實現我們自己的線程同步器,java中的鎖Lock就是基于AbstractQueuedSynchronizer來實現的。下面首先展示了AbstractQueuedSynchronizer類提供的一些方法:

Java同步框架AbstractQueuedSynchronizer詳解

AbstractQueuedSynchronizer類方法

在類結構上,AbstractQueuedSynchronizer繼承了AbstractOwnableSynchronizer,AbstractOwnableSynchronizer僅有的兩個方法是提供當前獨占模式的線程設置:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * The current owner of exclusive mode synchronization.
 */
private transient Thread exclusiveOwnerThread;
 
/**
 * Sets the thread that currently owns exclusive access.
 * A {@code null} argument indicates that no thread owns access.
 * This method does not otherwise impose any synchronization or
 * {@code volatile} field accesses.
 * @param thread the owner thread
 */
protected final void setExclusiveOwnerThread(Thread thread) {
  exclusiveOwnerThread = thread;
}
 
/**
 * Returns the thread last set by {@code setExclusiveOwnerThread},
 * or {@code null} if never set. This method does not otherwise
 * impose any synchronization or {@code volatile} field accesses.
 * @return the owner thread
 */
protected final Thread getExclusiveOwnerThread() {
  return exclusiveOwnerThread;
}

exclusiveOwnerThread代表的是當前獲得同步的線程,因為是獨占模式,在exclusiveOwnerThread持有同步的過程中其他的線程的任何同步獲取請求將不能得到滿足。

至此,需要說明的是,AbstractQueuedSynchronizer不僅支持獨占模式下的同步實現,還支持共享模式下的同步實現。在java的鎖的實現上就有共享鎖和獨占鎖的區別,而這些實現都是基于AbstractQueuedSynchronizer對于共享同步和獨占同步的支持。從上面展示的AbstractQueuedSynchronizer提供的方法中,我們可以發現AbstractQueuedSynchronizer的API大概分為三類:

  • 類似acquire(int)的一類是最基本的一類,不可中斷
  • 類似acquireInterruptibly(int)的一類可以被中斷
  • 類似tryAcquireNanos(int, long)的一類不僅可以被中斷,而且可以設置阻塞時間

上面的三種類型的API分為獨占和共享兩套,我們可以根據我們的需求來使用合適的API來做多線程同步。

下面是一個繼承AbstractQueuedSynchronizer來實現自己的同步器的一個示例:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class Mutex implements Lock, java.io.Serializable {
 
  // Our internal helper class
  private static class Sync extends AbstractQueuedSynchronizer {
   // Reports whether in locked state
   protected boolean isHeldExclusively() {
    return getState() == 1;
   }
 
   // Acquires the lock if state is zero
   public boolean tryAcquire(int acquires) {
    assert acquires == 1; // Otherwise unused
    if (compareAndSetState(0, 1)) {
     setExclusiveOwnerThread(Thread.currentThread());
     return true;
    }
    return false;
   }
 
   // Releases the lock by setting state to zero
   protected boolean tryRelease(int releases) {
    assert releases == 1; // Otherwise unused
    if (getState() == 0) throw new IllegalMonitorStateException();
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
   }
 
   // Provides a Condition
   Condition newCondition() { return new ConditionObject(); }
 
   // Deserializes properly
   private void readObject(ObjectInputStream s)
     throws IOException, ClassNotFoundException {
    s.defaultReadObject();
    setState(0); // reset to unlocked state
   }
  }
 
  // The sync object does all the hard work. We just forward to it.
  private final Sync sync = new Sync();
 
  public void lock()        { sync.acquire(1); }
  public boolean tryLock()     { return sync.tryAcquire(1); }
  public void unlock()       { sync.release(1); }
  public Condition newCondition()  { return sync.newCondition(); }
  public boolean isLocked()     { return sync.isHeldExclusively(); }
  public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
  public void lockInterruptibly() throws InterruptedException {
   sync.acquireInterruptibly(1);
  }
  public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {
   return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }
 }}

Mutex實現的功能是:使用0來代表可以獲得同步變量,使用1來代表需要等待同步變量被釋放再獲取,這是一個簡單的獨占鎖實現,任何時刻只會有一個線程獲得鎖,其他請求獲取鎖的線程都會阻塞等待直到鎖被釋放,等待的線程將再次競爭來獲得鎖。Mutex給了我們很好的范例,我們要實現自己的線程同步器,那么就繼承AbstractQueuedSynchronizer實現其三個抽象方法,然后使用該實現類來做lock和unlock的操作,可以發現,AbstractQueuedSynchronizer框架為我們鋪平了道路,我們只需要做一點點改變就可以實現高效安全的線程同步去,下文中將分析AbstractQueuedSynchronizer是如何為我么提供如此強大得同步能力的。

AbstractQueuedSynchronizer實現細節

獨占模式

AbstractQueuedSynchronizer使用一個volatile類型的int來作為同步變量,任何想要獲得鎖的線程都需要來競爭該變量,獲得鎖的線程可以繼續業務流程的執行,而沒有獲得鎖的線程會被放到一個FIFO的隊列中去,等待再次競爭同步變量來獲得鎖。AbstractQueuedSynchronizer為每個沒有獲得鎖的線程封裝成一個Node再放到隊列中去,下面先來分析一下Node這個數據結構:

?
1
2
3
4
5
6
7
8
9
10
11
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL  = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;

上面展示的是Node定義的四個狀態,需要注意的是只有一個狀態是大于0的,也就是CANCELLED,也就是被取消了,不需要為此線程協調同步變量的競爭了。其他幾個的意義見注釋。上一小節說到,AbstractQueuedSynchronizer提供獨占式和共享式兩種模式,AbstractQueuedSynchronizer使用下面的兩個變量來標志是共享還是獨占模式:

?
1
2
3
4
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

有趣的是,Node使用了一個變量nextWaiter來代表兩種含義,當在獨占模式下,nextWaiter表示下一個等在ConditionObject上的Node,在共享模式下就是SHARED,因為對于任何一個同步器來說,都不可能同時實現共享和獨占兩種模式的,更為專業的解釋為:

?
1
2
3
4
5
6
7
8
9
10
11
/**
 * Link to next node waiting on condition, or the special
 * value SHARED. Because condition queues are accessed only
 * when holding in exclusive mode, we just need a simple
 * linked queue to hold nodes while they are waiting on
 * conditions. They are then transferred to the queue to
 * re-acquire. And because conditions can only be exclusive,
 * we save a field by using special value to indicate shared
 * mode.
 */
Node nextWaiter;

AbstractQueuedSynchronizer使用雙向鏈表來管理請求同步的Node,保存了鏈表的head和tail,新的Node將會被插到鏈表的尾端,而鏈表的head總是代表著獲得鎖的線程,鏈表頭的線程釋放了鎖之后會通知后面的線程來競爭共享變量。下面分析一下AbstractQueuedSynchronizer是如何實現獨占模式下的acquire和release的。

首先,使用方法acquire(int)可以競爭同步變量,下面是調用鏈路:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}
 
private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  Node pred = tail;
  if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}
 
final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

首先會調用方法tryAcquire來嘗試獲的鎖,而tryAcquire這個方法是需要子類來實現的,子類的實現無非就是通過compareAndSetState、getState、setState三個方法來操作同步變量state,子類的方法實現需要根據各自的需求場景來實現。繼續分析上面的acquire流程,如果tryAcquire返回true了,也就是成功改變了state的值了,也就是獲得了同步鎖了,那么就可以退出了。如果返回false,說明有其他的線程獲得鎖了,這個時候AbstractQueuedSynchronizer會使用addWaiter將當前線程添加到等待隊列的尾部等待再次競爭。需要注意的是將當前線程標記為了獨占模式。然后重頭戲來了,方法acquireQueued使得新添加的Node在一個for死循環中不斷的輪詢,也就是自旋,acquireQueued方法退出的條件是:

  1. 該節點的前驅節點是頭結點,頭結點代表的是獲得鎖的節點,只有它釋放了state其他線程才能獲得這個變量的所有權
  2. 在條件1的前提下,方法tryAcquire返回true,也就是可以獲得同步資源state

滿足上面兩個條件之后,這個Node就會獲得鎖,根據AbstractQueuedSynchronizer的規定,獲得鎖的Node必須是鏈表的頭結點,所以,需要將當前節點設定為頭結點。那如果不符合上面兩個條件的Node會怎么樣呢?看for循環里面的第二個分支,首先是shouldParkAfterFailedAcquire方法,看名字應該是說判斷是否應該park當前該線程,然后是方法parkAndCheckInterrupt,這個方法是在shouldParkAfterFailedAcquire返回true的前提之下才會之下,意思就是首先判斷一下是否需要park該Node,如果需要,那么就park它。關于線程的park和unpark,AbstractQueuedSynchronizer使用了偏向底層的技術來實現,在此先不做分析。現在來分析一下再什么情況下Node會被park(block):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  int ws = pred.waitStatus;
  if (ws == Node.SIGNAL)
    /*
     * This node has already set status asking a release
     * to signal it, so it can safely park.
     */
    return true;
  if (ws > 0) {
    /*
     * Predecessor was cancelled. Skip over predecessors and
     * indicate retry.
     */
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    /*
     * waitStatus must be 0 or PROPAGATE. Indicate that we
     * need a signal, but don't park yet. Caller will need to
     * retry to make sure it cannot acquire before parking.
     */
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  }
  return false;
}

可以發現,只有當Node的前驅節點的狀態為Node.SIGNAL的時候才會返回true,也就是說,只有當前驅節點的狀態變為了Node.SIGNAL,才會去通知當前節點,所以如果前驅節點是Node.SIGNAL的,那么當前節點就可以放心的park就好了,前驅節點在完成工作之后在釋放資源的時候會unpark它的后繼節點。下面看一下release的過程:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final boolean release(int arg) {
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}
 
private void unparkSuccessor(Node node) {
  /*
   * If status is negative (i.e., possibly needing signal) try
   * to clear in anticipation of signalling. It is OK if this
   * fails or if status is changed by waiting thread.
   */
  int ws = node.waitStatus;
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);
 
  /*
   * Thread to unpark is held in successor, which is normally
   * just the next node. But if cancelled or apparently null,
   * traverse backwards from tail to find the actual
   * non-cancelled successor.
   */
  Node s = node.next;
  if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null)
    LockSupport.unpark(s.thread);

首先通過tryRelease方法來保證資源安全完整的釋放了之后,如果發現節點的狀態小于0,會變為0。0代表的是初始化的狀態,當前的線程已經完成了工作,釋放了鎖,就要恢復原來的樣子。然后會獲取該節點的后繼節點,如果沒有后續節點了,或者后繼節點已經被取消了,那么從尾部開始向前找第一個符合要求的節點,然后unpark它。

上面介紹了一對acquire-release,如果希望線程可以在競爭的時候被中斷,可以使用acquireInterruptibly。如果希望加上獲取鎖的時間限制,可以使用tryAcquireNanos(int, long)方法來獲取。

共享模式

和獨占模式一樣,分析一下acquireShared的過程:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final void acquireShared(int arg) {
  if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
}
 
private void doAcquireShared(int arg) {
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          if (interrupted)
            selfInterrupt();
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }

獲取鎖的流程如下:

  1. 嘗試使用tryAcquireShared方法,如果返回值大于等于0則表示成功,否則運行doAcquireShared方法
  2. 將當前競爭同步的線程添加到鏈表尾部,然后自旋
  3. 獲取前驅節點,如果前驅節點是頭節點,也就是說前驅節點現在持有鎖,那么繼續運行4,否則park該節點等待被unpark
  4. 使用tryAcquireShared方法來競爭,如果返回值大于等于0,那么就算是獲取成功了,否則繼續自旋嘗試

共享模式下的release流程:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}
 
private void doReleaseShared() {
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;      // loop to recheck cases
        unparkSuccessor(h);
      }
      else if (ws == 0 &&
           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;        // loop on failed CAS
    }
    if (h == head)          // loop if head changed
      break;
  }
 
private void unparkSuccessor(Node node) {
  int ws = node.waitStatus;
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);
 
  Node s = node.next;
  if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null)
    LockSupport.unpark(s.thread);
}

首先嘗試使用tryReleaseShared方法來釋放資源,如果釋放失敗,則返回false,如果釋放成功了,那么繼續執行doReleaseShared方法喚醒后續節點來競爭資源。需要注意的是,共享模式和獨占模式的區別在于,獨占模式只允許一個線程獲得資源,而共享模式允許多個線程獲得資源。所以在獨占模式下只有當tryAcquire返回true的時候我們才能確定獲得資源了,而在共享模式下,只要tryAcquireShared返回值大于等于0就可以說明獲得資源了,所以你要確保你需要實現的需求和AbstractQueuedSynchronizer希望的是一致的。

桶獨占模式一樣,共享模式也有其他的兩種API:

  • acquireSharedInterruptibly:支持相應中斷的資源競爭
  • tryAcquireSharedNanos:可以設定時間的資源競爭

本文大概描述了AbstractQueuedSynchronizer框架的一些基本情況,具體的細節沒有深究,但是AbstractQueuedSynchronizer作為Java中鎖實現的底層支撐,需要好好研究一下,后續會基于AbstractQueuedSynchronizer來分析java中各種鎖的實現。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://www.jianshu.com/p/853b203a8d93

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 福利一区福利二区 | 调教女高中生第3部分 | 免费精品在线视频 | 国产精品久久久久久搜索 | 奇米影视999| gay台湾无套男同志可播放 | 欧美一级视频在线 | 99ri精品 | 精品精品久久宅男的天堂 | 欧美成人中文字幕 | 热久久免费视频 | 日本在线亚州精品视频在线 | 91四虎国自产在线播放线 | 和两个男人玩3p好爽视频 | 好男人资源免费播放在线观看 | 国产老熟 | 四虎影视4hu最新地址在线884 | 精品国产一区二区三区在线 | 魔兽官方小说 | 变态 另类 人妖小说 | 欧美日韩综合一区 | 日本无遮挡亲吻膜下面免费 | 特a级片 | 欧美日韩精 | 亚洲一区二区三区深夜天堂 | 久久久久青草大香线综合精品 | 亚洲一级特黄 | 校花被老头夺去第一次动图 | 亚洲国产香蕉视频欧美 | 国产福利一区二区三区 | 精品视频国产 | 亚洲国产一区二区三区a毛片 | 免费观看的毛片 | 国产精品福利短视在线播放频 | 亚洲欧美国产精品久久久 | 亚洲乱亚洲乱妇41p 亚洲乱码一区二区三区国产精品 | 日本道色综合久久影院 | 超级碰碰青草免费视频92 | 成人男女啪啪免费观看网站 | 国产主播福利在线观看 | 高清国产在线观看 |