ArrayBlockingQueue是常用的線程集合,在線程池中也常常被當做任務隊列來使用。使用頻率特別高。他是維護的是一個循環隊列(基于數組實現),循環結構在數據結構中比較常見,但是在源碼實現中還是比較少見的。
線程安全的實現
線程安全隊列,基本是離不開鎖的。ArrayBlockingQueue使用的是ReentrantLock,配合兩種Condition,實現了集合的線程安全操作。這里稍微說一個好習慣,下面是成員變量的聲明。
1
2
3
4
5
6
7
8
9
|
private static final long serialVersionUID = -817911632652898426L; final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; transient Itrs itrs = null ; |
賦值的操作基本都是在構造函數里做的。這樣有個好處,代碼執行可控。成員變量的初始化也是會合并在構造方法里執行的,但是在執行順序上需要好好斟酌,如果寫在構造方法里初始化,則沒有相關問題。
阻塞隊列的常用場所就是生產者消費者。一般都是生產者放入,消費者從頭取數據。下面重點說這兩個操作。
這兩個操作都是依靠鎖來保證線程安全的。
生產操作
1
2
3
4
5
6
7
8
9
10
11
12
|
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } |
put等放入操作,首先是獲取鎖,如果發現數據滿了,就通過notFull的condition,來阻塞線程。這里的條件判定一定是用while而不是if,多線程情況下,可以被喚醒后發現又滿了。
1
2
3
4
5
6
7
8
|
private void enqueue(E x) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); } |
這個是入隊列的操作。首先獲取維護的數組。putindex就是放入操作的標志。這個操作會一直加。達到預定的長度后就變成0從頭開始計數。這樣插入的操作就是一個循環的操作了,count就是用來做計數的,作為能否插入數據的一個標準,插入數據后就通過notEmpty的condition發出一個信號喚醒消費線程。
消費操作
1
2
3
4
5
6
7
8
9
10
11
|
public E take() throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } |
消費的方法也是這樣。先獲取鎖,然后進行條件判斷,如果沒有數據,則阻塞線程。注意點和put一樣。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
private E dequeue() { final Object[] items = this .items; @SuppressWarnings ( "unchecked" ) E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; } |
取數據的時候,也依靠takeIndex,這是一個標志,這個數值也會一直增加,表示取的第一個數據的位置。如果這個標志走到最后,然后變成0,從頭再來。這樣保證取出的數據都是fifo的順序。刪除的時候如果發現迭代中,則會修改迭代器的遍歷。然后通過notFull的condition來喚醒生產線程。
移除操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public boolean remove(Object o) { if (o == null ) return false ; final Object[] items = this .items; final ReentrantLock lock = this .lock; lock.lock(); try { if (count > 0 ) { final int putIndex = this .putIndex; int i = takeIndex; do { if (o.equals(items[i])) { removeAt(i); return true ; } if (++i == items.length) i = 0 ; } while (i != putIndex); } return false ; } finally { lock.unlock(); } } |
對于remove操作就比較麻煩了,首先獲取鎖之后,把兩個標志位本地化,然后找到要刪除的元素的位置。調用removeAt,這里刪除需要對標志位做改變。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
void removeAt( final int removeIndex) { final Object[] items = this .items; if (removeIndex == takeIndex) { items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); } else { final int putIndex = this .putIndex; for ( int i = removeIndex;;) { int next = i + 1 ; if (next == items.length) next = 0 ; if (next != putIndex) { items[i] = items[next]; i = next; } else { items[i] = null ; this .putIndex = i; break ; } } count--; if (itrs != null ) itrs.removedAt(removeIndex); } notFull.signal(); } |
如果刪除的元素是位置和takeindex一樣。那就可以直接刪除,然后讓刪除標志位向后移動。如果不是,則從刪除的位置開始,進行后面向前面的數據覆蓋的操作。直到遇到putindex的前一個位置。然后把那個位置的數據設置為null。并且把putindex的位置往前移動一格,正在迭代的時候要刪除數據并且喚醒生產線程。
感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
原文鏈接:https://my.oschina.net/xpbob/blog/830366