使用場景
由于公司業(yè)務(wù)需求,需要對接socket、MQTT等消息隊(duì)列。
眾所周知 socket 是雙向通信,socket的回復(fù)是人為定義的,客戶端推送消息給服務(wù)端,服務(wù)端的回復(fù)是兩條線。無法像http請求有回復(fù)。
下發(fā)指令給硬件時(shí),需要校驗(yàn)此次數(shù)據(jù)下發(fā)是否成功。
用戶體驗(yàn)而言,點(diǎn)擊按鈕就要知道此次的下發(fā)成功或失敗。
如上圖模型,
第一種方案使用Tread.sleep
優(yōu)點(diǎn):占用資源小,放棄當(dāng)前cpu資源
缺點(diǎn): 回復(fù)速度快,休眠時(shí)間過長,仍然需要等待休眠結(jié)束才能返回,響應(yīng)速度是固定的,無法及時(shí)響應(yīng)第二種方案使用CountDownLatch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
package com.lzy.demo.delay; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class CountDownLatchPool { //countDonw池 private final static Map<Integer, CountDownLatch> countDownLatchMap = new ConcurrentHashMap<>(); //延遲隊(duì)列 private final static DelayQueue<MessageDelayQueueUtil> delayQueue = new DelayQueue<>(); private volatile static boolean flag = false ; //單線程池 private final static ExecutorService t = new ThreadPoolExecutor( 1 , 1 , 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>( 1 )); public static void addCountDownLatch(Integer messageId) { CountDownLatch countDownLatch = countDownLatchMap.putIfAbsent(messageId, new CountDownLatch( 1 ) ); if (countDownLatch == null ){ countDownLatch = countDownLatchMap.get(messageId); } try { addDelayQueue(messageId); countDownLatch.await(3L, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println( "阻塞等待結(jié)束~~~~~~" ); } public static void removeCountDownLatch(Integer messageId){ CountDownLatch countDownLatch = countDownLatchMap.get(messageId); if (countDownLatch == null ) return ; countDownLatch.countDown(); countDownLatchMap.remove(messageId); System.out.println( "清除Map數(shù)據(jù)" +countDownLatchMap); } private static void addDelayQueue(Integer messageId){ delayQueue.add( new MessageDelayQueueUtil(messageId)); clearMessageId(); } private static void clearMessageId(){ synchronized (CountDownLatchPool. class ){ if (flag){ return ; } flag = true ; } t.execute(()->{ while (delayQueue.size() > 0 ){ System.out.println( "進(jìn)入線程并開始執(zhí)行" ); try { MessageDelayQueueUtil take = delayQueue.take(); Integer messageId1 = take.getMessageId(); removeCountDownLatch(messageId1); System.out.println( "清除隊(duì)列數(shù)據(jù)" +messageId1); } catch (InterruptedException e) { e.printStackTrace(); } } flag = false ; System.out.println( "結(jié)束end----" ); }); } public static void main(String[] args) throws InterruptedException { /* 測試超時(shí)清空map new Thread(()->addCountDownLatch(1)).start(); new Thread(()->addCountDownLatch(2)).start(); new Thread(()->addCountDownLatch(3)).start(); */ //提前創(chuàng)建線程,清空countdown new Thread(()->{ try { Thread.sleep(500L); removeCountDownLatch( 1 ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); //開始阻塞 addCountDownLatch( 1 ); //通過調(diào)整上面的sleep我們發(fā)現(xiàn)阻塞市場取決于countDownLatch.countDown()執(zhí)行時(shí)間 System.out.println( "阻塞結(jié)束----" ); } } class MessageDelayQueueUtil implements Delayed { private Integer messageId; private long avaibleTime; public Integer getMessageId() { return messageId; } public void setMessageId(Integer messageId) { this .messageId = messageId; } public long getAvaibleTime() { return avaibleTime; } public void setAvaibleTime( long avaibleTime) { this .avaibleTime = avaibleTime; } public MessageDelayQueueUtil(Integer messageId){ this .messageId = messageId; //avaibleTime = 當(dāng)前時(shí)間+ delayTime //重試3次,每次3秒+1秒的延遲 this .avaibleTime= 3000 * 3 + 1000 + System.currentTimeMillis(); } @Override public long getDelay(TimeUnit unit) { long diffTime= avaibleTime- System.currentTimeMillis(); return unit.convert(diffTime,TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { //compareTo用在DelayedUser的排序 return ( int )( this .avaibleTime - ((MessageDelayQueueUtil) o).getAvaibleTime()); } } |
由于socket并不確定每次都會有數(shù)據(jù)返回,所以map的數(shù)據(jù)會越來越大,最終導(dǎo)致內(nèi)存溢出
需定時(shí)清除map內(nèi)的無效數(shù)據(jù)。
可以使用DelayedQuene延遲隊(duì)列來處理,相當(dāng)于給對象添加一個(gè)過期時(shí)間
使用方法 addCountDownLatch 等待消息,異步回調(diào)消息清空removeCountDownLatch
到此這篇關(guān)于詳解Java中CountDownLatch異步轉(zhuǎn)同步工具類的文章就介紹到這了,更多相關(guān)CountDownLatch異步轉(zhuǎn)同步工具類內(nèi)容請搜索服務(wù)器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務(wù)器之家!
原文鏈接:https://blog.csdn.net/qq_37256345/article/details/117808156