一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語言編程技術及教程分享平臺!
分類導航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務器之家 - 腳本之家 - Golang - Go 實現百萬WebSocket連接的方法示例

Go 實現百萬WebSocket連接的方法示例

2020-05-27 10:13watermelo Golang

這篇文章主要介紹了Go 實現百萬WebSocket連接的方法示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

Go 實現百萬WebSocket連接的方法示例

大家好!我是 Sergey Kamardin,是 Mail.Ru 的一名工程師。

本文主要介紹如何使用 Go 開發高負載的 WebSocket 服務。

如果你熟悉 WebSockets,但對 Go 了解不多,仍希望你對這篇文章的想法和性能優化方面感興趣。

1. 簡介

為了定義本文的討論范圍,有必要說明我們為什么需要這個服務。

Mail.Ru 有很多有狀態系統。用戶的電子郵件存儲就是其中之一。我們有幾種方法可以跟蹤該系統的狀態變化以及系統事件,主要是通過定期系統輪詢或者狀態變化時的系統通知來實現。

兩種方式各有利弊。但是對于郵件而言,用戶收到新郵件的速度越快越好。

郵件輪詢大約每秒 50,000 個 HTTP 查詢,其中 60% 返回 304 狀態,這意味著郵箱中沒有任何更改。

因此,為了減少服務器的負載并加快向用戶發送郵件的速度,我們決定通過用發布 - 訂閱服務(也稱為消息總線,消息代理或事件管道)的模式來造一個輪子。一端接收有關狀態更改的通知,另一端訂閱此類通知。

之前的架構:

Go 實現百萬WebSocket連接的方法示例

現在的架構:

Go 實現百萬WebSocket連接的方法示例

第一個方案是之前的架構。瀏覽器定期輪詢 API 并查詢存儲(郵箱服務)是否有更改。

第二種方案是現在的架構。瀏覽器與通知 API 建立了 WebSocket 連接,通知 API 是總線服務的消費者。一旦接收到新郵件后,Storage 會將有關它的通知發送到總線(1),總線將其發送給訂閱者(2)。 API 通過連接發送這個收到的通知,將其發送到用戶的瀏覽器(3)。

所以現在我們將討論這個 API 或者這個 WebSocket 服務。展望一下未來,我們的服務將來可能會有 300 萬個在線連接。

2. 常用的方式

我們來看看如何在沒有任何優化的情況下使用 Go 實現服務器的某些部分。

在我們繼續使用 net/http 之前,來談談如何發送和接收數據。這個數據位于 WebSocket 協議上(例如 JSON 對象),我們在下文中將其稱為包。

我們先來實現 Channel 結構體,該結構體將包含在 WebSocket 連接上發送和接收數據包的邏輯。

2.1 Channel 結構體

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// WebSocket Channel 的實現
// Packet 結構體表示應用程序級數據
type Packet struct {
  ...
}
 
// Channel 裝飾用戶連接
type Channel struct {
  conn net.Conn  // WebSocket 連接
  send chan Packet // 傳出的 packets 隊列
}
 
func NewChannel(conn net.Conn) *Channel {
  c := &Channel{
    conn: conn,
    send: make(chan Packet, N),
  }
 
  go c.reader()
  go c.writer()
 
  return c
}

我想讓你注意的是 readerwriter goroutines。每個 goroutine 都需要內存棧,初始大小可能為 2 到 8 KB,具體 取決于操作系統 和 Go 版本。

關于上面提到的 300 萬個線上連接,為此我們需要消耗 24 GB 的內存(假設單個 goroutine 消耗 4 KB 棧內存)用于所有的連接。并且這還沒包括為 Channel 結構體分配的內存, ch.send 傳出的數據包占用的內存以及其他內部字段的內存。

2.2 I/O goroutines

讓我們來看看 reader 的實現:

?
1
2
3
4
5
6
7
8
9
10
// Channel's reading goroutine.
func (c *Channel) reader() {
  // 創建一個緩沖 read 來減少 read 的系統調用
  buf := bufio.NewReader(c.conn)
 
  for {
    pkt, _ := readPacket(buf)
    c.handle(pkt)
  }
}

這里我們使用了 bufio.Reader 來減少 read() 系統調用的次數,并盡可能多地讀取 buf 中緩沖區大小所允許的數量。在這個無限循環中,我們等待新數據的到來。請先記住這句話: 等待新數據的到來 。我們稍后會回顧。

我們先不考慮傳入的數據包的解析和處理,因為它對我們討論的優化并不重要。但是, buf 值得我們關注:默認情況下,它是 4 KB,這意味著連接還需要 12 GB 的內存。 writer 也有類似的情況:

?
1
2
3
4
5
6
7
8
9
10
// Channel's writing goroutine.
func (c *Channel) writer() {
  // 創建一個緩沖 write 來減少 write 的系統調用
  buf := bufio.NewWriter(c.conn)
 
  for pkt := range c.send {
    _ := writePacket(buf, pkt)
    buf.Flush()
  }
}

我們通過 Channel 的 c.send 遍歷將數據包傳出 并將它們寫入緩沖區。細心的讀者可能猜到了,這是我們 300 萬個連接的另外 12 GB 的內存消耗。

2.3 HTTP

已經實現了一個簡單的 Channel ,現在我們需要使用 WebSocket 連接。由于仍然處于常用的方式的標題下,所以我們以常用的方式繼續。

注意:如果你不知道 WebSocket 的運行原理,需要記住客戶端會通過名為 Upgrade 的特殊 HTTP 機制轉換到 WebSocket 協議。在成功處理 Upgrade 請求后,服務端和客戶端將使用 TCP 連接來傳輸二進制的 WebSocket 幀。 這里 是連接的內部結構的說明。

?
1
2
3
4
5
6
7
8
9
10
11
// 常用的轉換為 WebSocket 的方法
import (
  "net/http"
  "some/websocket"
)
 
http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
  conn, _ := websocket.Upgrade(r, w)
  ch := NewChannel(conn)
  //...
})

需要注意的是, http.ResponseWriterbufio.Readerbufio.Writer (均為 4 KB 的緩沖區)分配了內存,用于對 *http.Request 的初始化和進一步的響應寫入。

無論使用哪種 WebSocket 庫,在 Upgrade 成功后, 服務端在調用 responseWriter.Hijack() 之后都會收到 I/O 緩沖區和 TCP 連接。

提示:在某些情況下, go:linkname 可被用于通過調用 net/http.putBufio {Reader, Writer} 將緩沖區返回給 net/http 內的 sync.Pool

因此,我們還需要 24 GB 的內存用于 300 萬個連接。

那么,現在為了一個什么功能都沒有的應用程序,一共需要消耗 72 GB 的內存!

3. 優化

我們回顧一下在簡介部分中談到的內容,并記住用戶連接的方式。在切換到 WebSocket 后,客戶端會通過連接發送包含相關事件的數據包。然后(不考慮 ping/pong 等消息),客戶端可能在整個連接的生命周期中不會發送任何其他內容。

連接的生命周期可能持續幾秒到幾天。

因此,大部分時間 Channel.reader()Channel.writer() 都在等待接收或發送數據。與它們一起等待的還有每個大小為 4 KB 的 I/O 緩沖區。

現在我們對哪些地方可以做優化應該比較清晰了。

3.1 Netpoll

Channel.reader() 通過給 bufio.Reader.Read() 內的 conn.Read() 加鎖來 等待新數據的到來 (譯者注:上文中的伏筆),一旦連接中有數據,Go runtime(譯者注:runtime 包含 Go 運行時的系統交互的操作,這里保留原文)“喚醒” goroutine 并允許它讀取下一個數據包。在此之后,goroutine 再次被鎖定,同時等待新的數據。讓我們看看 Go runtime 來理解 goroutine 為什么必須“被喚醒”。

如果我們查看 conn.Read() 的實現 ,將會在其中看到 net.netFD.Read() 調用 :

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Go 內部的非阻塞讀.
// net/fd_unix.go
 
func (fd *netFD) Read(p []byte) (n int, err error) {
  //...
  for {
    n, err = syscall.Read(fd.sysfd, p)
    if err != nil {
      n = 0
      if err == syscall.EAGAIN {
        if err = fd.pd.waitRead(); err == nil {
          continue
        }
      }
    }
    //...
    break
  }
  //...
}

Go 在非阻塞模式下使用套接字。 EAGAIN 表示套接字中沒有數據,并且讀取空套接字時不會被鎖定,操作系統將返回控制權給我們。(譯者注:EAGAIN 表示目前沒有可用數據,請稍后再試)

我們從連接文件描述符中看到一個 read() 系統調用函數。如果 read 返回 EAGAIN 錯誤 ,則 runtime 調用 pollDesc.waitRead() :

?
1
2
3
4
5
6
7
8
9
10
11
// Go 內部關于 netpoll 的使用
// net/fd_poll_runtime.go
 
func (pd *pollDesc) waitRead() error {
  return pd.wait('r')
}
 
func (pd *pollDesc) wait(mode int) error {
  res := runtime_pollWait(pd.runtimeCtx, mode)
  //...
}

如果 深入挖掘 ,我們將看到 netpoll 在 Linux 中是使用 epoll 實現的,而在 BSD 中是使用 kqueue 實現的。為什么不對連接使用相同的方法?我們可以分配一個 read 緩沖區并僅在真正需要時啟動 read goroutine:當套接字中有可讀的數據時。

在 github.com/golang/go 上,有一個導出 netpoll 函數的 issue 。

3.2 去除 goroutines 的內存消耗

假設我們有 Go 的 netpoll 實現 。現在我們可以避免在內部緩沖區啟動 Channel.reader() goroutine,而是在連接中訂閱可讀數據的事件:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 使用 netpoll
ch := NewChannel(conn)
 
// 通過 netpoll 實例觀察 conn
poller.Start(conn, netpoll.EventRead, func() {
  // 我們在這里產生 goroutine 以防止在輪詢從 ch 接收數據包時被鎖。
  go Receive(ch)
})
 
// Receive 從 conn 讀取數據包并以某種方式處理它。
func (ch *Channel) Receive() {
  buf := bufio.NewReader(ch.conn)
  pkt := readPacket(buf)
  c.handle(pkt)
}

Channel.writer() 更簡單,因為我們只能在發送數據包時運行 goroutine 并分配緩沖區:

?
1
2
3
4
5
6
7
// 當我們需要時啟動 writer goroutine
func (ch *Channel) Send(p Packet) {
  if c.noWriterYet() {
    go ch.writer()
  }
  ch.send <- p
}

需要注意的是,當操作系統在 write() 調用上返回 EAGAIN 時,我們不處理這種情況。我們依靠 Go runtime 來處理這種情況,因為這種情況在服務器上很少見。然而,如果有必要,它可以以與 reader() 相同的方式處理。

當從 ch.send (一個或幾個)讀取傳出數據包后,writer 將完成其操作并釋放 goroutine 的內存和發送緩沖區的內存。

完美!我們通過去除兩個運行的 goroutine 中的內存消耗和 I/O 緩沖區的內存消耗節省了 48 GB。

3.3 資源控制

大量連接不僅僅涉及到內存消耗高的問題。在開發服務時,我們遇到了反復出現的競態條件和 self-DDoS 造成的死鎖。

例如,如果由于某種原因我們突然無法處理 ping/pong 消息,但是空閑連接的處理程序繼續關閉這樣的連接(假設連接被破壞,沒有提供數據),客戶端每隔 N 秒失去連接并嘗試再次連接而不是等待事件。

被鎖或超載的服務器停止服務,如果它之前的負載均衡器(例如,nginx)將請求傳遞給下一個服務器實例,這將是不錯的。

此外,無論服務器負載如何,如果所有客戶端突然(可能是由于錯誤原因)向我們發送數據包,之前的 48 GB 內存的消耗將不可避免,因為需要為每個連接分配 goroutine 和緩沖區。

Goroutine 池

上面的情況,我們可以使用 goroutine 池限制同時處理的數據包數量。下面是這種池的簡單實現:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// goroutine 池的簡單實現
package gopool
 
func New(size int) *Pool {
  return &Pool{
    work: make(chan func()),
    sem: make(chan struct{}, size),
  }
}
 
func (p *Pool) Schedule(task func()) error {
  select {
  case p.work <- task:
  case p.sem <- struct{}{}:
    go p.worker(task)
  }
}
 
func (p *Pool) worker(task func()) {
  defer func() { <-p.sem }
  for {
    task()
    task = <-p.work
  }
}

現在我們的 netpoll 代碼如下:

?
1
2
3
4
5
6
7
8
9
// 處理 goroutine 池中的輪詢事件。
pool := gopool.New(128)
 
poller.Start(conn, netpoll.EventRead, func() {
  // 我們在所有 worker 被占用時阻塞 poller
  pool.Schedule(func() {
    Receive(ch)
  })
})

現在我們不僅在套接字中有可讀數據時讀取,而且還在第一次機會獲取池中的空閑 goroutine。??

同樣,我們修改 Send()

?
1
2
3
4
5
6
7
8
9
// 復用 writing goroutine
pool := gopool.New(128)
 
func (ch *Channel) Send(p Packet) {
  if c.noWriterYet() {
    pool.Schedule(ch.writer)
  }
  ch.send <- p
}

取代 go ch.writer() ,我們想寫一個復用的 goroutines。因此,對于擁有 N 個 goroutines 的池,我們可以保證同時處理 N 個請求并且在 N + 1 的時候, 我們不會分配 N + 1 個緩沖區。 goroutine 池還允許我們限制新連接的 Accept()Upgrade() ,并避免大多數的 DDoS 攻擊。

3.4 upgrade 零拷貝

如前所述,客戶端使用 HTTP Upgrade 切換到 WebSocket 協議。這就是 WebSocket 協議的樣子:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
## HTTP Upgrade 示例
 
GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket
 
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket

也就是說,在我們的例子中,需要 HTTP 請求及其 Header 用于切換到 WebSocket 協議。這些知識以及 http.Request 中存儲的內容 表明,為了優化,我們需要在處理 HTTP 請求時放棄不必要的內存分配和內存復制,并棄用 net/http 庫。

例如, http.Request 有一個與 Header 具有相同名稱的字段 ,這個字段用于將數據從連接中復制出來填充請求頭。想象一下,該字段需要消耗多少額外內存,例如碰到比較大的 Cookie 頭。

WebSocket 的實現

不幸的是,在我們優化的時候所有存在的庫都是使用標準的 net/http 庫進行升級。而且,(兩個)庫都不能使用上述的讀寫優化方案。為了采用這些優化方案,我們需要用一個比較低級的 API 來處理 WebSocket。要重用緩沖區,我們需要把協議函數變成這樣:

?
1
2
func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error

如果有一個這種 API 的庫,我們可以按下面的方式從連接中讀取數據包(數據包的寫入也一樣):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 預期的 WebSocket 實現API
// getReadBuf, putReadBuf 用來復用 *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)
 
// 當 conn 中的數據可讀取時,readPacket 被調用
func readPacket(conn io.Reader) error {
  buf := getReadBuf()
  defer putReadBuf(buf)
 
  buf.Reset(conn)
  frame, _ := ReadFrame(buf)
  parsePacket(frame.Payload)
  //...
}

簡單來說,我們需要自己的 WebSocket 庫。

github.com/gobwas/ws

在意識形態上,編寫 ws 庫是為了不將其協議操作邏輯強加給用戶。所有讀寫方法都實現了標準的 io.Reader 和 io.Writer 接口,這樣就可以使用或不使用緩沖或任何其他 I/O 包裝器。

除了來自標準庫 net/http 的升級請求之外, ws 還支持零拷貝升級,升級請求的處理以及切換到 WebSocket 無需分配內存或復制內存。 ws.Upgrade() 接受 io.ReadWriternet.Conn 實現了此接口)。換句話說,我們可以使用標準的 net.Listen() 將接收到的連接從 ln.Accept() 轉移給 ws.Upgrade() 。該庫使得可以復制任何請求數據以供應用程序使用(例如, Cookie 用來驗證會話)。

下面是升級請求的 基準測試 結果:標準庫 net/http 的服務與用零拷貝升級的 net.Listen()

?
1
2
BenchmarkUpgradeHTTP  5156 ns/op  8576 B/op  9 allocs/op
BenchmarkUpgradeTCP   973 ns/op   0 B/op    0 allocs/op

切換到 ws零拷貝升級 為我們節省了另外的 24 GB 內存 - 在 net/http 處理請求時為 I/O 緩沖區分配的空間。

3.5 摘要

我們總結一下這些優化。

  • 內部有緩沖區的 read goroutine 是代價比較大的。解決方案:netpoll(epoll,kqueue); 重用緩沖區。
  • 內部有緩沖區的 write goroutine 是代價比較大的。解決方案:需要的時候才啟動 goroutine; 重用緩沖區。
  • 如果有大量的連接,netpoll 將無法正常工作。解決方案:使用 goroutines 池并限制池的 worker 數。
  • net/http 不是處理升級到 WebSocket 的最快方法。解決方案:在裸 TCP 連接上使用內存零拷貝升級。

服務的代碼看起來如下所示:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// WebSocket 服務器示例,包含 netpoll,goroutine 池和內存零拷貝的升級。
import (
  "net"
  "github.com/gobwas/ws"
)
 
ln, _ := net.Listen("tcp", ":8080")
 
for {
  // 嘗試在空閑池的 worker 內的接收傳入的連接。如果超過 1ms 沒有空閑 worker,則稍后再試。這有助于防止 self-ddos 或耗盡服務器資源的情況。
  err := pool.ScheduleTimeout(time.Millisecond, func() {
    conn := ln.Accept()
    _ = ws.Upgrade(conn)
 
    // 使用 Channel 結構體包裝 WebSocket 連接
    // 將幫助我們處理應用包
    ch := NewChannel(conn)
 
    // 等待連接傳入字節
    poller.Start(conn, netpoll.EventRead, func() {
      // 不要超過資源限制
      pool.Schedule(func() {
        // 讀取并處理傳入的包
        ch.Recevie()
      })
    })
  })
  if err != nil {
    time.Sleep(time.Millisecond)
  }
}

總結

過早優化是編程中所有邪惡(或至少大部分)的根源。 -- Donald Knuth

當然,上述優化是和需求相關的,但并非所有情況下都是如此。例如,如果空閑資源(內存,CPU)和線上連接數之間的比率比較高,則優化可能沒有意義。但是,通過了解優化的位置和內容,我們會受益匪淺。

感謝你的關注!

引用

https://github.com/mailru/easygo

https://github.com/gobwas/ws

https://github.com/gobwas/ws-examples

https://github.com/gobwas/httphead

Russian version of this article

 以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:https://studygolang.com/articles/22501

延伸 · 閱讀

精彩推薦
  • GolangGolang中Bit數組的實現方式

    Golang中Bit數組的實現方式

    這篇文章主要介紹了Golang中Bit數組的實現方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧...

    天易獨尊11682021-06-09
  • Golanggo日志系統logrus顯示文件和行號的操作

    go日志系統logrus顯示文件和行號的操作

    這篇文章主要介紹了go日志系統logrus顯示文件和行號的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧...

    SmallQinYan12302021-02-02
  • Golanggolang json.Marshal 特殊html字符被轉義的解決方法

    golang json.Marshal 特殊html字符被轉義的解決方法

    今天小編就為大家分享一篇golang json.Marshal 特殊html字符被轉義的解決方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧 ...

    李浩的life12792020-05-27
  • GolangGolang通脈之數據類型詳情

    Golang通脈之數據類型詳情

    這篇文章主要介紹了Golang通脈之數據類型,在編程語言中標識符就是定義的具有某種意義的詞,比如變量名、常量名、函數名等等,Go語言中標識符允許由...

    4272021-11-24
  • Golanggo語言制作端口掃描器

    go語言制作端口掃描器

    本文給大家分享的是使用go語言編寫的TCP端口掃描器,可以選擇IP范圍,掃描的端口,以及多線程,有需要的小伙伴可以參考下。 ...

    腳本之家3642020-04-25
  • Golanggolang 通過ssh代理連接mysql的操作

    golang 通過ssh代理連接mysql的操作

    這篇文章主要介紹了golang 通過ssh代理連接mysql的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧...

    a165861639710342021-03-08
  • Golanggolang如何使用struct的tag屬性的詳細介紹

    golang如何使用struct的tag屬性的詳細介紹

    這篇文章主要介紹了golang如何使用struct的tag屬性的詳細介紹,從例子說起,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看...

    Go語言中文網11352020-05-21
  • Golanggolang的httpserver優雅重啟方法詳解

    golang的httpserver優雅重啟方法詳解

    這篇文章主要給大家介紹了關于golang的httpserver優雅重啟的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,...

    helight2992020-05-14
主站蜘蛛池模板: 日本一在线中文字幕天堂 | 欧美一区二区三区不卡视频 | 久久全国免费久久青青小草 | 91亚洲一区二区在线观看不卡 | 我的绝色岳每雯雯 | 免费看一级 | 阿v天堂2020| 古装一级无遮挡毛片免费观看 | 国产精品毛片无码 | 日本孕妇与黑人xxxxxx | 成年女人毛片免费观看97 | 人人爽人人香蕉 | 爱情岛论坛自拍永久入口 | 俄罗斯freeoo性另类 | 99久久伊人一区二区yy5099 | 国产思妍小仙女一二区 | 黑人巨荃大战乌克兰美女 | 我的妹妹最近有点怪在线观看 | 色综合色狠狠天天综合色hd | 好男人资源在线观看免费的 | 国产成人精品1024在线 | 91制片厂制作传媒破解版免费 | 欧美一级h | 亚洲啊v| 男人插曲女人下面 | 青青青青青国产免费手机看视频 | 操黄| 日剧整部剧护妻狂魔免费观看全集 | 亚洲一区二区三区福利在线 | 男人狂躁女人下面狂叫图片 | 国产91精品在线播放 | 国产视频久久 | 欧美高清免费一级在线 | 婷婷丁香视频 | 国产巨大bbbb俄罗斯 | 美女扒开胸罩露出奶了无遮挡免费 | 思思91精品国产综合在线 | 四虎影视紧急入口地址大全 | 精品国产精品国产 | 国产成人+亚洲欧洲 | 亚洲欧美精品一区天堂久久 |