amqp協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被發送到隊列中,只是將消息發送到一個交換機。先由exchange來接收,然后exchange按照特定的策略轉發到queue進行存儲。同理,消費者也是如此。exchange 就類似于一個交換機,轉發各個消息分發到相應的隊列中。
rabbitmq提供了四種exchange模式:fanout,direct,topic,header 。 header模式在實際使用中較少,本文只對前三種模式進行比較。
一. fanout exchange
所有發送到fanout exchange的消息都會被轉發到與該exchange 綁定(binding)的所有queue上。
fanout exchange 不需要處理routekey 。只需要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的所有隊列上。類似子網廣播,每臺子網內的主機都獲得了一份復制的消息。
所以,fanout exchange 轉發消息是最快的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
/// <summary> /// 生產者 /// </summary> /// <param name="change"></param> private static void producermessage(mymessage msg) { var advancedbus = createadvancedbus(); if (advancedbus.isconnected) { var exchange = advancedbus.exchangedeclare( "user" , exchangetype.fanout); advancedbus.publish(exchange, "" , false , new message<mymessage>(msg)); } else { console.writeline( "can't connect" ); } } /// <summary> /// 消費者 /// </summary> private static void consumemessage() { var advancedbus = createadvancedbus(); var exchange = advancedbus.exchangedeclare( "user" , exchangetype.fanout); var queue = advancedbus.queuedeclare( "user.notice.wangwu" ); advancedbus.bind(exchange, queue, "user.notice.wangwu" ); advancedbus.consume(queue, registration => { registration.add<mymessage>((message, info) => { console.writeline( "body: {0}" , message.body); }); }); } |
適用場景:
第一:大型玩家在玩在線游戲的時候,可以用它來廣播重大消息。這讓我想到電影微微一笑很傾城中,有款游戲需要在世界上公布玩家重大消息,也許這個就是用的mq實現的。這讓我不禁佩服肖奈,人家在大學的時候就知道rabbitmq的這種特性了。
第二:體育新聞實時更新到手機客戶端。
第三:群聊功能,廣播消息給當前群聊中的所有人。
二. direct exchange
所有發送到direct exchange的消息被轉發到routekey中指定的queue。
direct模式,可以使用rabbitmq自帶的exchange:default exchange 。所以不需要將exchange進行任何綁定(binding)操作 。消息傳遞時,routekey必須完全匹配,才會被隊列接收,否則該消息會被拋棄。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
/// <summary> /// 生產者 /// </summary> /// <param name="change"></param> private static void producermessage(mymessage msg) { var advancedbus = createadvancedbus(); if (advancedbus.isconnected) { var queue = advancedbus.queuedeclare( "user.notice.zhangsan" ); advancedbus.publish(exchange.getdefault(), queue.name, false , new message<mymessage>(msg)); } else { console.writeline( "can't connect" ); } } /// <summary> /// 消費者 /// </summary> private static void consumemessage() { var advancedbus = createadvancedbus(); var exchange = advancedbus.exchangedeclare( "user" , exchangetype.direct); var queue = advancedbus.queuedeclare( "user.notice.lisi" ); advancedbus.bind(exchange, queue, "user.notice.lisi" ); advancedbus.consume(queue, registration => { registration.add<mymessage>((message, info) => { console.writeline( "body: {0}" , message.body); }); }); } |
三. topic exchange
所有發送到topic exchange的消息被轉發到所有關心routekey中指定topic的queue上,
exchange 將routekey 和某topic 進行模糊匹配。此時隊列需要綁定一個topic。可以使用通配符進行模糊匹配,符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“log.#”能夠匹配到“log.info.oa”,但是“log.*” 只會匹配到“log.error”。
所以,topic exchange 使用非常靈活。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
/// <summary> /// 生產者 /// </summary> /// <param name="change"></param> private static void producermessage(mymessage msg) { //// 創建消息bus ibus bus = createbus(); try { bus.publish(msg, x => x.withtopic(msg.messagerouter)); } catch (easynetqexception ex) { //處理連接消息服務器異常 } bus.dispose(); //與數據庫connection類似,使用后記得銷毀bus對象 } /// <summary> /// 消費者 /// </summary> private static void consumemessage(mymessage msg) { //// 創建消息bus ibus bus = createbus(); try { bus.subscribe<mymessage>(msg.messagerouter, message => console.writeline(msg.messagebody), x => x.withtopic( "user.notice.#" )); } catch (easynetqexception ex) { //處理連接消息服務器異常 } } |
使用場景:
新聞的分類更新
同意任務多個工作者協調完成
同一問題需要特定人員知曉
topic exchange的使用場景很多,我們公司就在使用這種模式,將足球事件信息發布,需要使用這些事件消息的人只需要綁定對應的exchange就可以獲取最新消息。
以上這篇基于rabbitmq幾種exchange 模式詳解就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持服務器之家。
原文鏈接:http://www.cnblogs.com/wangzhongqiu/p/7792112.html