1.Topic交換器介紹
Topic Exchange 轉發消息主要是根據通配符。 在這種交換機下,隊列和交換機的綁定會定義一種路由模式,那么,通配符就要在這種路由模式和路由鍵之間匹配后交換機才能轉發消息。
在這種交換機模式下:
路由鍵必須是一串字符,用句號(.) 隔開,比如說 agreements.us,或者 agreements.eu.stockholm 等。
路由模式必須包含一個 星號(*),主要用于匹配路由鍵指定位置的一個單詞,比如說,一個路由模式是這樣子:agreements..b.*,那么就只能匹配路由鍵是這樣子的:第一個單詞是 agreements,第四個單詞是 b。 井號(#)就表示相當于一個或者多個單詞,例如一個匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin開頭的路由鍵都是可以的。
具體代碼發送的時候還是一樣,第一個參數表示交換機,第二個參數表示routing key,第三個參數即消息。如下:
rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is RabbitMQ!");
topic 和 direct 類似, 只是匹配上支持了"模式", 在"點分"的 routing_key 形式中, 可以使用兩個通配符:
*表示一個詞.
#表示零個或多個詞.
如上圖所示:此類交換器使得來自不同的源頭的消息可以到達一個對列,其實說的更明白一點就是模糊匹配的意思,例如:上圖中紅色對列的routekey為usa.#,#代表匹配任意字符,但是要想消息能到達此對列,usa.必須匹配后面的#好可以隨意。圖中usa.news,usa.weather都能找到紅色隊列,符號“#”匹配一個或多個詞,符號“”匹配不多不少一個詞。因此“usa.#”能夠匹配到“usa.news.XXX”,但是“usa.” 只會匹配到“usa.XXX”。
注:交換器說到底是一個名稱與隊列綁定的列表。當消息發布到交換器時,實際上是由你所連接的信道,將消息路由鍵同交換器上綁定的列表進行比較,最后路由消息
2.示例代碼
1).RabbitMQ的Topic的bean配置
RabbitTopic.java類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package com.example.rabbitmqtopic; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitTopic { final static String message = "topic.message" ; final static String messages = "topic.messages" ; //創建隊列 @Bean public Queue queueMessage() { return new Queue(RabbitTopic.message); } //創建隊列 @Bean public Queue queueMessages() { return new Queue(RabbitTopic.messages); } //創建交換器 @Bean TopicExchange exchange() { return new TopicExchange( "topicExchange" ); } //對列綁定并關聯到ROUTINGKEY @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with( "topic.message" ); } //對列綁定并關聯到ROUTINGKEY @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with( "topic.#" ); //*表示一個詞,#表示零個或多個詞 } } |
2).消息生產者生產消息
TopicSender.java類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package com.example.rabbitmqtopic.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class TopicSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, i am message all" ; System.out.println( "Sender : " + context); this .rabbitTemplate.convertAndSend( "topicExchange" , "topic.1" , context); } public void send1() { String context = "hi, i am message 1" ; System.out.println( "Sender : " + context); this .rabbitTemplate.convertAndSend( "topicExchange" , "topic.message" , context); } public void send2() { String context = "hi, i am messages 2" ; System.out.println( "Sender : " + context); this .rabbitTemplate.convertAndSend( "topicExchange" , "topic.messages" , context); } } |
3).消息消費者
TopicReceiver.java類:
1
2
3
4
5
6
7
8
9
10
11
12
|
package com.example.rabbitmqtopic.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener (queues = "topic.message" ) public class TopicReceiver { @RabbitHandler public void process(String message) { System.out.println( "Topic Receiver1 : " + message); } } |
TopicReceiver2.java類:
1
2
3
4
5
6
7
8
9
10
11
12
|
package com.example.rabbitmqtopic.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener (queues = "topic.messages" ) public class TopicReceiver2 { @RabbitHandler public void process(String message) { System.out.println( "Topic Receiver2 : " + message); } } |
4).測試
RabbitMQTopicTest.java類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package com.example.rabbitmqtopic.rabbitmq; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith (SpringRunner. class ) @SpringBootTest public class RabbitMQTopicTest { @Autowired private TopicSender sender; @Test public void topic() throws Exception { sender.send(); } @Test public void topic1() throws Exception { sender.send1(); } @Test public void topic2() throws Exception { sender.send2(); } } |
以上所述是小編給大家介紹的Spring Boot整合RabbitMQ實例(Topic模式),希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!
原文鏈接:http://www.cnblogs.com/web424/p/6767314.html