概述
- rabbitmq是一個開源的消息代理和隊列服務器,用來通過普通協議在完全不同的應用之間共享數據,或者簡單地將作業隊列以便讓分布式服務器進行處理。
- 它現實了amqp協議,并且遵循mozilla public license開源協議,它支持多種語言,可以方便的和spring集成。
- 消息隊列使用消息將應用程序連接起來,這些消息通過像rabbitmq這樣的消息代理服務器在應用程序之間路由。
基本概念
broker
用來處理數據的消息隊列服務器實體
vhost
由rabbitmq服務器創建的虛擬消息主機,擁有自己的權限機制,一個broker里可以開設多個vhost,用于不同用戶的權限隔離,vhost之間是也完全隔離的。
productor
產生用于消息通信的數據
channel
消息通道,在amqp中可以建立多個channel,每個channel代表一個會話任務。
exchange
direct
轉發消息到routing-key指定的隊列
fanout
fanout
轉發消息到所有綁定的隊列,類似于一種廣播發送的方式。
topic
topic
按照規則轉發消息,這種規則多為模式匹配,也顯得更加靈活
queue
queue
- 隊列是rabbitmq的內部對象,存儲消息
- 以動態的增加消費者,隊列將接受到的消息以輪詢(round-robin)的方式均勻的分配給多個消費者。
binding
表示交換機和隊列之間的關系,在進行綁定時,帶有一個額外的參數binding-key,來和routing-key相匹配。
consumer
監聽消息隊列來進行消息數據的讀取
springboot下三種exchange模式(fanout,direct,topic)實現
pom.xml中引用spring-boot-starter-amqp
1
2
3
4
|
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> |
增加rabbitmq配置
1
2
3
4
5
6
|
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest |
direct
direct模式一般情況下只需要定義queue 使用自帶交換機(defaultexchange)無需綁定交換機
1
2
3
4
5
6
7
8
9
|
@configuration public class rabbitp2pconfigure { public static final string queue_name = "p2p-queue" ; @bean public queue queue() { return new queue(queue_name, true ); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@runwith (springrunner. class ) @springboottest (classes = bootcoretestapplication. class ) @slf4j public class rabbittest { @autowired private amqptemplate amqptemplate; /** * 發送 */ @test public void sendlazy() throws interruptedexception { city city = new city(234556666l, "direct_name" , "direct_code" ); amqptemplate.convertandsend(rabbitlazyconfigure.queue_name, city); } /** * 領取 */ @test public void receive() throws interruptedexception { object obj = amqptemplate.receiveandconvert(rabbitlazyconfigure.queue_name); assert .notnull(obj, "" ); log.debug(obj.tostring()); } } |
適用場景:點對點
fanout
fanout則模式需要將多個queue綁定在同一個交換機上
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
@configuration public class rabbitfanoutconfigure { public static final string exchange_name = "fanout-exchange" ; public static final string fanout_a = "fanout.a" ; public static final string fanout_b = "fanout.b" ; public static final string fanout_c = "fanout.c" ; @bean public queue amessage() { return new queue(fanout_a); } @bean public queue bmessage() { return new queue(fanout_b); } @bean public queue cmessage() { return new queue(fanout_c); } @bean public fanoutexchange fanoutexchange() { return new fanoutexchange(exchange_name); } @bean public binding bindingexchangea(queue amessage, fanoutexchange fanoutexchange) { return bindingbuilder.bind(amessage).to(fanoutexchange); } @bean public binding bindingexchangeb(queue bmessage, fanoutexchange fanoutexchange) { return bindingbuilder.bind(bmessage).to(fanoutexchange); } @bean public binding bindingexchangec(queue cmessage, fanoutexchange fanoutexchange) { return bindingbuilder.bind(cmessage).to(fanoutexchange); } } |
發送者
1
2
3
4
5
6
7
8
9
10
11
12
|
@slf4j public class sender { @autowired private amqptemplate rabbittemplate; public void sendfanout(object message) { log.debug( "begin send fanout message<" + message + ">" ); rabbittemplate.convertandsend(rabbitfanoutconfigure.exchange_name, "" , message); } } |
我們可以通過@rabbitlistener監聽多個queue來進行消費
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@slf4j @rabbitlistener (queues = { rabbitfanoutconfigure.fanout_a, rabbitfanoutconfigure.fanout_b, rabbitfanoutconfigure.fanout_c }) public class receiver { @rabbithandler public void receivemessage(string message) { log.debug( "received <" + message + ">" ); } } |
適用場景
- 大規模多用戶在線(mmo)游戲可以使用它來處理排行榜更新等全局事件
- 體育新聞網站可以用它來近乎實時地將比分更新分發給移動客戶端
- 分發系統使用它來廣播各種狀態和配置更新
- 在群聊的時候,它被用來分發消息給參與群聊的用戶
topic
這種模式較為復雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“標題”,exchange會將消息轉發到所有關注主題能與routekey模糊匹配的隊列。
在進行綁定時,要提供一個該隊列關心的主題,如“topic.# (“#”表示0個或若干個關鍵字,“*”表示一個關鍵字。 )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
@configuration public class rabbittopicconfigure { public static final string exchange_name = "topic-exchange" ; public static final string topic = "topic" ; public static final string topic_a = "topic.a" ; public static final string topic_b = "topic.b" ; @bean public queue queuetopic() { return new queue(rabbittopicconfigure.topic); } @bean public queue queuetopica() { return new queue(rabbittopicconfigure.topic_a); } @bean public queue queuetopicb() { return new queue(rabbittopicconfigure.topic_b); } @bean public topicexchange exchange() { topicexchange topicexchange = new topicexchange(exchange_name); topicexchange.setdelayed( true ); return new topicexchange(exchange_name); } @bean public binding bindingexchangetopic(queue queuetopic, topicexchange exchange) { return bindingbuilder.bind(queuetopic).to(exchange).with(rabbittopicconfigure.topic); } @bean public binding bindingexchangetopics(queue queuetopica, topicexchange exchange) { return bindingbuilder.bind(queuetopica).to(exchange).with( "topic.#" ); } } |
同時去監聽三個queue
1
2
3
4
5
6
7
8
9
10
11
12
|
@slf4j @rabbitlistener (queues = { rabbittopicconfigure.topic, rabbittopicconfigure.topic_a, rabbittopicconfigure.topic_b }) public class receiver { @rabbithandler public void receivemessage(string message) { log.debug( "received <" + message + ">" ); } } |
通過測試我們可以發現
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
@runwith (springrunner. class ) @springboottest (classes = bootcoretestapplication. class ) public class rabbittest { @autowired private amqptemplate rabbittemplate; @test public void sendall() { rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, "topic.test" , "send all" ); } @test public void sendtopic() { rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic, "send topic" ); } @test public void sendtopica() { rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic_a, "send topica" ); } } |
適用場景
- 分發有關于特定地理位置的數據,例如銷售點
- 由多個工作者(workers)完成的后臺任務,每個工作者負責處理某些特定的任務
- 股票價格更新(以及其他類型的金融數據更新)
- 涉及到分類或者標簽的新聞更新(例如,針對特定的運動項目或者隊伍)
- 云端的不同種類服務的協調
- 分布式架構/基于系統的軟件封裝,其中每個構建者僅能處理一個特定的架構或者系統。
延遲隊列
延遲消費:
- 如用戶生成訂單之后,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。
- 用戶注冊成功之后,需要過一段時間比如一周后校驗用戶的使用情況,如果發現用戶活躍度較低,則發送郵件或者短信來提醒用戶使用。
延遲重試:
- 如消費者從隊列里消費消息時失敗了,但是想要延遲一段時間后自動重試。
- 如果不使用延遲隊列,那么我們只能通過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便做成統一的服務便于開發人員使用。但是使用延遲隊列的話,我們就可以輕而易舉地完成。
設置交換機延遲屬性為true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@configuration public class rabbitlazyconfigure { public static final string queue_name = "lazy-queue-t" ; public static final string exchange_name = "lazy-exchange-t" ; @bean public queue queue() { return new queue(queue_name, true ); } @bean public directexchange defaultexchange() { directexchange directexchange = new directexchange(exchange_name, true , false ); directexchange.setdelayed( true ); return directexchange; } @bean public binding binding() { return bindingbuilder.bind(queue()).to(defaultexchange()).with(queue_name); } } |
發送時設置延遲時間即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@slf4j public class sender { @autowired private amqptemplate rabbittemplate; public void sendlazy(object msg) { log.debug( "begin send lazy message<" + msg + ">" ); rabbittemplate.convertandsend(rabbitlazyconfigure.exchange_name, rabbitlazyconfigure.queue_name, msg, message -> { message.getmessageproperties().setheader( "x-delay" , 10000 ); return message; } ); } } |
結束
各種使用案例請直接查看 官方文檔
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:http://www.cnblogs.com/huyunfan/p/8024131.html