一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語言編程技術及教程分享平臺!
分類導航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務器之家 - 腳本之家 - Golang - 使用go實現一個超級mini的消息隊列的示例代碼

使用go實現一個超級mini的消息隊列的示例代碼

2022-01-24 00:45壯士斷臂 Golang

本文主要介紹了使用go實現一個超級mini的消息隊列的示例代碼,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下

前言

趁著有空余時間,就想著擼一個mini的生產-消費消息隊列,說干就干了。自己是個javer,這次實現,特意換用了go。沒錯,是零基礎上手go,順便可以學學go。

前置知識:

  • go基本語法
  • 消息隊列概念,也就三個:生產者、消費者、隊列

目的

  • 沒想著實現多復雜,因為時間有限,就mini就好,mini到什么程度呢
  • 使用雙向鏈表數據結構作為隊列
  • 有多個topic可供生產者生成消息和消費者消費消息
  • 支持生產者并發寫
  • 支持消費者讀,且ok后,從隊列刪除
  • 消息不丟失(持久化)
  • 高性能(先這樣想)

 

設計

整體架構

使用go實現一個超級mini的消息隊列的示例代碼

協議

通訊協議底層使用tcp,mq是基于tcp自定義了一個協議,協議如下

使用go實現一個超級mini的消息隊列的示例代碼

type Msg struct {
 Id int64
 TopicLen int64
 Topic string
 // 1-consumer 2-producer 3-comsumer-ack 4-error
 MsgType int64 // 消息類型
 Len int64 // 消息長度
 Payload []byte // 消息
}

Payload使用字節數組,是因為不管數據是什么,只當做字節數組來處理即可。Msg承載著生產者生產的消息,消費者消費的消息,ACK、和錯誤消息,前兩者會有負載,而后兩者負載和長度都為空

協議的編解碼處理,就是對字節的處理,接下來有從字節轉為Msg,和從Msg轉為字節兩個函數

func BytesToMsg(reader io.Reader) Msg {

 m := Msg{}
 var buf [128]byte
 n, err := reader.Read(buf[:])
 if err != nil {
    fmt.Println("read failed, err:", err)
 }
 fmt.Println("read bytes:", n)
 // id
 buff := bytes.NewBuffer(buf[0:8])
 binary.Read(buff, binary.LittleEndian, &m.Id)
 // topiclen
 buff = bytes.NewBuffer(buf[8:16])
 binary.Read(buff, binary.LittleEndian, &m.TopicLen)
 // topic
 msgLastIndex := 16 + m.TopicLen
 m.Topic = string(buf[16: msgLastIndex])
 // msgtype
 buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 8])
 binary.Read(buff, binary.LittleEndian, &m.MsgType)

 buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 16])
 binary.Read(buff, binary.LittleEndian, &m.Len)

 if m.Len <= 0 {
    return m
 }

 m.Payload = buf[msgLastIndex + 16:]
 return m
}

func MsgToBytes(msg Msg) []byte {
 msg.TopicLen = int64(len([]byte(msg.Topic)))
 msg.Len = int64(len([]byte(msg.Payload)))

 var data []byte
 buf := bytes.NewBuffer([]byte{})
 binary.Write(buf, binary.LittleEndian, msg.Id)
 data = append(data, buf.Bytes()...)

 buf = bytes.NewBuffer([]byte{})
 binary.Write(buf, binary.LittleEndian, msg.TopicLen)
 data = append(data, buf.Bytes()...)
 
 data = append(data, []byte(msg.Topic)...)

 buf = bytes.NewBuffer([]byte{})
 binary.Write(buf, binary.LittleEndian, msg.MsgType)
 data = append(data, buf.Bytes()...)
 
 buf = bytes.NewBuffer([]byte{})
 binary.Write(buf, binary.LittleEndian, msg.Len)
 data = append(data, buf.Bytes()...)
 data = append(data, []byte(msg.Payload)...)

 return data
}

隊列

使用container/list,實現先入先出,生產者在隊尾寫,消費者在隊頭讀取

package broker

import (
 "container/list"
 "sync"
)

type Queue struct {
 len int
 data list.List
}

var lock sync.Mutex

func (queue *Queue) offer(msg Msg) {
 queue.data.PushBack(msg)
 queue.len = queue.data.Len()
}

func (queue *Queue) poll() Msg{
 if queue.len == 0 {
    return Msg{}
 }
 msg := queue.data.Front()
 return msg.Value.(Msg)
}

func (queue *Queue) delete(id int64) {
 lock.Lock()
 for msg := queue.data.Front(); msg != nil; msg = msg.Next() {
    if msg.Value.(Msg).Id == id {
       queue.data.Remove(msg)
       queue.len = queue.data.Len()
       break
    }
 }
 lock.Unlock()
}

方法offer往隊列里插入數據,poll從隊列頭讀取數據素,delete根據消息ID從隊列刪除數據。這里使用Queue結構體對List進行封裝,其實是有必要的,List作為底層的數據結構,我們希望隱藏更多的底層操作,只給客戶提供基本的操作
delete操作是在消費者消費成功且發送ACK后,對消息從隊列里移除的,因為消費者可以多個同時消費,所以這里進入臨界區時加鎖(em,加鎖是否就一定會影響對性能有較大的影響呢)

broker

broker作為服務器角色,負責接收連接,接收和響應請求

package broker

import (
 "bufio"
 "net"
 "os"
 "sync"
 "time"
)

var topics = sync.Map{}

func handleErr(conn net.Conn)  {
 defer func() {
    if err := recover(); err != nil {
       println(err.(string))
       conn.Write(MsgToBytes(Msg{MsgType: 4}))
    }
 }()
}

func Process(conn net.Conn) {
 handleErr(conn)
 reader := bufio.NewReader(conn)
 msg := BytesToMsg(reader)
 queue, ok := topics.Load(msg.Topic)
 var res Msg
 if msg.MsgType == 1 {
    // comsumer
    if queue == nil || queue.(*Queue).len == 0{
       return
    }
    msg = queue.(*Queue).poll()
    msg.MsgType = 1
    res = msg
 } else if msg.MsgType == 2 {
    // producer
    if ! ok {
       queue = &Queue{}
       queue.(*Queue).data.Init()
       topics.Store(msg.Topic, queue)
    }
    queue.(*Queue).offer(msg)
    res = Msg{Id: msg.Id, MsgType: 2}
 } else if msg.MsgType == 3 {
    // consumer ack
    if queue == nil {
       return
    }
    queue.(*Queue).delete(msg.Id)

 }
 conn.Write(MsgToBytes(res))

}

MsgType等于1時,直接消費消息;MsgType等于2時是生產者生產消息,如果隊列為空,那么還需創建一個新的隊列,放在對應的topic下;MsgType等于3時,代表消費者成功消費,可以

刪除消息

我們說消息不丟失,這里實現不完全,我就實現了持久化(持久化也沒全部實現)。思路就是該topic對應的隊列里的消息,按協議格式進行序列化,當broker啟動時,從文件恢復
持久化需要考慮的是增量還是全量,需要保存多久,這些都會影響實現的難度和性能(想想Kafka和Redis的持久化),這里表示簡單實現就好:定時器定時保存

func Save()  {
 ticker := time.NewTicker(60)
 for {
    select {
    case <-ticker.C:
       topics.Range(func(key, value interface{}) bool {
          if value == nil {
             return false
          }
          file, _ := os.Open(key.(string))
          if file == nil {
             file, _ = os.Create(key.(string))
          }
          for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() {
             file.Write(MsgToBytes(msg.Value.(Msg)))
          }
          _ := file.Close()
          return false
       })
    default:
       time.Sleep(1)
    }
 }
}

有一個問題是,當上面的delete操作時,這里的file文件需不需要跟著delete掉對應的消息?答案是需要刪除的,如果不刪除,只能等下一次的全量持久化來覆蓋了,中間就有臟數據問題
下面是啟動邏輯

package main

import (
 "awesomeProject/broker"
 "fmt"
 "net"
)

func main()  {
 listen, err := net.Listen("tcp", "127.0.0.1:12345")
 if err != nil {
    fmt.Print("listen failed, err:", err)
    return
 }
 go broker.Save()
 for {
    conn, err := listen.Accept()
    if err != nil {
       fmt.Print("accept failed, err:", err)
       continue
    }
    go broker.Process(conn)

 }
}

生產者

package main

import (
 "awesomeProject/broker"
 "fmt"
 "net"
)

func produce() {
 conn, err := net.Dial("tcp", "127.0.0.1:12345")
 if err != nil {
    fmt.Print("connect failed, err:", err)
 }
 defer conn.Close()

 msg := broker.Msg{Id: 1102, Topic: "topic-test",  MsgType: 2,  Payload: []byte("我")}
 n, err := conn.Write(broker.MsgToBytes(msg))
 if err != nil {
    fmt.Print("write failed, err:", err)
 }

 fmt.Print(n)
}

消費者

package main

import (
 "awesomeProject/broker"
 "bytes"
 "fmt"
 "net"
)

func comsume() {
 conn, err := net.Dial("tcp", "127.0.0.1:12345")
 if err != nil {
    fmt.Print("connect failed, err:", err)
 }
 defer conn.Close()

 msg := broker.Msg{Topic: "topic-test",  MsgType: 1}

 n, err := conn.Write(broker.MsgToBytes(msg))
 if err != nil {
    fmt.Println("write failed, err:", err)
 }
 fmt.Println("n", n)

 var res [128]byte
 conn.Read(res[:])
 buf := bytes.NewBuffer(res[:])
 receMsg := broker.BytesToMsg(buf)
 fmt.Print(receMsg)

 // ack
 conn, _ = net.Dial("tcp", "127.0.0.1:12345")
 l, e := conn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3}))
 if e != nil {
    fmt.Println("write failed, err:", err)
 }
 fmt.Println("l:", l)
}

消費者這里ack時重新創建了連接,如果不創建連接的話,那服務端那里就需要一直從conn讀取數據,直到結束。思考一下,像RabbitMQ的ack就有自動和手工的ack,如果是手工的ack,必然需要一個新的連接,因為不知道客戶端什么時候發送ack,自動的話,當然可以使用同一個連接,but這里就簡單創建一條新連接吧

啟動

先啟動broker,再啟動producer,然后啟動comsumer,OK,能跑,能實現發送消息到隊列,從隊列消費消息

 

總結

整體雖然簡單,但畢竟是使用go實現的,就是看似一頓操作猛如虎,實質慌如狗。第一時間就被go的gopath和go mod困擾住,后面語法的使用,比如指針,傳值傳引用等,最頭疼的就是類型轉換,作為一個javer,使用go進行類型轉換,著實被狠狠得虐了一番。

到此這篇關于使用go實現一個超級mini的消息隊列的示例代碼的文章就介紹到這了,更多相關go mini消息隊列內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://juejin.cn/post/7041085344481017887

延伸 · 閱讀

精彩推薦
  • Golanggolang如何使用struct的tag屬性的詳細介紹

    golang如何使用struct的tag屬性的詳細介紹

    這篇文章主要介紹了golang如何使用struct的tag屬性的詳細介紹,從例子說起,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看...

    Go語言中文網11352020-05-21
  • Golanggolang 通過ssh代理連接mysql的操作

    golang 通過ssh代理連接mysql的操作

    這篇文章主要介紹了golang 通過ssh代理連接mysql的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧...

    a165861639710342021-03-08
  • Golanggo語言制作端口掃描器

    go語言制作端口掃描器

    本文給大家分享的是使用go語言編寫的TCP端口掃描器,可以選擇IP范圍,掃描的端口,以及多線程,有需要的小伙伴可以參考下。 ...

    腳本之家3642020-04-25
  • GolangGolang通脈之數據類型詳情

    Golang通脈之數據類型詳情

    這篇文章主要介紹了Golang通脈之數據類型,在編程語言中標識符就是定義的具有某種意義的詞,比如變量名、常量名、函數名等等,Go語言中標識符允許由...

    4272021-11-24
  • GolangGolang中Bit數組的實現方式

    Golang中Bit數組的實現方式

    這篇文章主要介紹了Golang中Bit數組的實現方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧...

    天易獨尊11682021-06-09
  • Golanggolang的httpserver優雅重啟方法詳解

    golang的httpserver優雅重啟方法詳解

    這篇文章主要給大家介紹了關于golang的httpserver優雅重啟的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,...

    helight2992020-05-14
  • Golanggolang json.Marshal 特殊html字符被轉義的解決方法

    golang json.Marshal 特殊html字符被轉義的解決方法

    今天小編就為大家分享一篇golang json.Marshal 特殊html字符被轉義的解決方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧 ...

    李浩的life12792020-05-27
  • Golanggo日志系統logrus顯示文件和行號的操作

    go日志系統logrus顯示文件和行號的操作

    這篇文章主要介紹了go日志系統logrus顯示文件和行號的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧...

    SmallQinYan12302021-02-02
主站蜘蛛池模板: 欧美人交性视频在线香蕉 | 99视频九九精品视频在线观看 | 久久精品一区二区三区资源网 | 亚洲人成综合在线播放 | 国产亚洲欧美在线中文bt天堂网 | free性泰国女人hd | 久久久久激情免费观看 | 成人国产精品视频频 | a及毛片 | 国产网站免费观看 | 嫩草在线视频www免费观看 | 国产午夜精品福利久久 | 91制片厂制作传媒网站破解 | 手机跑分排行最新排名 | 日韩性大片免费 | 贤妻良母电影日本 | 国产一级毛片国语版 | 咪咪爱小说| 4虎tv| 99国产精品热久久久久久夜夜嗨 | 99久久免费国内精品 | 超高清欧美同性videos | 日韩欧一级毛片在线播无遮挡 | 日韩欧美亚洲一区精选 | 99热这里有免费国产精品 | 99国产在线视频 | 成人免费公开视频 | 近亲乱中文字幕 | 久久成人免费大片 | 亚洲精品国产一区二区第一页 | 欧美一区二区三区gg高清影视 | 国产精品久久久久网站 | zozo日本另类极品 | 欧美成人精品福利在线视频 | 隔壁老王国产在线精品 | 久久精品一区二区三区资源网 | 女子监狱第二季未删减在线看 | 91在线老王精品免费播放 | 亚洲系列国产系列 | 青青精品视频 | 亚洲成年|