實現生產者消費者的四種方式
一、最基礎的
利用 wait() 和 notify() 方法實現,當緩沖區滿或為空時都調用 wait() 方法等待,當生產者生產了一個產品或消費者消費了一個產品后會喚醒所有線程;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
package com.practice; public class testMain { private static Integer count = 0 ; private static final Integer FULL = 10 ; private static String LOCK = "lock" ; public static void main(String[] args) { testMain testMain = new testMain(); new Thread(testMain. new Producer()).start(); new Thread(testMain. new Consumer()).start(); new Thread(testMain. new Producer()).start(); new Thread(testMain. new Consumer()).start(); new Thread(testMain. new Producer()).start(); new Thread(testMain. new Consumer()).start(); new Thread(testMain. new Producer()).start(); new Thread(testMain. new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for ( int i = 0 ; i < 10 ; i++) { try { Thread.sleep( 3000 ); } catch (Exception e){ e.printStackTrace(); } synchronized (LOCK){ while (count == FULL){ //緩存空間滿了 try { LOCK.wait(); //線程阻塞 } catch (Exception e){ e.printStackTrace(); } } count++; //生產者 System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" +count); LOCK.notifyAll(); //喚醒所有線程 } } } } class Consumer implements Runnable{ @Override public void run(){ for ( int i = 0 ; i < 10 ; i++) { try { Thread.sleep( 3000 ); } catch (InterruptedException e){ e.printStackTrace(); } synchronized (LOCK){ while (count == 0 ){ try { LOCK.wait(); } catch (Exception e){ } } count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有 " +count); LOCK.notifyAll(); //喚醒所有線程 } } } } } |
二、java.util.concurrent.lock 中的 Lock 框架
通過對 lock 的 lock() 方法和 unlock() 方法實現對鎖的顯示控制,而 synchronize()
則是對鎖的隱形控制,可重入鎖也叫做遞歸鎖,指的是同一個線程外層函數獲得鎖之后,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響;
簡單來說,該鎖維護這一個與獲取鎖相關的計數器,如果擁有鎖的某個線程再次得到鎖,那么獲計數器就加1,函數調用結束計數器就減1,然后鎖需要釋放兩次才能獲得真正釋放,已經獲取鎖的線程進入其他需要相同鎖的同步代碼塊不會被阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockTest { private static Integer count = 0 ; private static Integer FULL = 10 ; / / 創建一個鎖對象 private Lock lock = new ReentrantLock(); / / 創建兩個條件變量,一個為緩沖非滿,一個緩沖區非空 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public static void main(String[] args){ ReentrantLockTest testMain = new ReentrantLockTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for ( int i = 0 ; i < 10 ; i + + ) { try { Thread.sleep( 3000 ); } catch (Exception e) { e.printStackTrace(); } / / 獲取鎖 lock.lock(); try { while (count = = FULL) { try { notFull.await(); }catch(InterruptedException e){ e.printStackTrace(); } } count + + ; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count); } finally { lock.unlock(); } } } } class Consumer implements Runnable{ @Override public void run(){ for ( int i = 0 ; i < 10 ; i + + ) { try { Thread.sleep( 3000 ); } catch (Exception e){ e.printStackTrace(); } lock.lock(); try { while (count = = 0 ){ try { notEmpty.await(); }catch (InterruptedException e){ e.printStackTrace(); } } count - - ; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有 " + count); } finally { lock.unlock(); / / 解鎖 } } } } } |
三、阻塞隊列BlockingQueue的實現
被阻塞的情況主要分為如下兩種,BlockingQueue 是線程安全的
1,當隊列滿了的時候進行入隊操作;
2,當隊列空的時候進行出隊操作
Blockqueue 接口的一些方法
四類方法分別對應于:
1,ThrowsException,如果操作不能馬上進行,則拋出異常;
2,SpecialValue 如果操作不能馬上進行,將會返回一個特殊的值,true或false;
3,Blocks 操作被阻塞;
4,TimeOut 指定時間未執行返回一個特殊值 true 或 false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * 使用 BlockQueue 實現生產者消費模型 */ public class BlockQueueTest { public static Integer count = 0 ; //創建一個阻塞隊列 final BlockingQueue blockingQueue = new ArrayBlockingQueue<>( 10 ); public static void main(String[] args) { BlockQueueTest testMain = new BlockQueueTest(); new Thread(testMain. new Producer()).start(); new Thread(testMain. new Consumer()).start(); new Thread(testMain. new Producer()).start(); new Thread(testMain. new Consumer()).start(); new Thread(testMain. new Producer()).start(); new Thread(testMain. new Consumer()).start(); new Thread(testMain. new Producer()).start(); new Thread(testMain. new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for ( int i = 0 ; i < 10 ; i++) { try { Thread.sleep( 3000 ); } catch (Exception e){ e.printStackTrace(); } try { blockingQueue.put( 1 ); count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有 " + count); } catch (InterruptedException e){ e.printStackTrace(); } } } } class Consumer implements Runnable{ @Override public void run(){ for ( int i = 0 ; i < 10 ; i++) { try { Thread.sleep( 3000 ); } catch (InterruptedException e){ e.printStackTrace(); } try { blockingQueue.take(); //消費 count--; System.out.println(Thread.currentThread().getName() + " 消費者消費,目前總共有 " + count); } catch (InterruptedException e){ e.printStackTrace(); } } } } } |
四、信號量 Semaphore 的實現
Semaphore (信號量) 用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。Java中的 Semaphone 維護了一個許可集,一開始設定這個許可集的數量,使用 acquire()
方法獲得一個許可,當許可不足時會被阻塞,release()
添加一個許可。
下面代碼中,還加入了 mutex
信號量,維護消費者和生產者之間的同步關系,保證生產者消費者之間的交替進行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
import java.util.concurrent.Semaphore; public class SemaphoreTest { private static Integer count = 0 ; / / 創建三個信號量 final Semaphore notFull = new Semaphore( 10 ); final Semaphore notEmpty = new Semaphore( 0 ); final Semaphore mutex = new Semaphore( 1 ); / / 互斥鎖,控制共享數據的互斥訪問 public static void main(String[] args) { SemaphoreTest testMain = new SemaphoreTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for ( int i = 0 ; i < 10 ; i + + ) { try { Thread.sleep( 3000 ); }catch (InterruptedException e){ e.printStackTrace(); } try { notFull.acquire(); / / 獲取一個信號量 mutex.acquire(); count + + ; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有 " + count); } catch (InterruptedException e){ e.printStackTrace(); } finally { mutex.release(); / / 添加 notEmpty.release(); } } } } class Consumer implements Runnable{ @Override public void run(){ for ( int i = 0 ; i < 10 ; i + + ) { try { Thread.sleep( 3000 ); }catch(InterruptedException e){ e.printStackTrace(); } try { notEmpty.acquire(); mutex.acquire(); count - - ; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count); }catch (InterruptedException e){ e.printStackTrace(); } finally { mutex.release(); notFull.release(); } } } } } |
Reference
https://juejin.cn/post/6844903486895865864#comment
以上就是Java編程生產者消費者實現的四種方法的詳細內容,更多關于java實現生產消費者的資料請關注服務器之家其它相關文章!
原文鏈接:https://blog.csdn.net/weixin_42512684/article/details/118151060