創建一個websocket的服務端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
|
package smile import ( "errors" "log" "net/http" "sync" "time" "github.com/gorilla/websocket" ) const ( // 允許等待的寫入時間 writeWait = 10 * time.Second // Time allowed to read the next pong message from the peer. pongWait = 60 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 // Maximum message size allowed from peer. maxMessageSize = 512 ) // 最大的連接ID,每次連接都加1 處理 var maxConnId int64 // 客戶端讀寫消息 type wsMessage struct { // websocket.TextMessage 消息類型 messageType int data []byte } // ws 的所有連接 // 用于廣播 var wsConnAll map[int64]*wsConnection var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, // 允許所有的CORS 跨域請求,正式環境可以關閉 CheckOrigin: func(r *http.Request) bool { return true }, } // 客戶端連接 type wsConnection struct { wsSocket *websocket.Conn // 底層websocket inChan chan *wsMessage // 讀隊列 outChan chan *wsMessage // 寫隊列 mutex sync.Mutex // 避免重復關閉管道,加鎖處理 isClosed bool closeChan chan byte // 關閉通知 id int64 } func wsHandler(resp http.ResponseWriter, req *http.Request) { // 應答客戶端告知升級連接為websocket wsSocket, err := upgrader.Upgrade(resp, req, nil) if err != nil { log.Println("升級為websocket失敗", err.Error()) return } maxConnId++ // TODO 如果要控制連接數可以計算,wsConnAll長度 // 連接數保持一定數量,超過的部分不提供服務 wsConn := &wsConnection{ wsSocket: wsSocket, inChan: make(chan *wsMessage, 1000), outChan: make(chan *wsMessage, 1000), closeChan: make(chan byte), isClosed: false, id: maxConnId, } wsConnAll[maxConnId] = wsConn log.Println("當前在線人數", len(wsConnAll)) // 處理器,發送定時信息,避免意外關閉 go wsConn.processLoop() // 讀協程 go wsConn.wsReadLoop() // 寫協程 go wsConn.wsWriteLoop() } // 處理隊列中的消息 func (wsConn *wsConnection) processLoop() { // 處理消息隊列中的消息 // 獲取到消息隊列中的消息,處理完成后,發送消息給客戶端 for { msg, err := wsConn.wsRead() if err != nil { log.Println("獲取消息出現錯誤", err.Error()) break } log.Println("接收到消息", string(msg.data)) // 修改以下內容把客戶端傳遞的消息傳遞給處理程序 err = wsConn.wsWrite(msg.messageType, msg.data) if err != nil { log.Println("發送消息給客戶端出現錯誤", err.Error()) break } } } // 處理消息隊列中的消息 func (wsConn *wsConnection) wsReadLoop() { // 設置消息的最大長度 wsConn.wsSocket.SetReadLimit(maxMessageSize) wsConn.wsSocket.SetReadDeadline(time.Now().Add(pongWait)) for { // 讀一個message msgType, data, err := wsConn.wsSocket.ReadMessage() if err != nil { websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) log.Println("消息讀取出現錯誤", err.Error()) wsConn.close() return } req := &wsMessage{ msgType, data, } // 放入請求隊列,消息入棧 select { case wsConn.inChan <- req: case <-wsConn.closeChan: return } } } // 發送消息給客戶端 func (wsConn *wsConnection) wsWriteLoop() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() }() for { select { // 取一個應答 case msg := <-wsConn.outChan: // 寫給websocket if err := wsConn.wsSocket.WriteMessage(msg.messageType, msg.data); err != nil { log.Println("發送消息給客戶端發生錯誤", err.Error()) // 切斷服務 wsConn.close() return } case <-wsConn.closeChan: // 獲取到關閉通知 return case <-ticker.C: // 出現超時情況 wsConn.wsSocket.SetWriteDeadline(time.Now().Add(writeWait)) if err := wsConn.wsSocket.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } // 寫入消息到隊列中 func (wsConn *wsConnection) wsWrite(messageType int, data []byte) error { select { case wsConn.outChan <- &wsMessage{messageType, data}: case <-wsConn.closeChan: return errors.New("連接已經關閉") } return nil } // 讀取消息隊列中的消息 func (wsConn *wsConnection) wsRead() (*wsMessage, error) { select { case msg := <-wsConn.inChan: // 獲取到消息隊列中的消息 return msg, nil case <-wsConn.closeChan: } return nil, errors.New("連接已經關閉") } // 關閉連接 func (wsConn *wsConnection) close() { log.Println("關閉連接被調用了") wsConn.wsSocket.Close() wsConn.mutex.Lock() defer wsConn.mutex.Unlock() if wsConn.isClosed == false { wsConn.isClosed = true // 刪除這個連接的變量 delete(wsConnAll, wsConn.id) close(wsConn.closeChan) } } // 啟動程序 func StartWebsocket(addrPort string) { wsConnAll = make(map[int64]*wsConnection) http.HandleFunc("/ws", wsHandler) http.ListenAndServe(addrPort, nil) } |
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://studygolang.com/articles/23126