前言

ArrayBlockingQueue 由數組支持的有界阻塞隊列,隊列基于數組實現,容量大小在創建 ArrayBlockingQueue 對象時已經定義好。 此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖,默認采用非公平鎖。其數據結構如下圖:

- 注:每一個線程在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的線程和請求一定先被滿足,那么這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖
隊列創建
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
應用場景
在線程池中有比較多的應用,生產者消費者場景。
- 先進先出隊列(隊列頭的是最先進隊的元素;隊列尾的是最后進隊的元素)
- 有界隊列(即初始化時指定的容量,就是隊列最大的容量,不會出現擴容,容量滿,則阻塞進隊操作;容量空,則阻塞出隊操作)
- 隊列不支持空元素
- 公平性 (fairness)可以在構造函數中指定。
此類支持對等待的生產者線程和使用者線程進行排序的可選公平策略。默認情況下,不保證是這種排序。然而,通過在構造函數將公平性 (fairness) 設置為 true 而構造的隊列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。
工作原理
ArrayBlockingQueue是對BlockingQueue的一個數組實現,它使用一把全局的鎖并行對queue的讀寫操作,同時使用兩個Condition阻塞容量為空時的取操作和容量滿時的寫操作。
基于 ReentrantLock 保證線程安全,根據 Condition 實現隊列滿時的阻塞。
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
Lock的作用是提供獨占鎖機制,來保護競爭資源;而Condition是為了更加精細地對鎖進行控制,它依賴于Lock,通過某個條件對多線程進行控制。
notEmpty表示“鎖的非空條件”。當某線程想從隊列中取數據時,而此時又沒有數據,則該線程通過notEmpty.await()進行等待;當其它線程向隊列中插入了元素之后,就調用notEmpty.signal()喚醒“之前通過notEmpty.await()進入等待狀態的線程”。 同理,notFull表示“鎖的滿條件”。當某線程想向隊列中插入元素,而此時隊列已滿時,該線程等待;當其它線程從隊列中取出元素之后,就喚醒該等待的線程。
- 試圖向已滿隊列中放入元素會導致放入操作受阻塞,直到BlockingQueue里有新的喚空間才會被醒繼續操作; 試圖從空隊列中檢索元素將導致類似阻塞,直到BlocingkQueue進了新貨才會被喚醒。
源碼分析
以下源碼分析基于JDK1.8
定義
ArrayBlockingQueue的類繼承關系如下:

其包含的方法定義如下:

成員屬性
/** 真正存入數據的數組 */
final Object[] items;
/** take,poll,peek or remove 的下一個索引 */
int takeIndex;
/** put,offer,or add 下一個索引 */
int putIndex;
/** 隊列中元素個數 */
int count;
/** 可重入鎖 */
final ReentrantLock lock;
/** 如果數組是空的,在該Condition上等待 */
private final Condition notEmpty;
/** 如果數組是滿的,在該Condition上等待 */
private final Condition notFull;
/** 遍歷器實現 */
transient Itrs itrs = null;
構造函數
/**
* 構造函數,設置隊列的初始容量
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 構造函數,
* capacity and the specified access policy.
*
* @param capacity 設置數組大小
* @param fair 設置是否為公平鎖
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 是否為公平鎖,如果是的話,那么先到的線程先獲得鎖對象
// 否則,由操作系統調度由哪個線程獲得鎖,一般為false,性能會比較高
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* 構造函數,帶有初始內容的隊列
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
//加鎖的目的是為了其他CPU能夠立即看到修改
//加鎖和解鎖底層都是CAS,會強制修改寫回主存,對其他CPU可見
lock.lock(); // 要給數組設置內容,先上鎖
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e; // 依次拷貝內容
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i; // 如果 putIndex大于數組大小,那么從0重寫開始
} finally {
lock.unlock(); // 最后一定要釋放鎖
}
}
入隊方法
add / offer / put,這三個方法都是往隊列中添加元素,說明如下:
- add方法依賴于offer方法,如果隊列滿了則拋出異常,否則添加成功返回true;
- offer方法有兩個重載版本,只有一個參數的版本,如果隊列滿了就返回false,否則加入到隊列中,返回true,add方法就是調用此版本的offer方法;另一個帶時間參數的版本,如果隊列滿了則等待,可指定等待的時間,如果這期間中斷了則拋出異常,如果等待超時了則返回false,否則加入到隊列中返回true;
- put方法跟帶時間參數的offer方法邏輯一樣,不過沒有等待的時間限制,會一直等待直到隊列有空余位置了,再插入到隊列中,返回true
/**
* 添加一個元素,其實super.add里面調用了offer方法
*/
public boolean add(E e) {
return super.add(e);
}
/**
* 加入成功返回 true,否則返回 false
*/
public boolean offer(E e) {
// 創建插入的元素是否為null,是的話拋出NullPointerException異常
checkNotNull(e);
// 獲取“該阻塞隊列的獨占鎖”
final ReentrantLock lock = this.lock;
lock.lock(); // 上鎖
try {
// 如果隊列已滿,則返回false。
if (count == items.length) // 超過數組的容量
return false;
else {
// 如果隊列未滿,則插入e,并返回true。
enqueue(e);
return true;
}
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 如果隊列已滿的話,就會等待
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //和lock方法的區別是讓它在阻塞時可以拋出異常跳出
try {
while (count == items.length)
notFull.await(); // 這里就是阻塞了,要注意:如果運行到這里,那么它會釋放上面的鎖,一直等到 notify
enqueue(e);
} finally {
lock.unlock();
}
}
/**
* 帶有超時事件的插入方法,unit 表示是按秒、分、時哪一種
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos); // 帶有超時等待的阻塞方法
}
enqueue(e); // 入隊
return true;
} finally {
lock.unlock();
}
}
出隊方法
poll / take / peek,這幾個方法都是獲取隊列頂的元素,具體說明如下:
- poll方法有兩個重載版本,第一個版本,如果隊列是空的,返回null,否則移除并返回隊列頭部元素;另一個帶時間參數的版本,如果棧為空則等待,可以指定等待的時間,如果等待超時了則返回null,如果被中斷了則拋出異常,否則移除并返回棧頂元素
- take方法同帶時間參數的poll方法,但是不能指定等待時間,會一直等待直到隊列中有元素為止,然后移除并返回棧頂元素
- peek方法只是返回隊列頭部元素,不移除
// 實現的方法,如果當前隊列為空,返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
// 實現的方法,如果當前隊列為空,一直阻塞
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 隊列為空,阻塞方法
return dequeue();
} finally {
lock.unlock();
}
}
// 帶有超時事件的取元素方法,否則返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos); // 超時等待
}
return dequeue(); // 取得元素
} finally {
lock.unlock();
}
}
// 只是看一個隊列最前面的元素,取出是不擅長隊列中原來的元素,隊列為空時返回null
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // 隊列為空時返回null
} finally {
lock.unlock();
}
}
刪除元素方法
remove / clear /drainT,這三個方法用于從隊列中移除元素,具體說明如下:
- remove方法用于移除某個元素,如果棧為空或者沒有找到該元素則返回false,否則從棧中移除該元素;移除時,如果該元素位于棧頂則直接移除,如果位于棧中間,則需要將該元素后面的其他元素往前面挪動,移除后需要喚醒因為棧滿了而阻塞的線程
- clear方法用于整個棧,同時將takeIndex置為putIndex,保證棧中的元素先進先出;最后會喚醒最多count個線程,因為正常一個線程插入一個元素,如果喚醒超過count個線程,可能導致部分線程因為棧滿了又再次被阻塞
- drainTo方法有兩個重載版本,一個是不帶個數,將所有的元素都移除并拷貝到指定的集合中;一個帶個數,將指定個數的元素移除并拷貝到指定的集合中,兩者的底層實現都是同一個方法。移除后需要重置takeIndex和count,并喚醒最多移除個數的因為棧滿而阻塞的線程。
/**
* 從隊列中刪除一個元素的方法。刪除成功返回true,否則返回false
*/
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
//從takeIndex開始往后遍歷直到等于putIndex
do {
if (o.equals(items[i])) {
removeAt(i); // 真正刪除的方法
return true;
}
//走到數組末尾了又從頭開始,put時也按照這個規則來
if (++i == items.length)
i = 0;
} while (i != putIndex); // 一直不斷的循環取出來做判斷
}
//如果數組為空,返回false
return false;
} finally {
lock.unlock();
}
}
/**
* 指定刪除索引上的元素.
*/
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
if (removeIndex == takeIndex) {
//如果移除的就是棧頂的元素
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
//元素個數減1
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// 如果移除的是棧中間的某個元素,需要將該元素后面的元素往前挪動
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
//到數組末尾了,從頭開始
if (next == items.length)
next = 0;
if (next != putIndex) {
//將后面一個元素復制到前面來
items[i] = items[next];
i = next;
} else {
//如果下一個元素的索引等于putIndex,說明i就是棧中最后一個元素了,直接將該元素置為null
items[i] = null;
//重置putIndex為i
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
//通知itrs節點移除了
itrs.removedAt(removeIndex);
}
//喚醒因為棧滿了而等待的線程
notFull.signal(); // 有一個元素刪除成功,那肯定隊列不滿
}
/**
* 清空隊列
*/
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
//從takeIndex開始遍歷直到i等于putIndex,將數組元素置為null
do {
items[i] = null;
if (++i == items.length)
i = 0;
} while (i != putIndex);
//注意此處沒有將這兩個index置為0,只是讓他們相等,因為只要相等就可以實現棧先進先出了
takeIndex = putIndex;
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
//如果有因為棧滿了而等待的線程,則將其喚醒
//注意這里沒有使用signalAll而是通過for循環來signal多次,單純從喚醒線程來看是可以使用signalAll的,效果跟這里的for循環是一樣的
//如果有等待的線程,說明count就是當前線程的最大容量了,這里清空了,最多只能put count次,一個線程只能put 1次,只喚醒最多count個線程就避免了
//線程被喚醒后再次因為棧滿了而阻塞
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}
/**
* 取出所有元素到集合
*/
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
/**
* 取出所有元素到集合
*/
public int drainTo(Collection<? super E> c, int maxElements) {
//校驗參數合法
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//取兩者之間的最小值
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
//從takeIndex開始遍歷,取出元素然后添加到c中,直到滿足個數要求為止
while (i < n) {
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x);
items[take] = null;
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
//取完了,修改count減去i
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
//通知itrs 棧空了
itrs.queueIsEmpty();
else if (i > take)
//說明take中間變成0了,通知itrs
itrs.takeIndexWrapped();
}
//喚醒在因為棧滿而等待的線程,最多喚醒i個,同上避免線程被喚醒了因為棧又滿了而阻塞
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
lock.unlock();
}
}
iterator / Itr / Itrs
Itr和Itrs都是ArrayBlockingQueue的兩個內部類,如下:

iterator方法返回一個迭代器實例,用于實現for循環遍歷和部分Collection接口,該方法的實現如下:
public Iterator<E> iterator() {
return new Itr();
}
Itr() {
// assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (count == 0) {
//NONE和DETACHED都是常量
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
//初始化各屬性
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
cursor = incCursor(takeIndex);
if (itrs == null) {
itrs = new Itrs(this);
} else {
//初始化Itrs,將當前線程注冊到Itrs
itrs.register(this); // in this order
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
// assert takeIndex >= 0;
// assert prevTakeIndex == takeIndex;
// assert nextIndex >= 0;
// assert nextItem != null;
}
} finally {
lock.unlock();
}
}
Itrs(Itr initial) {
register(initial);
}
//根據index計算cursor
private int incCursor(int index) {
// assert lock.getHoldCount() == 1;
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}
/**
* 創建一個新的Itr實例時,會調用此方法將該實例添加到Node鏈表中
*/
void register(Itr itr) {
//創建一個新節點將其插入到head節點的前面
head = new Node(itr, head);
}
小結
ArrayBlockingQueue是一個阻塞隊列,內部由ReentrantLock來實現線程安全,由Condition的await和signal來實現等待喚醒的功能。它的數據結構是數組,準確地說是一個循環數組(可以類比一個圓環),所有的下標在到達最大長度時自動從0繼續開始。
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git