前言
在學習過程中發現redis的zset還可以用來實現輕量級的延時消息隊列功能,雖然可靠性還有待提高,但是對于一些對數據可靠性要求不那么高的功能要求完全可以實現。本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel來實現一個小demo。
提前準備 安裝redis, redis-go
因為用的是macOS, 直接
1
2
|
$ brew install redis $ go get github.com/garyburd/redigo/redis |
又因為比較懶,生成任務的唯一id時,直接采用了bson中的objectId,所以:
1
|
$ go get gopkg.in/mgo.v2/bson |
唯一id不是必須有,但如果之后有實際應用需要攜帶,便于查找相應任務。
生產者
通過一個for循環生成10w個任務, 每一個任務有不同的時間
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func producer() { count := 0 //生成100000個任務 for count < 100000 { count++ dealTime := int64(rand.Intn(5)) + time.Now().Unix() uuid := bson.NewObjectId().Hex() redis.Client.AddJob(&job.JobMessage{ Id: uuid, DealTime: dealTime, }, + int64(dealTime)) } } |
其中AddJob函數在另一個包中, 將上一個函數中隨機生成的時間作為需要處理的時間戳.
1
2
3
4
5
6
7
8
|
// 添加任務 func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) { conn := client.Get() defer conn.Close() key := "JOB_MESSAGE_QUEUE" conn.Do("zadd", key, dealTime, util.JsonEncode(msg)) } |
消費者
消費者處理流程分為兩個步驟:
- 獲取小于等于當前時間戳的任務
- 通過刪除當前任務來判斷誰獲得了當前任務
因為在獲取小于等于當前時間戳的任務時,可能有多個go routine同時讀到了當前任務,而只有一個任務可以來處理當前任務。因此我們需要通過一個方案來判斷究竟由誰來處理這個任務(當然如果只有一個消費者可以讀到就直接處理):這個時候可以通過redis的刪除操作來獲取,因為刪除指定value時只有成功的操作才會返回不為0,所以我們可以認為刪除當前隊列成功的那個go routine拿到了當前的任務。
下面是代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
// 消費者 func consumer() { // 啟動10個go routine一起去拿 count := 0 for count < 10 { go func() { for { jobs := redis.Client.GetJob() if len(jobs) <= 0 { time.Sleep(time.Second * 1) continue } currentJob := jobs[0] // 如果當前搶redis隊列成功, if redis.Client.DelJob(currentJob) > 0 { var jobMessage job.JobMessage util.JsonDecode(currentJob, &jobMessage) //自定義的json解析函數 handleMessage(&jobMessage) } } }() count++ } } // 處理任務用函數 func handleMessage(msg *job.JobMessage) { fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime) go func() { countChan <- true }() } |
redis部分的代碼,獲取任務和刪除任務
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// 獲取任務 func (client *RedisClient) GetJob() []string { conn := client.Get() defer conn.Close() key := "JOB_MESSAGE_QUEUE" timeNow := time.Now().Unix() ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1)) if err != nil { panic(err) } return ret } // 刪除當前任務, 用來判斷是否搶到了當前任務 func (client *RedisClient) DelJob(value string) int { conn := client.Get() defer conn.Close() key := "JOB_MESSAGE_QUEUE" ret, err := redis.Int(conn.Do("zrem", key, value)) if err != nil { panic(err) } return ret } |
代碼大抵如此。最后跑起來之后,大概每3-4秒鐘能夠處理掉1w個任務,速度上確實是...
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://studygolang.com/articles/24925