0. redis通信協議
redis的客戶端(redis-cli)和服務端(redis-server)的通信是建立在tcp連接之上, 兩者之間數據傳輸的編碼解碼方式就是所謂的redis通信協議。所以,只要我們的redis-cli實現了這個協議的解析和編碼,那么我們就可以完成所有的redis操作。
redis 協議設計的非常易讀,也易于實現,關于具體的redis通信協議請參考:通信協議(protocol)。后面我們在實現這個協議的過程中也會簡單重復介紹一下具體實現
1. 建立tcp連接
redis客戶端和服務端的通信是建立tcp連接之上,所以第一步自然是先建立連接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
package main import ( "flag" "log" "net" ) var host string var port string func init() { flag.StringVar(&host, "h", "localhost", "hsot") flag.StringVar(&port, "p", "6379", "port") } func main() { flag.Parse() tcpAddr := &net.TCPAddr{IP: net.ParseIP(host), Port: port} conn, err := net.DialTCP("tcp", nil, tcpAddr) if err != nil { log.Println(err) } defer conn.Close() // to be continue } |
后續我們發送和接受數據便都可以使用conn.Read()和conn.Write()來進行了
2. 發送請求
發送請求第一個第一個字節是"*",中間是包含命令本身的參數個數,后面跟著"\r\n" 。之后使用"$"加參數字節數量并使用"\r\n"結尾,然后緊跟參數內容同時也使用"\r\n"結尾。如執行 SET key liangwt 客戶端發送的請求為"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$7\r\nliangwt\r\n"
注意:
- 命令本身也作為協議的其中一個參數來發送
- \r\n 對應byte的十進制為 13 10
我們可以使用telnet測試下
1
2
3
4
5
6
7
8
9
10
11
12
|
wentao@bj:~/github.com/liangwt/redis-cli$ telnet 127.0.0.1 6379 Trying 127.0.0.1... Connected to 127.0.0.1. Escape character is '^]'. *3 $3 SET $3 key $7 liangwt +OK |
先暫時忽略服務端的回復,通過telnet我們可以看出請求協議非常簡單,所以對于請求協議的實現不做過多的介紹了,直接放代碼(如下使用基于字符串拼接,只是為了更直觀的演示,效率并不高,實際代碼中我們使用bytes.Buffer來實現)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func MultiBulkMarshal(args ...string) string { var s string s = "*" s += strconv.Itoa(len(args)) s += "\r\n" // 命令所有參數 for _, v := range args { s += "$" s += strconv.Itoa(len(v)) s += "\r\n" s += v s += "\r\n" } return s } |
在實現了對命令和參數進行編碼之后,我們便可以通過conn.Write()把數據推送到服務端
1
2
3
4
5
6
7
8
9
|
func main() { // .... req := MultiBulkMarshal("SET", "key", "liangwt") _, err = conn.Write([]byte(req)) if err != nil { log.Fatal(err) } // to be continue } |
3. 獲取回復
我們首先實現通過tcp獲取服務端返回值,就是上面提到過的conn.Read()。
1
2
3
4
5
6
7
8
9
|
func main() { // .... p := make([]byte, 1024) _, err = conn.Read(p) if err != nil { log.Fatal(err) } // to be continue } |
4. 解析回復
我們拿到p之后我們就可以解析返回值了,redis服務端的回復是分為幾種情況的
- 狀態回復
- 錯誤回復
- 整數回復
- 批量回復
- 多條批量回復
我們把前四種單獨看作一組,因為他們都是單一類型的返回值
我們把最后的多條批量回復看成單獨的一組,因為它是包含前面幾種類型的混合類型。而且你可以發現它和我們的請求協議是一樣的
也正是基于以上的考慮我們創建兩個函數來分別解析單一類型和混合類型,這樣在解析混合類型中的某一類型時就只需要調用單一類型解析的函數即可
在解析具體協議前我們先實現一個是讀取到\r\n為止的函數
1
2
3
4
5
6
7
8
9
10
11
|
func ReadLine(p []byte) ([]byte, error) { for i := 0; i < len(p); i++ { if p[i] == '\r' { if p[i+1] != '\n' { return []byte{}, errors.New("format error") } return p[0:i], nil } } return []byte{}, errors.New("format error") } |
第一種狀態回復:
狀態回復是一段以 "+" 開始, "\r\n" 結尾的單行字符串。如 SET 命令成功的返回值:"+OK\r\n"
所以我們判斷第一個字符是否等于 '+' 如果相等,則讀取到\r\n
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func SingleUnMarshal(p []byte) ([]byte, int, error) { var ( result []byte err error length int ) switch p[0] { case '+': result, err = ReadLine(p[1:]) length = len(result) + 3 } return result, length, err } |
注:我們在返回實際回復內容的同時也返回了整個回復的長度,方便后面解析多條批量回復時定位下一次的解析位置
第二種錯誤回復:
錯誤回復的第一個字節是 "-", "\r\n" 結尾的單行字符串。如執行 SET key缺少參數時返回值:"-ERR wrong number of arguments for 'set' command\r\n"
錯誤回復和狀態回復非常相似,解析方式也是一樣到。所以我們只需添加一個case即可
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func SingleUnMarshal(p []byte) ([]byte, int, error) { var ( result []byte err error length int ) switch p[0] { case '+', '-': result, err = ReadLine(p[1:]) length = len(result) + 3 } return result, length, err } |
第三種整數回復:
整數回復的第一個字節是":",中間是字符串表示的整數,"\r\n" 結尾的單行字符串。如執行LLEN mylist命令時返回 ":10\r\n"
整數回復也和上面兩種是一樣的,只不過返回的是字符串表示的十進制整數
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func SingleUnMarshal(p []byte) ([]byte, int, error) { var ( result []byte err error length int ) switch p[0] { case '+', '-', ':': result, err = ReadLine(p[1:]) length = len(result) + 3 } return result, length, err } |
第四種批量回復:
批量回復的第一個字節為 "$",接下來是字符串表示的整數,它表示實際回復的長度,之后跟著一個 "\r\n",再后面跟著的是實際回復數據,最末尾是另一個 "\r\n"。如GET key 命令的返回值:"$7\r\nliangwt\r\n"
所以批量回復解析的實現:
- 讀取第一行得到實際回復的長度
- 把字符串類型的長度轉換成對應十進制整數
- 從第二行開始位置往下讀對應長度
但是對于某些不存在的key,批量回復會將特殊值 -1 用作回復的長度值, 此時我們不需要繼續往下讀取實際回復。例如GET NOT_EXIST_KEY 返回值:"$-1", 所以我們需要對此特殊情況判斷,讓函數返回一個空對象(nil)而不是空值("")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func SingleUnMarshal(p []byte) ([]byte, int, error) { // .... case '$': n, err := ReadLine(p[1:]) if err != nil { return []byte{}, 0, err } l, err := strconv.Atoi(string(n)) if err != nil { return []byte{}, 0, err } if l == -1 { return nil, 0, nil } // +3 的原因 $ \r \n 三個字符 result = p[len(n)+3 : len(n)+3+l] length = len(n) + 5 + l } return result, length, err } |
思考:
為什么redis要使用提前告知字節數,然后往下讀取指定長度的方式,而不是直接讀取第二行到\r\n為止?
答案很明顯:此方式可以讓redis讀取返回值時不受具體的返回內容影響,在按行讀取的情況下,無論使用任何分割符都有可能導致redis在解析具體內容時把內容中的分割符當作時結尾,導致解析錯誤。
思考一下這種情況:我們SET key "liang\r\nwt" ,那么當我們GET key時,服務端返回值為"$9\r\nliang\r\nwt\r\n" 完全規避了value中的\r\n影響
第五種多條批量回復:
多條批量回復是由多個回復組成的數組,它的第一個字節為"*", 后跟一個字符串表示的整數值, 這個值記錄了多條批量回復所包含的回復數量, 再后面是一個"\r\n"。如LRANGE mylist 0 -1的返回值:"*3\r\n$1\r\n3\r\n$1\r\n2\r\n$1\r\n1"。
所以多條批量回復解析的實現:
- 解析第一行數據獲得字符串類型的回復數量
- 把字符串類型的長度轉換成對應十進制整數
- 按照單條回復依次逐個解析,一共解析成上面得到的數量
在這里我們用到了單條解析時返回的字節長度length,通過這個長度我們可以很方便的知道下次單條解析的開始位置為上一次位置+length
在解析多條批量回復時需要注意兩點:
第一,多條批量回復也可以是空白的(empty)。例如執行LRANGE NOT_EXIST_KEY 0 -1 服務端返回值"*0\r\n"。此時客戶端返回的應該空數組[][]byte
第二,多條批量回復也可以是無內容的(null multi bulk reply)。例如執行BLPOP key 1 服務端返回值"*-1\r\n"。此時客戶端返回的應該是nil
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
func MultiUnMarsh(p []byte) ([][]byte, error) { if p[0] != '*' { return [][]byte{}, errors.New("format error") } n, err := ReadLine(p[1:]) if err != nil { return [][]byte{}, err } l, err := strconv.Atoi(string(n)) if err != nil { return [][]byte{}, err } // 多條批量回復也可以是空白的(empty) if l == 0 { return [][]byte{}, nil } // 無內容的多條批量回復(null multi bulk reply)也是存在的, // 客戶端庫應該返回一個 null 對象, 而不是一個空數組。 if l == -1 { return nil, nil } result := make([][]byte, l) t := len(n) + 3 for i := 0; i < l; i++ { ret, length, err := SingleUnMarshal(p[t:]) if err != nil { return [][]byte{}, errors.New("format error") } result[i] = ret t += length } return result, nil } |
5. 命令行模式
一個可用的redis-cli自然是一個交互式的,用戶輸入指令然后輸出返回值。在go中我們可以使用以下代碼即可獲得一個類似的交互式命令行
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func main() { // .... for { fmt.Printf("%s:%d>", host, port) bio := bufio.NewReader(os.Stdin) input, _, err := bio.ReadLine() if err != nil { log.Fatal(err) } fmt.Printf("%s\n", input) } } |
我們運行以上代碼就可以實現
1
2
3
4
5
|
localhost:6379>set key liang set key liang localhost:6379>get key get key localhost:6379> |
結合上我們的redis發送請求和解析請求即可完成整個redis-cli
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
func main() { // .... for { fmt.Printf("%s:%d>", host, port) // 獲取輸入命令和參數 bio := bufio.NewReader(os.Stdin) input, err := bio.ReadString('\n') if err != nil { log.Fatal(err) } fields := strings.Fields(input) // 編碼發送請求 req := MultiBulkMarshal(fields...) // 發送請求 _, err = conn.Write([]byte(req)) if err != nil { log.Fatal(err) } // 讀取返回內容 p := make([]byte, 1024) _, err = conn.Read(p) if err != nil { log.Fatal(err) } // 解析返回內容 if p[0] == '*' { result, err := MultiUnMarsh(p) } else { result, _, err := SingleUnMarshal(p) } } // .... } |
6. 總結
到目前為止我們的cli程序已經全部完成,但其實還有很多不完美地方。但核心的redis協議解析已經完成,使用這個解析我們能完成任何的cli與服務器之間的交互
更詳細的redis-cli實現可以參考我的github:A Simaple redis cli - Rclient
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://my.oschina.net/liangwt/blog/2231557