websocket在現代瀏覽器中的應用已經算是比較普遍了,在某些業務場景下,要求必須能夠在服務器端推送消息至客戶端。在沒有websocket的年代,我們使用過dwr,在那個時候dwr真實一個非常棒的方案。但是在websocket興起之后,我們更愿意使用標準實現來解決問題、
首先交代一下,本篇文章不講解websocket的配置,主要講的是針對在微服務架構集群模式下解決方案的選擇。
微服務架構大家應該都不陌生了,在微服務架構下,服務是分布式的,而且為了保證業務的可用性,每個服務都是以集群的形式存在。在集群模式下,要保證集群的每一個節點的訪問得到相同的結果就需要做到數據一致性,如緩存、session等。
微服務集群緩存通常使用分布式緩存redis解決,session一致性也通常會通過redis解決,但是現在更流行的是無狀態的http,即無session化,最常見的解決方案就是oauth。
websocket有所不同,它是與服務端建立一個長連接,在集群模式下,顯然不可能把前端與服務集群中的每一個節點建立連接,一個可行的思路是像解決http session的共享一樣,通過redis來實現websocket的session共享,但是websocket session的數量是遠多于http session的數量的(因為每打開一個頁面都會建立一個websocket連接),所以隨著用戶量的增長,共享的數據量太大,很容易造成瓶頸。
另一個思路是,websocket總歸會與集群中某個節點建立連接,那么,只要找到連接所在的節點,就可以向服務端推送消息了,那么要解決的問題就是如何找到一個websocket連接所在的節點。要找到連接在哪個節點上,我們需要一個唯一的標識符用于尋找連接,然而在基于stomp的發布-訂閱模式下,一個消息的推送可能是面向若干個連接的,可能分布在集群中的每一個節點上,這樣去尋找連接的代價也很高。既然這樣,我們不妨換種思路,每一個websocket消息,我們在集群的每個節點上都進行推送,訂閱了該消息的連接,不管有一個還是一萬個,最終肯定都能收到這個消息。基于這個思路,我們做了一些技術選型:
- rabbitmq
- spring cloud stream
首先說rabbitmq,高級消息隊列,可以實現消息廣播(當然kafka一樣可以做到,這里只介紹一種),另一項技術是spring cloud stream,stream是一個用于構建高度可擴展事件驅動型微服務的框架,并且它可以跟rabbitmq、kafka以及其他多種消息服務集成,使用了stream,要把rabbitmq換成kafka只不過是改改配置的事情。接下來重點介紹使用方法:
引入依賴
1
2
3
4
5
6
7
8
|
<dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-stream</artifactid> </dependency> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-stream-binder-rabbit</artifactid> </dependency> |
配置binder
binder是stream中的重要概念,是用于配置用于stream發布和訂閱事件的消息中間件。先看一段配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
spring: cloud: stream: binders: defaultrabbit: type: rabbit environment: spring: rabbitmq: host: localhost username: username password: password virtual-host: / |
配置中的 defaultrabbit 是binder的名稱,一會會在其他配置中引用,type指定了消息中間件的類型,environment是對消息中間件的配置,這里的配置結構和spring.rabbitmq命名空間下的配置項一模一樣的,可以參照著進行配置(這樣配置的作用是可以把stream的rabbitmq配置和項目中其他地方使用的rabbitmq區分開,如果這里不配置environment,binder會沿用spring.rabbitmq命名空間下的配置),比如你的項目中的rabbitmq的配置是這樣的:
1
2
3
4
5
6
|
spring: rabbitmq: host: localhost username: username password: password virtual-host: / |
那上門的binder的environment配置完全可以去掉。
消息流與binder的綁定
微服務要接收揮著發布事件消息,根據spring cloud stream的名字,顧名思義,需要使用流,所以需要在配置中聲明兩個事件流,一個輸入流,一個輸出流:
1
2
3
4
5
6
7
8
9
10
|
spring: cloud: stream: bindings: websocketmessagein: destination: websocketmessage binder: defaultrabbit websocketmessageout: destination: websocketmessage binder: defaultrabbit |
這里我們看到,事件流引用了binder,表示這兩個流使用rabbitmq這個中間件(看到這里想必大家已經明白了,在一個項目中完全可以同時使用rabbit和kafka作為事件流的消息中間件)。
websocketmessagein,websocketmessageout是事件流的名字(可以自己隨便起),destination指定了兩個事件流的destination是同一個,這決定了寫入和讀取是指向同一個地方(不一定是同一個消息隊列)。
事件流聲明
事件流使用接口進行定義:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/** * websocket消息事件流接口 * created by 吳昊 on 18-11-8. * * @author 吳昊 * @since 1.4.3 */ interface websocketmessagestream { companion object { const val input: string = "websocketmessagein" const val output: string = "websocketmessageout" } /** * 輸入 */ @input (input) fun input(): subscribablechannel /** * 輸出 */ @output (output) fun output(): messagechannel } |
聲明事件流接口,這里面定義了兩個常量,分別對應配置中的兩個流名稱,通過調用input()方法獲取輸入流,通過調用output()獲取輸出流。
該接口的實現由spring cloud stream完成,不需要自己實現。
使用事件流
聲明一個bean:
1
2
3
4
|
@component @enablebinding (websocketmessagestream:: class ) class websocketmessageservice { …… |
這里的@enablebinding 注解指明了事件流接口類,只有添加了這個注解(要能被spring識別到,可以加在入口類上,也可以加在@configuration注解的類上),該接口才會被實現,并且加入到spring的容器中(可以注入)。
上面websocketmessageservice的內容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@autowired private lateinit var stream: websocketmessagestream @autowired private lateinit var template: simpmessagingtemplate @streamlistener (websocketmessagestream.input) fun messagereceived(message: websocketmessage) { template.convertandsend(message.destination, message.body) } fun send(destination: string, body: any) { stream.output().send( mutablemessage(websocketmessage(destination, body)) ) } |
接收消息
@streamlistener 注解指明了要監聽的事件流,方法接收的參數即事件的消息內容(使用jackson反序列化),這里的messagereceived方法直接將接收到的消息直接用websocket發送給前端
發送消息
同樣,發送也很簡單,將消息直接發送到輸入流中,上面的send方法即是將原本應該用simpmessagingtemplate發送給websocket的消息發送到spring cloud stream的事件流中。這樣做以后,項目中所有需要向前端推送websocket消息的操作都應該調用send方法來進行。
講到這里大家可能還有點糊涂,也有一些疑問,為什么這樣每個微服務節點就能收到事件消息了?或者單個節點接收事件消息和多個節點接收的配置是怎么控制的。各位不要著急,待我慢慢道來,接下來就要結合rabbit的知識來講解 了:
首先看一下rabbit的消息隊列:

從圖中看到,存在多個以websocketmessage開頭的隊列,這是每一個微服務節點創建了一個消息隊列,再來看exchange:

exchange綁定的消息隊列

這里的exchange名稱和上面消息隊列的名稱前綴均是websocketmessage, 這個都是 由前面的binding配置中的destination指定的,和destination名稱保持一致
當應用向輸入流中寫入事件時,使用destination作為key(即websocketmessage),將消息寫入名為websocketmessage的exchange,由于exchange綁定的消息隊列前綴均為websocketmessage且routing key都是#,所以exchange會將消息路由到每一個websocketmessage開頭的消息隊列上(這里涉及到rabbitmq的知識點,如過不懂請自行查閱資料),這樣每一個微服務都能接收到相同的消息。
我們再來看前面提出的問題,這樣的配置可以把消息推送到每一個微服務節點,那么如果需要一個消息只被一個節點接收,該怎么配置呢?很簡單,一個配置項就可以搞定:
1
2
3
4
5
6
7
8
|
spring: cloud: stream: bindings: websocketmessagein: group: test destination: websocketmessage binder: defaultrabbit |
可以看到,相比前面的配置,僅僅多了一個group的配置,這樣配置之后,rabbitmq會生成一個名為websocketmessage.test的消息隊列(前面講到的每個微服務建立的消息隊列是自動刪除的,即微服務斷開連接后消息隊列就被刪除,而這個消息隊列是持久化的,也就是即使所有的微服務節點全部斷開連接也不會被刪除),所有的微服務節點監聽這一個隊列,當隊列中有消息時,只會被一個節點消費。
要講的內容到此結束,spring cloud stream的配置遠不止這些,但是這些配置已足夠完成我所需要做的事情,其他的配置請參考spring cloud stream官方文檔:
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://juejin.im/post/5c03452bf265da61602cacfe