1、什么是阻塞隊列?
隊列是一種數據結構,它有兩個基本操作:在隊列尾部加入一個元素,從隊列頭部移除一個元素。阻塞隊里與普通的隊列的區別在于,普通隊列不會對當前線程產生阻塞,在面對類似消費者-生產者模型時,就必須額外的實現同步策略以及線程間喚醒策略。使用阻塞隊列,就會對當前線程產生阻塞,當隊列是空時,從隊列中獲取元素的操作將會被阻塞,當隊列是滿時,往隊列里添加元素的操作也會被阻塞。
2、主要的阻塞隊列及其方法
java.util.concurrent包下提供主要的幾種阻塞隊列,主要有以下幾個:
1)ArrayBlockingQueue:基于數組實現的阻塞隊列,在創建ArrayBlockingQueue對象時必須指定其容量大小,還可以指定訪問策略,默認情況下為非公平的,即不保證等待時間最長的線程最優先能夠訪問隊列。
2)、LinkedBlockingQueue:基于鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE。
3)、以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標志),前面2種都是有界隊列。
4)、DelayQueue:基于PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
阻塞隊列包括了非阻塞隊列中的大部分方法,還提供另外若干非常有用的方法:
put方法用來向隊尾存入元素,如果隊列滿,則等待;
take方法用來從隊首取元素,如果隊列為空,則等待;
offer方法用來向隊尾存入元素,如果隊列滿,則等待一定的時間,當時間期限達到時,如果還沒有插入成功,則返回false;否則返回true;
poll方法用來從隊首取元素,如果隊列空,則等待一定的時間,當時間期限達到時,如果取到,則返回null;否則返回取得的元素;
下面看一段代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import java.util.concurrent.ArrayBlockingQueue; /** * @author 作者:徐劍 E-mail:[email protected] * @version 創建時間:2016年3月20日 下午12:52:53 * 類說明 */ public class BlockingQueue { public static void main(String[] args) throws InterruptedException { java.util.concurrent.BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>( 5 ); for ( int i = 0 ; i < 10 ; i++) { // 將指定元素添加到此隊列中 blockingQueue.put( "加入元素" + i); System.out.println( "向阻塞隊列中添加了元素:" + i); } System.out.println( "程序到此運行結束,即將退出----" ); } } |
當限制阻塞隊列數量為5時,添加了5個元素之后,繼續添加將會隊列外阻塞等待,此時程序并未終止。
當隊列滿了之后,我們將隊首元素移除,則可以繼續向阻塞隊列中添加元素,代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public class BlockingQueue { public static void main(String[] args) throws InterruptedException { java.util.concurrent.BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>( 5 ); for ( int i = 0 ; i < 10 ; i++) { // 將指定元素添加到此隊列中 blockingQueue.put( "加入元素" + i); System.out.println( "向阻塞隊列中添加了元素:" + i); if (i>= 4 ) System.out.println( "移除隊首元素" +blockingQueue.take()); } System.out.println( "程序到此運行結束,即將退出----" ); } |
執行結果如下:
3、阻塞隊列的實現原理
下面主要看一下ArrayBlockingQueue的實現原理。
首先看一下ArrayBlockingQueue類的成員變量:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 底層存儲結構-數組 */ final Object[] items; /** 隊首元素下標 */ int takeIndex; /** 隊尾元素下標 */ int putIndex; /**隊列元素總數 */ int count; /** 重入鎖 */ final ReentrantLock lock; /** notEmpty等待條件 */ private final Condition notEmpty; /** notFull等待條件 */ private final Condition notFull; /** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */ transient Itrs itrs = null ; |
可以看到,ArrayBlockingQueue用來存儲元素的實際上是一個數組。
再看下ArrayBlockingQueue兩個重要方法的實現,put()和take():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public void put(E e) throws InterruptedException { //先檢查e是否為空 checkNotNull(e); //獲取鎖 final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { //當隊列已滿,進入條件等待 while (count == items.length) notFull.await(); //隊列不滿,進行入隊列操作 enqueue(e); } finally { //釋放鎖 lock.unlock(); } } |
再看下具體的入隊操作:
1
2
3
4
5
6
7
8
9
10
11
12
|
private void enqueue(E x) { final Object[] items = this .items; //隊尾入隊 items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; //隊列總數+1 count++; //notempty條件的等待集中隨機選擇一個線程,解除其阻塞狀態 notEmpty.signal(); } |
下面是take()方法的源代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public E take() throws InterruptedException { //獲取鎖 final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { //隊列為空 while (count == 0 ) //線程加入notEmpty條件等待集 notEmpty.await(); //非空,出隊列 return dequeue(); } finally { //釋放鎖 lock.unlock(); } } |
4、阻塞隊列的應用:實現消費者-生產者模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
/** * @author 作者:徐劍 E-mail:[email protected] * @version 創建時間:2016年3月20日 下午2:21:55 * 類說明:阻塞隊列實現的消費者-生產者模式 */ public class Test { private int queueSize = 10 ; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test. new Producer(); Consumer consumer = test. new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread { @Override public void run() { consume(); } private void consume() { while ( true ) { try { queue.take(); System.out.println( "從隊列取走一個元素,隊列剩余" + queue.size() + "個元素" ); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread { @Override public void run() { produce(); } private void produce() { while ( true ) { try { queue.put( 1 ); System.out.println( "向隊列取中插入一個元素,隊列剩余空間:" + (queueSize - queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } } } |
以上就是本文的全部內容,希望對大家的學習有所幫助。