簡介
rabbitmq是實現amqp(高級消息隊列協議)的消息中間件的一種,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗
概念:
- 生產者 消息的產生方,負責將消息推送到消息隊列
- 消費者 消息的最終接受方,負責監聽隊列中的對應消息,消費消息
- 隊列 消息的寄存器,負責存放生產者發送的消息
- 交換機 負責根據一定規則分發生產者產生的消息
- 綁定 完成交換機和隊列之間的綁定
模式:
- direct:直連模式,用于實例間的任務分發
- topic:話題模式,通過可配置的規則分發給綁定在該exchange上的隊列
- headers:適用規則復雜的分發,用headers里的參數表達規則
- fanout:分發給所有綁定到該exchange上的隊列,忽略routing key
springboot集成rabbitmq
一、引入maven依賴
1
2
3
4
5
|
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> <version> 1.5 . 2 .release</version> </dependency> |
二、配置application.properties
1
2
3
4
5
6
|
# rabbitmq spring.rabbitmq.host = dev-mq.a.pa.com spring.rabbitmq.port = 5672 spring.rabbitmq.username = admin spring.rabbitmq.password = admin spring.rabbitmq.virtualhost = /message-test/ |
三、編寫amqpconfiguration配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
package message.test.configuration; import org.springframework.amqp.core.acknowledgemode; import org.springframework.amqp.core.amqptemplate; import org.springframework.amqp.core.binding; import org.springframework.amqp.core.bindingbuilder; import org.springframework.amqp.core.directexchange; import org.springframework.amqp.core.queue; import org.springframework.amqp.rabbit.config.simplerabbitlistenercontainerfactory; import org.springframework.amqp.rabbit.connection.cachingconnectionfactory; import org.springframework.amqp.rabbit.connection.connectionfactory; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.boot.autoconfigure.amqp.rabbitproperties; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configuration public class amqpconfiguration { /** * 消息編碼 */ public static final string message_encoding = "utf-8" ; public static final string exchange_issue = "exchange_message_issue" ; public static final string queue_issue_user = "queue_message_issue_user" ; public static final string queue_issue_all_user = "queue_message_issue_all_user" ; public static final string queue_issue_all_device = "queue_message_issue_all_device" ; public static final string queue_issue_city = "queue_message_issue_city" ; public static final string routing_key_issue_user = "routing_key_message_issue_user" ; public static final string routing_key_issue_all_user = "routing_key_message_issue_all_user" ; public static final string routing_key_issue_all_device = "routing_key_message_issue_all_device" ; public static final string routing_key_issue_city = "routing_key_message_issue_city" ; public static final string exchange_push = "exchange_message_push" ; public static final string queue_push_result = "queue_message_push_result" ; @autowired private rabbitproperties rabbitproperties; @bean public queue issueuserqueue() { return new queue(queue_issue_user); } @bean public queue issuealluserqueue() { return new queue(queue_issue_all_user); } @bean public queue issuealldevicequeue() { return new queue(queue_issue_all_device); } @bean public queue issuecityqueue() { return new queue(queue_issue_city); } @bean public queue pushresultqueue() { return new queue(queue_push_result); } @bean public directexchange issueexchange() { return new directexchange(exchange_issue); } @bean public directexchange pushexchange() { // 參數1:隊列 // 參數2:是否持久化 // 參數3:是否自動刪除 return new directexchange(exchange_push, true , true ); } @bean public binding issueuserqueuebinding( @qualifier ( "issueuserqueue" ) queue queue, @qualifier ( "issueexchange" ) directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_user); } @bean public binding issuealluserqueuebinding( @qualifier ( "issuealluserqueue" ) queue queue, @qualifier ( "issueexchange" ) directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_user); } @bean public binding issuealldevicequeuebinding( @qualifier ( "issuealldevicequeue" ) queue queue, @qualifier ( "issueexchange" ) directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_device); } @bean public binding issuecityqueuebinding( @qualifier ( "issuecityqueue" ) queue queue, @qualifier ( "issueexchange" ) directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_city); } @bean public binding pushresultqueuebinding( @qualifier ( "pushresultqueue" ) queue queue, @qualifier ( "pushexchange" ) directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).withqueuename(); } @bean public connectionfactory defaultconnectionfactory() { cachingconnectionfactory connectionfactory = new cachingconnectionfactory(); connectionfactory.sethost(rabbitproperties.gethost()); connectionfactory.setport(rabbitproperties.getport()); connectionfactory.setusername(rabbitproperties.getusername()); connectionfactory.setpassword(rabbitproperties.getpassword()); connectionfactory.setvirtualhost(rabbitproperties.getvirtualhost()); return connectionfactory; } @bean public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory( @qualifier ( "defaultconnectionfactory" ) connectionfactory connectionfactory) { simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory(); factory.setconnectionfactory(connectionfactory); factory.setacknowledgemode(acknowledgemode.manual); return factory; } @bean public amqptemplate rabbittemplate( @qualifier ( "defaultconnectionfactory" ) connectionfactory connectionfactory) { return new rabbittemplate(connectionfactory); } } |
三、編寫生產者
1
2
3
|
body = json.tojsonstring(issuemessage).getbytes(amqpconfiguration.message_encoding); rabbittemplate.convertandsend(amqpconfiguration.exchange_issue, amqpconfiguration.routing_key_issue_user, body); |
四、編寫消費者
1
2
3
4
5
|
@rabbitlistener (queues = amqpconfiguration.queue_push_result) public void handlepushresult( @payload byte [] data, channel channel, @header (amqpheaders.delivery_tag) long deliverytag) { } |
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://segmentfault.com/a/1190000018555963