背景
最近公司的客戶要求,分配給員工的任務除了有微信通知外,還希望pc端的網頁也能實時收到通知。管理員分配任務是在我們的系統a,而員工接受任務是在系統b。兩個系統都是現在已投入使用的系統。
技術選型
根據需求我們最終選用springaop+rabbitmq+websocket。
springaop可以讓我們不修改原有代碼,直接將原有service作為切點,加入切面。rabbitmq可以讓a系統和b系統解耦。websocket則可以達到實時通知的要求。
springaop
aop稱為面向切面編程,在程序開發中主要用來解決一些系統層面上的問題,比如日志,事務,權限等待。是spring的核心模塊,底層是通過動態代理來實現(動態代理將在之后的文章重點介紹)。
基本概念
aspect(切面):通常是一個類,里面可以定義切入點和通知。
jointpoint(連接點):程序執行過程中明確的點,一般是方法的調用。
advice(通知):aop在特定的切入點上執行的增強處理,有before,after,afterreturning,afterthrowing,around。
pointcut(切入點):就是帶有通知的連接點,在程序中主要體現為書寫切入點表達式。
通知類型
before:在目標方法被調用之前做增強處理。
@before只需要指定切入點表達式即可
afterreturning:在目標方法正常完成后做增強。
@afterreturning除了指定切入點表達式后,還可以指定一個返回值形參名returning,代表目標方法的返回值
afterthrowing:主要用來處理程序中未處理的異常。
@afterthrowing除了指定切入點表達式后,還可以指定一個throwing的返回值形參名,可以通過該形參名
來訪問目標方法中所拋出的異常對象
after:在目標方法完成之后做增強,無論目標方法時候成功完成。
@after可以指定一個切入點表達式
around:環繞通知,在目標方法完成前后做增強處理,環繞通知是最重要的通知類型,像事務,日志等都是環繞通知,注意編程中核心是一個proceedingjoinpoint。
rabbitmq
從圖中我們可以看到rabbitmq主要的結構有:routing、binding、exchange、queue。
queue
queue(隊列)rabbitmq的作用是存儲消息,隊列的特性是先進先出。
exchange
生產者產生的消息并不是直接發送給消息隊列queue的,而是要經過exchange(交換器),由exchange再將消息路由到一個或多個queue,還會將不符合路由規則的消息丟棄。
routing
用于標記或生產者尋找exchange。
binding
用于exchange和queue做關聯。
exchange type fanout
fanout類型的exchange路由規則非常簡單,它會把所有發送到該exchange的消息路由到所有與它綁定的queue中。
direct
direct會把消息路由到那些binding key與routing key完全匹配的queue中。
topic
direct規則是嚴格意義上的匹配,換言之routing key必須與binding key相匹配的時候才將消息傳送給queue,那么topic這個規則就是模糊匹配,可以通過通配符滿足一部分規則就可以傳送。
headers
headers類型的exchange不依賴于routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
websocket
了解websocket必須先知道幾個常用的web通信技術及其區別。
短輪詢
短輪詢的基本思路就是瀏覽器每隔一段時間向瀏覽器發送http請求,服務器端在收到請求后,不論是否有數據更新,都直接進行響應。這種方式實現的即時通信,本質上還是瀏覽器發送請求,服務器接受請求的一個過程,通過讓客戶端不斷的進行請求,使得客戶端能夠模擬實時地收到服務器端的數據的變化。
這種方式的優點是比較簡單,易于理解,實現起來也沒有什么技術難點。缺點是顯而易見的,這種方式由于需要不斷的建立http連接,嚴重浪費了服務器端和客戶端的資源。尤其是在客戶端,距離來說,如果有數量級想對比較大的人同時位于基于短輪詢的應用中,那么每一個用戶的客戶端都會瘋狂的向服務器端發送http請求,而且不會間斷。人數越多,服務器端壓力越大,這是很不合理的。
因此短輪詢不適用于那些同時在線用戶數量比較大,并且很注重性能的web應用。
長輪詢/ comet
comet指的是,當服務器收到客戶端發來的請求后,不會直接進行響應,而是先將這個請求掛起,然后判斷服務器端數據是否有更新。如果有更新,則進行響應,如果一直沒有數據,則到達一定的時間限制(服務器端設置)后關閉連接。
長輪詢和短輪詢比起來,明顯減少了很多不必要的http請求次數,相比之下節約了資源。長輪詢的缺點在于,連接掛起也會導致資源的浪費。
sse
sse是html5新增的功能,全稱為server-sent events。它可以允許服務推送數據到客戶端。sse在本質上就與之前的長輪詢、短輪詢不同,雖然都是基于http協議的,但是輪詢需要客戶端先發送請求。而sse最大的特點就是不需要客戶端發送請求,可以實現只要服務器端數據有更新,就可以馬上發送到客戶端。
sse的優勢很明顯,它不需要建立或保持大量的客戶端發往服務器端的請求,節約了很多資源,提升應用性能。并且sse的實現非常簡單,不需要依賴其他插件。
websocket
websocket是html5定義的一個新協議,與傳統的http協議不同,該協議可以實現服務器與客戶端之間全雙工通信。簡單來說,首先需要在客戶端和服務器端建立起一個連接,這部分需要http。連接一旦建立,客戶端和服務器端就處于平等的地位,可以相互發送數據,不存在請求和響應的區別。
websocket的優點是實現了雙向通信,缺點是服務器端的邏輯非常復雜。現在針對不同的后臺語言有不同的插件可以使用。
四種web即時通信技術比較
從兼容性角度考慮,短輪詢>長輪詢>長連接sse>websocket;
從性能方面考慮,websocket>長連接sse>長輪詢>短輪詢。
實戰
項目使用springboot搭建。rabbitmq的安裝這里不講述。
rabbitmq配置
兩個系統a、b都需要操作rabbitmq,其中a生產消息,b消費消息。故都需要配置。
1、首先引入rabbitmq的dependency:
1
2
3
4
|
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> |
這個dependency中包含了rabbitmq相關dependency。
2、在項目的配置文件里配置為使用rabbitmq及其參數。
application-pro.yml
1
2
3
4
5
6
7
8
|
#消息隊列 message.queue.type: rabbitmq ## rabbit mq properties rabbitmq: host: localhost port: 5672 username: guest password: guest |
application.properties
1
2
|
#將要使用的隊列名 rabbitmq.websocket.msg.queue=websocket_msg_queue |
3、創建配置文件。隊列的創建交給spring。
rabbitmqconfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
@configuration @enablerabbit public class rabbitmqconfig { @value ( "${rabbitmq.host}" ) private string host; @value ( "${rabbitmq.port}" ) private string port; @value ( "${rabbitmq.username}" ) private string username; @value ( "${rabbitmq.password}" ) private string password; @value ( "${rabbitmq.websocket.msg.queue}" ) private string websocketmsgqueue; @bean public connectionfactory connectionfactory() throws ioexception { cachingconnectionfactory factory = new cachingconnectionfactory(); factory.setusername(username); factory.setpassword(password); // factory.setvirtualhost("test"); factory.sethost(host); factory.setport(integer.valueof(port)); factory.setpublisherconfirms( true ); //設置隊列參數,是否持久化、隊列ttl、隊列消息ttl等 factory.createconnection().createchannel( false ).queuedeclare(websocketmsgqueue, true , false , false , null ); return factory; } @bean public messageconverter messageconverter() { return new jackson2jsonmessageconverter(); } @bean @scope (configurablebeanfactory.scope_prototype) // 必須是prototype類型 public rabbittemplate rabbittemplate() throws ioexception { return new rabbittemplate(connectionfactory()); } @bean public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory() throws ioexception { simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory(); factory.setconnectionfactory(connectionfactory()); factory.setconcurrentconsumers( 3 ); factory.setmaxconcurrentconsumers( 10 ); factory.setacknowledgemode(acknowledgemode.manual); return factory; } } |
4、系統b中創建隊列監聽,當隊列有消息時,發送websocket通知。
rabbitmqlistener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
@component public class rabbitmqlistener { @autowired private rabbitmqservice mqservice; /** * websocket推送監聽器 * @param socketentity * @param deliverytag * @param channel */ @rabbitlistener (queues = "websocket_msg_queue" ) public void websocketmsglistener( @payload websocketmsgentity socketmsgentity, @header (amqpheaders.delivery_tag) long deliverytag, channel channel) throws ioexception { mqservice.handlewebsocketmsg(socketmsgentity, deliverytag, channel); } } |
rabbitmqservice.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public class rabbitmqservice { @autowired private messagewebsockethandler messagewebsockethandler; /** * @param socketmsgentity * @param deliverytag * @param channel * @throws ioexception */ void handlewebsocketmsg(websocketmsgentity socketmsgentity, long deliverytag, channel channel) throws ioexception { try { messagewebsockethandler.sendmessagetousers(socketmsgentity.tojsonstring(), socketmsgentity.gettouserids()); channel.basicack(deliverytag, false ); } catch (exception e) { channel.basicnack(deliverytag, false , false ); } } } |
websocketmsgentity為mq中傳送的實體。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
public class websocketmsgentity implements serializable { public enum ordertype{ repair( "維修" ), maintain( "保養" ), measure( "計量" ); ordertype(string value){ this .value = value; } string value; public string getvalue() { return value; } } //設備名稱 private string equname; //設備編號 private string equid; //工單類型 private ordertype ordertype; //工單單號 private string orderid; //工單狀態 private string orderstatus; //創建時間 private date createtime; //消息接收人id private list<string> touserids; public string getequname() { return equname; } public void setequname(string equname) { equname = equname; } public string getorderid() { return orderid; } public void setorderid(string orderid) { this .orderid = orderid; } public string getequid() { return equid; } public void setequid(string equid) { equid = equid; } public string getorderstatus() { return orderstatus; } public void setorderstatus(string orderstatus) { this .orderstatus = orderstatus; } public ordertype getordertype() { return ordertype; } public void setordertype(ordertype ordertype) { this .ordertype = ordertype; } public date getcreatetime() { return createtime; } public void setcreatetime(date createtime) { this .createtime = createtime; } public list<string> gettouserids() { return touserids; } public void settouserids(list<string> touserids) { this .touserids = touserids; } public string tojsonstring(){ return json.tojsonstring( this ); } } |
springaop
1、系統a中創建一個切面類datainterceptor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
@aspect @component public class datainterceptor { @autowired private messagequeueservice queueservice; //維修工單切點 @pointcut ( "execution(* com.zhishang.hes.common.service.impl.repairserviceimpl.executeflow(..))" ) private void repairmsg() { } /** * 返回通知,方法執行正常返回時觸發 * * @param joinpoint * @param result */ @afterreturning (value = "repairmsg()" , returning = "result" ) public void afterreturning(joinpoint joinpoint, object result) { //此處可以獲得切點方法名 //string methodname = joinpoint.getsignature().getname(); equipmentrepair equipmentrepair = (equipmentrepair) result; websocketmsgentity websocketmsgentity = this .generaterepairmsgentity(equipmentrepair); if (websocketmsgentity == null ) { return ; } queueservice.send(websocketmsgentity); } /** * 生成發送到mq的維修消息 * * @param equipmentrepair * @return */ private websocketmsgentity generaterepairmsgentity(equipmentrepair equipmentrepair) { websocketmsgentity websocketmsgentity = generaterepairmsgfromtasks(equipmentrepair); return websocketmsgentity; } /** * 從任務中生成消息 * * @param equipmentrepair * @return */ private websocketmsgentity generaterepairmsgfromtasks(equipmentrepair equipmentrepair) { //業務代碼略 } } |
2、發送消息到mq。這里只貼了發送的核心代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public class rabbitmessagequeue extends abstractmessagequeue { @value ( "${rabbitmq.websocket.msg.queue}" ) private string websocketmsgqueue; @autowired private rabbittemplate rabbittemplate; @override public void send(websocketmsgentity entity) { //沒有指定exchange,則使用默認名為“”的exchange,binding名與queue名相同 rabbittemplate.convertandsend(websocketmsgqueue, entity); } } |
websocket
1、 系統b中引入websocket服務端dependency
1
2
3
4
5
|
<dependency> <groupid>org.springframework</groupid> <artifactid>spring-websocket</artifactid> <version> 4.3 . 10 .release</version> </dependency> |
2、 配置websocket,添加處理類
websocketconfigurer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
@configuration @enablewebsocket public class websocketconfig extends webmvcconfigureradapter implements websocketconfigurer { private static logger logger = loggerfactory.getlogger(websocketconfig. class ); @override public void registerwebsockethandlers(websockethandlerregistry registry) { //配置websocket路徑 registry.addhandler(messagewebsockethandler(), "/msg-websocket" ).addinterceptors( new myhandshakeinterceptor()).setallowedorigins( "*" ); //配置websocket路徑 支持前端使用socketjs registry.addhandler(messagewebsockethandler(), "/sockjs/msg-websocket" ).setallowedorigins( "*" ).addinterceptors( new myhandshakeinterceptor()).withsockjs(); } @bean public messagewebsockethandler messagewebsockethandler() { logger.info( "......創建messagewebsockethandler......" ); return new messagewebsockethandler(); } } |
messagewebsockethandler.java 主要用于websocket連接及消息發送處理。配置中還使用了連接握手時的處理,主要是取用戶登陸信息,這里不多講述。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
public class messagewebsockethandler extends textwebsockethandler { private static logger logger = loggerfactory.getlogger(systemwebsockethandler. class ); private static concurrenthashmap<string, copyonwritearrayset<websocketsession>> users = new concurrenthashmap<>(); @override public void afterconnectionestablished(websocketsession session) throws exception { string userid = session.getattributes().get( "websocket_userid" ).tostring(); logger.info( "......afterconnectionestablished......" ); logger.info( "session.getid:" + session.getid()); logger.info( "session.getlocaladdress:" + session.getlocaladdress().tostring()); logger.info( "userid:" + userid); //websocket連接后記錄連接信息 if (users.keyset().contains(userid)) { copyonwritearrayset<websocketsession> websocketsessions = users.get(userid); websocketsessions.add(session); } else { copyonwritearrayset<websocketsession> websocketsessions = new copyonwritearrayset<>(); websocketsessions.add(session); users.put(userid, websocketsessions); } } @override public void handletransporterror(websocketsession session, throwable throwable) throws exception { removeusersession(session); if (session.isopen()) { session.close(); } logger.info( "異常出現handletransporterror" + throwable.getmessage()); } @override public void afterconnectionclosed(websocketsession session, closestatus closestatus) throws exception { removeusersession(session); logger.info( "關閉afterconnectionclosed" + closestatus.getreason()); } @override public boolean supportspartialmessages() { return false ; } /** * 給符合要求的在線用戶發送消息 * * @param message */ public void sendmessagetousers(string message, list<string> userids) throws ioexception{ if (stringutils.isempty(message) || collectionutils.isempty(userids)) { return ; } if (users.isempty()) { return ; } for (string userid : userids) { if (!users.keyset().contains(userid)) { continue ; } copyonwritearrayset<websocketsession> websocketsessions = users.get(userid); if (websocketsessions == null ) { continue ; } for (websocketsession websocketsession : websocketsessions) { if (websocketsession.isopen()) { try { websocketsession.sendmessage( new textmessage(message)); } catch (ioexception e) { logger.error( " websocket server send message error " + e.getmessage()); try { throw e; } catch (ioexception e1) { e1.printstacktrace(); } } } } } } /** * websocket清除連接信息 * * @param session */ private void removeusersession(websocketsession session) { string userid = session.getattributes().get( "websocket_userid" ).tostring(); if (users.keyset().contains(userid)) { copyonwritearrayset<websocketsession> websocketsessions = users.get(userid); websocketsessions.remove(session); if (websocketsessions.isempty()) { users.remove(userid); } } } } |
整個功能完成后,a系統分配任務時,系統b登陸用戶收到的消息如圖:
總體流程:
1、對于系統b,每個登陸的用戶都會和服務器建立websocket長連接。
2、系統a生成任務,aop做出響應,將封裝的消息發送給mq。
3、系統b中的mq監聽發現隊列有消息到達,消費消息。
4、系統b通過websocket長連接將消息發給指定的登陸用戶。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:http://www.cnblogs.com/little-sheep/p/9934887.html