本文中代碼可以在 github.com/alfred-zhong/wserver 獲取。
背景
最近拿到需求要在網(wǎng)頁上展示報警信息。以往報警信息都是通過短信,微信和 App 推送給用戶的,現(xiàn)在要讓登錄用戶在網(wǎng)頁端也能實時接收到報警推送。
依稀記得以前工作的時候遇到過類似的需求。因為以前的瀏覽器標(biāo)準(zhǔn)比較陳舊,并且那時用 Java 較多,所以那時候解決這個問題就用了 Comet4J。具體的原理就是長輪詢,長鏈接。但現(xiàn)在畢竟 html5 流行開來了,IE 都被 Edge 接替了,再用以前這種技術(shù)就顯得過時。
很早以前就聽過 WebSocket 的大名,但因為那時很多用戶的瀏覽器還不支持,所以對這個技術(shù)也就是淺嘗輒止,沒有太深入研究過。現(xiàn)在趁著項目需要,就來稍微深入了解一下。
websocket 簡介
以往瀏覽器要獲取服務(wù)端數(shù)據(jù),都是通過發(fā)送 HTTP 請求,然后等待服務(wù)端回應(yīng)的。也就是說瀏覽器端一直是整個請求的發(fā)起者,只有它主動,才能獲取到數(shù)據(jù)。而要讓瀏覽器一側(cè)能夠獲取到服務(wù)端的實時數(shù)據(jù),就需要不停地向服務(wù)端發(fā)起請求。雖然大多數(shù)情況下并沒有獲取到實際數(shù)據(jù),但這大大增加了網(wǎng)絡(luò)壓力,對于服務(wù)端來說壓力也直線上升。
后來我們學(xué)會了使用長連接 + 長輪詢的方式。換句話說,也就是延長 HTTP 請求的存在時間,盡量保持 HTTP 連接。雖然這在一定程度上降低了不少壓力,但仍然需要不停地進行輪詢,也做不到真正的實時性。(借用一張圖)
隨著 HTML5 的到來,WebSocket 在 2011 年被定為標(biāo)準(zhǔn)(詳情請參見 RFC 6455)。
借用 《Go Web 編程》的話。WebSocket 采用了一些特殊的報頭,使得瀏覽器和服務(wù)器只需要做一個握手的動作,就可以在瀏覽器和服務(wù)器之間建立一條連接通道。且此連接會保持在活動狀態(tài),你可以使用 JavaScript 來向連接寫入或從中接收數(shù)據(jù),就像在使用一個常規(guī)的 TCP Socket 一樣。它解決了 Web 實時化的問題。
由于 WebSocket 是全雙工通信,所以當(dāng)建立了 WebSocket 連接之后,接下來的通信就類似于傳統(tǒng)的 TCP 通信了。客戶端和服務(wù)端可以相互發(fā)送數(shù)據(jù),不再有實時性的問題。
開發(fā)包的選擇
在 Go 官方的 SDK 中,并不包含對 WebSocket 的支持,所以必須使用第三方庫。
要使用 Golang 開發(fā) WebSocket,選擇基本就在 x/net/websocket 和 gorilla/websocket 之間。《Go Web 編程》一書中的例子使用了 x/net/websocket 作為開發(fā)包,而且貌似它也更加官方且正式。而實際根據(jù)我在網(wǎng)上查詢得到的反饋看來,并非如此。x/net/websocket 貌似 Bug 較多,且較為不穩(wěn)定,問題解決也并不及時。相比之下,gorilla/websocket 則更加優(yōu)秀。
還有對于 Gorilla web toolkit 組織的貢獻,必須予以感謝。🙏。其下不僅有 WebSocket 的實現(xiàn),也有一些其他工具。歡迎大家使用并且能夠給予反饋或貢獻。
推送服務(wù)實現(xiàn)
基本原理
項目初步設(shè)計如下:
server 啟動以后會注冊兩個 Handler。
websocketHandler 用于提供瀏覽器端發(fā)送 Upgrade 請求并升級為 WebSocket 連接。
pushHandler 用于提供外部推送端發(fā)送推送數(shù)據(jù)的請求。
瀏覽器首先連接 websocketHandler (默認地址為 ws://ip:port/ws)升級請求為 WebSocket 連接,當(dāng)連接建立之后需要發(fā)送注冊信息進行注冊。這里注冊信息中包含一個 token 信息。server 會對提供的 token 進行驗證并獲取到相應(yīng)的 userId(通常來說,一個 userId 可能同時關(guān)聯(lián)許多 token),并保存維護好 token, userId 和 conn(連接)之間的關(guān)系。
推送端發(fā)送推送數(shù)據(jù)的請求到 pushHandler(默認地址為 ws://ip:port/push),請求中包含了 userId 字段和 message 字段。server 會根據(jù) userId 獲取到所有此時連接到該 server 的 conn,然后將 message 一一進行推送。
由于推送服務(wù)的實時性,推送的數(shù)據(jù)并沒有也不需要進行緩存。
代碼詳解
我在此處會稍微講述一下代碼的基本構(gòu)成,也順便說說 Go 語言中一些常用的寫法和模式(本人也是從其他語言轉(zhuǎn)向 Go 語言,畢竟 Go 語言也相當(dāng)年輕。所以有建議的話,敬請?zhí)岢觥#S捎?Go 語言的發(fā)明人和一些主要維護者大都來自于 C/C++ 語言,所以 Go 語言的代碼也更偏向于 C/C++ 系。
首先先看一下 Server 的結(jié)構(gòu):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
// Server defines parameters for running websocket server. type Server struct { // Address for server to listen on Addr string // Path for websocket request, default "/ws" . WSPath string // Path for push message, default "/push" . PushPath string // Upgrader is for upgrade connection to websocket connection using // "github.com/gorilla/websocket" . // // If Upgrader is nil, default upgrader will be used. Default upgrader is // set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always // returns true . Upgrader *websocket.Upgrader // Check token if it's valid and return userID. If token is valid, userID // must be returned and ok should be true . Otherwise ok should be false . AuthToken func(token string) (userID string, ok bool) // Authorize push request. Message will be sent if it returns true , // otherwise the request will be discarded. Default nil and push request // will always be accepted. PushAuth func(r *http.Request) bool wh *websocketHandler ph *pushHandler } |
PS: 由于我整個項目的注釋都是用英文寫的,所以見諒了,希望不妨礙閱讀。
這里說一下 Upgrader *websocket.Upgrader,這是 gorilla/websocket 包的對象,它用來升級 HTTP 請求。
如果一個結(jié)構(gòu)體參數(shù)過多,通常不建議直接初始化,而是使用它提供的 New 方法。這里是:
1
2
3
4
5
6
7
8
|
// NewServer creates a new Server. func NewServer(addr string) *Server { return &Server{ Addr: addr, WSPath: serverDefaultWSPath, PushPath: serverDefaultPushPath, } } |
這也是 Go 語言對外提供初始化方法的一種常見用法。
然后 Server 使用 ListenAndServe 方法啟動并監(jiān)聽端口,與 http 包的使用類似:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
// ListenAndServe listens on the TCP network address and handle websocket // request. func (s *Server) ListenAndServe() error { b := &binder{ userID2EventConnMap: make (map[string]*[]eventConn), connID2UserIDMap: make (map[string]string), } // websocket request handler wh := websocketHandler{ upgrader: defaultUpgrader, binder: b, } if s.Upgrader != nil { wh.upgrader = s.Upgrader } if s.AuthToken != nil { wh.calcUserIDFunc = s.AuthToken } s.wh = &wh http.Handle(s.WSPath, s.wh) // push request handler ph := pushHandler{ binder: b, } if s.PushAuth != nil { ph.authFunc = s.PushAuth } s.ph = &ph http.Handle(s.PushPath, s.ph) return http.ListenAndServe(s.Addr, nil) } |
這里我們生成了兩個 Handler,分別為 websocketHandler 和 pushHandler。websocketHandler 負責(zé)與瀏覽器建立連接并傳輸數(shù)據(jù),而 pushHandler 則處理推送端的請求。可以看到,這里兩個 Handler 都封裝了一個 binder 對象。這個 binder 用于維護 token <-> userID <-> Conn 的關(guān)系:
1
2
3
4
5
6
7
8
9
|
// binder is defined to store the relation of userID and eventConn type binder struct { mu sync .RWMutex // map stores key: userID and value of related slice of eventConn userID2EventConnMap map[string]*[]eventConn // map stores key: connID and value: userID connID2UserIDMap map[string]string } |
websocketHandler
具體看一下 websocketHandler 的實現(xiàn)。
1
2
3
4
5
6
7
8
9
10
11
12
|
// websocketHandler defines to handle websocket upgrade request. type websocketHandler struct { // upgrader is used to upgrade request. upgrader *websocket.Upgrader // binder stores relations about websocket connection and userID. binder *binder // calcUserIDFunc defines to calculate userID by token. The userID will // be equal to token if this function is nil. calcUserIDFunc func(token string) (userID string, ok bool) } |
很簡單的結(jié)構(gòu)。websocketHandler 實現(xiàn)了 http.Handler 接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
// First try to upgrade connection to websocket. If success, connection will // be kept until client send close message or server drop them. func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { wsConn, err := wh.upgrader.Upgrade(w, r, nil) if err != nil { return } defer wsConn.Close() // handle Websocket request conn := NewConn(wsConn) conn.AfterReadFunc = func(messageType int, r io.Reader) { var rm RegisterMessage decoder := json.NewDecoder(r) if err := decoder.Decode(& rm ); err != nil { return } // calculate userID by token userID := rm .Token if wh.calcUserIDFunc != nil { uID, ok := wh.calcUserIDFunc( rm .Token) if !ok { return } userID = uID } // bind wh.binder.Bind(userID, rm .Event, conn) } conn.BeforeCloseFunc = func() { // unbind wh.binder.Unbind(conn) } conn.Listen() } |
首先將傳入的 http.Request 轉(zhuǎn)換為 websocket.Conn,再將其分裝為我們自定義的一個 wserver.Conn(封裝,或者說是組合,是 Go 語言的典型用法。記住,Go 語言沒有繼承,只有組合)。然后設(shè)置了 Conn 的 AfterReadFunc 和 BeforeCloseFunc 方法,接著啟動了 conn.Listen()。AfterReadFunc 意思是當(dāng) Conn 讀取到數(shù)據(jù)后,嘗試驗證并根據(jù) token 計算 userID,然乎 bind 注冊綁定。BeforeCloseFunc 則為 Conn 關(guān)閉前進行解綁操作。
pushHandler
pushHandler 則容易理解。它解析請求然后推送數(shù)據(jù):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
// Authorize if needed. Then decode the request and push message to each // realted websocket connection. func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } // authorize if s.authFunc != nil { if ok := s.authFunc(r); !ok { w.WriteHeader(http.StatusUnauthorized) return } } // read request var pm PushMessage decoder := json.NewDecoder(r.Body) if err := decoder.Decode(&pm); err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(ErrRequestIllegal.Error())) return } // validate the data if pm.UserID == "" || pm.Event == "" || pm.Message == "" { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(ErrRequestIllegal.Error())) return } cnt, err := s.push(pm.UserID, pm.Event, pm.Message) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return } result := strings.NewReader( fmt .Sprintf( "message sent to %d clients" , cnt)) io.Copy(w, result) } |
Conn
Conn (此處指 wserver.Conn) 為 websocket.Conn 的包裝。
1
2
3
4
5
6
7
8
9
10
11
|
// Conn wraps websocket.Conn with Conn. It defines to listen and read // data from Conn. type Conn struct { Conn *websocket.Conn AfterReadFunc func(messageType int, r io.Reader) BeforeCloseFunc func() once sync .Once id string stopCh chan struct{} } |
最主要的方法為 Listen():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
// Listen listens for receive data from websocket connection. It blocks // until websocket connection is closed. func (c *Conn) Listen() { c.Conn.SetCloseHandler(func(code int, text string) error { if c.BeforeCloseFunc != nil { c.BeforeCloseFunc() } if err := c.Close(); err != nil { log.Println(err) } message := websocket.FormatCloseMessage(code, "" ) c.Conn.WriteControl(websocket.CloseMessage, message, time .Now().Add( time .Second)) return nil }) // Keeps reading from Conn util get error. ReadLoop: for { select { case <-c.stopCh: break ReadLoop default: messageType, r, err := c.Conn.NextReader() if err != nil { // TODO: handle read error maybe break ReadLoop } if c.AfterReadFunc != nil { c.AfterReadFunc(messageType, r) } } } } |
主要設(shè)置了當(dāng) websocket 連接關(guān)閉時的處理和不停地讀取數(shù)據(jù)。
文中很難全面地描述整個代碼的運作流程,像具體閱讀代碼,請前往 github.com/alfred-zhong/wserver 獲取。
后記
代碼我已經(jīng)進行了一定的測試,也已經(jīng)在正式環(huán)境中運行了一段時間。但是代碼可能仍然不夠穩(wěn)定,所以在使用過程中出現(xiàn)問題,也實屬正常。隨意隨時歡迎大家給我提 issues 或者 PRs。
總結(jié)
以上所述是小編給大家介紹的利用 Go 語言編寫一個簡單的 WebSocket 推送服務(wù),希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復(fù)大家的。在此也非常感謝大家對服務(wù)器之家網(wǎng)站的支持!
原文鏈接:https://www.cnblogs.com/snowInPluto/p/8688453.html