一、簡(jiǎn)介
在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會(huì)使用輕量級(jí)的消息代理來(lái)構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有微服務(wù)實(shí)例都連接上來(lái),由于該主題中產(chǎn)生的消息會(huì)被所有實(shí)例監(jiān)聽(tīng)和消費(fèi),所以我們稱(chēng)它為消息總線(xiàn)。
二、消息代理
消息代理(message broker)是一種消息驗(yàn)證、傳輸、路由的架構(gòu)模式。它在應(yīng)用程序之間起到通信調(diào)度并最小化應(yīng)用之間的依賴(lài)的作用,使得應(yīng)用程序可以高效地解耦通信過(guò)程。消息代理是一個(gè)中間件產(chǎn)品,它的核心是一個(gè)消息的路由程序,用來(lái)實(shí)現(xiàn)接收和分發(fā)消息, 并根據(jù)設(shè)定好的消息處理流來(lái)轉(zhuǎn)發(fā)給正確的應(yīng)用。 它包括獨(dú)立的通信和消息傳遞協(xié)議,能夠?qū)崿F(xiàn)組織內(nèi)部和組織間的網(wǎng)絡(luò)通信。設(shè)計(jì)代理的目的就是為了能夠從應(yīng)用程序中傳入消息,并執(zhí)行一些特別的操作,下面這些是在企業(yè)應(yīng)用中,我們經(jīng)常需要使用消息代理的場(chǎng)景:
- 將消息路由到一個(gè)或多個(gè)目的地。
- 消息轉(zhuǎn)化為其他的表現(xiàn)方式。
- 執(zhí)行消息的聚集、消息的分解,并將結(jié)果發(fā)送到它們的目的地,然后重新組合響應(yīng)返回給消息用戶(hù)。
- 調(diào)用web服務(wù)來(lái)檢索數(shù)據(jù)。
- 響應(yīng)事件或錯(cuò)誤。
- 使用發(fā)布-訂閱模式來(lái)提供內(nèi)容或基千主題的消息路由。
目前已經(jīng)有非常多的開(kāi)源產(chǎn)品可以供大家使用, 比如:
- activemqkafka
- rabbitmq
- rocketmq
- 等......
三、springcloud+rabbitmq
(1)rabbitmq簡(jiǎn)介、安裝不贅述。
(2)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
|
<dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> </dependencies> |
(3)application.yml
1
2
3
4
5
6
7
8
|
spring: application: name: rabbitmq-hello rabbitmq: host: ***.***.***.*** port: 5672 username: guest password: guest |
(4)發(fā)送者sender
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@component public class sender { private static final logger log = loggerfactory.getlogger(sender. class ); @autowired private amqptemplate amqptemplate; public void send() { string context = "hello " + new date(); log.info( "sender : " + context); this .amqptemplate.convertandsend( "hello" , context); } } |
(5)接受者receiver
1
2
3
4
5
6
7
8
9
10
11
|
@component @rabbitlistener (queues = "hello" ) public class receiver { private static final logger log = loggerfactory.getlogger(receiver. class ); @rabbithandler public void process(string hello) { log.info( "receiver : " + hello); } } |
(6)創(chuàng)建rabbitmq的配置類(lèi) rabbitconfig
1
2
3
4
5
6
7
8
|
@configuration public class rabbitconfig { @bean public queue helloqueue(){ return new queue( "hello" ); } } |
(7)創(chuàng)建單元測(cè)試類(lèi), 用來(lái)調(diào)用消息生產(chǎn)
1
2
3
4
5
6
7
8
9
10
11
12
|
@runwith (springjunit4classrunner. class ) @springboottest (classes = springcloudbusrabbitmqapplication. class ) public class helloapplicationtests { @autowired private sender sender; @test public void hello() throws exception { sender.send(); } } |
(8)測(cè)試,執(zhí)行helloapplicationtests
(9)訪(fǎng)問(wèn)host:15672
四、改造config-client(整合springcloud bus)
(1)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
<dependencies> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-starter-config</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-starter-eureka</artifactid> </dependency> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-starter-bus-amqp</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-actuator</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> </dependencies> |
(2)bootstrap.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
spring.application.name=configspace spring.cloud.config.label=master spring.cloud.config.profile=dev spring.cloud.config.uri= http: //localhost:5588/ eureka.client.serviceurl.defaultzone=http: //localhost:5555/eureka/ server.port= 5589 spring.rabbitmq.host= 118.89 . 237.88 spring.rabbitmq.port= 5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest management.security.enabled= false |
(3)其他不用改變
五、測(cè)試
(1)測(cè)試準(zhǔn)備
一個(gè)服務(wù)注冊(cè)中心,eurekaserver,端口為5555;
一個(gè)分布式配置中心,configserver,端口為5588;
二個(gè)分布式配置,configclient,端口為5589、5590;(2)訪(fǎng)問(wèn)http://localhost:5589/from
(3)訪(fǎng)問(wèn)http://localhost:5590/from
rabbitmq:
(4)去倉(cāng)庫(kù)修改password的值
1
2
3
|
from=git-dev-v1. 0 by springcloud config-server username=springcloud password= 1234567890 |
(5)post請(qǐng)求http://localhost:5589/bus/refresh或者h(yuǎn)ttp://localhost:5590/bus/refresh
成功請(qǐng)求后config-client會(huì)重新讀取配置文件
(6)再次訪(fǎng)問(wèn)
- 如果post請(qǐng)求的是:http://localhost:5589/bus/refresh,請(qǐng)?jiān)L問(wèn)http://localhost:5590/from
- 如果訪(fǎng)問(wèn)出現(xiàn)401,則配置需要加上management.security.enabled=false
如果post請(qǐng)求的是:http://localhost:5590/bus/refresh,請(qǐng)?jiān)L問(wèn)http://localhost:5589/from
另/bus/refresh接口可以指定服務(wù),即使用“username”參數(shù),比如 “/bus/refresh?destination=username:**”即刷新服務(wù)名為username的所有服務(wù),不管ip地址。
(7)架構(gòu)
(8)架構(gòu)調(diào)整
既然springcloud bus的/bus/refresh接口提供了針對(duì)服務(wù)和實(shí)例進(jìn)行配置更新的參數(shù),那么我們的架構(gòu)也可以相應(yīng)做出一些調(diào)整。在之前的架構(gòu)中,服務(wù)的配置更新需要通過(guò)向具體服務(wù)中的某個(gè)實(shí)例發(fā)送請(qǐng)求,再觸發(fā)對(duì)整個(gè)服務(wù)集群的配置更新。雖然能實(shí)現(xiàn)功能,但是這樣的結(jié)果是,我們指定的應(yīng)用實(shí)例會(huì)不同千集群中的其他應(yīng)用實(shí)例,這樣會(huì)增加集群內(nèi)部的復(fù)雜度,不利于將來(lái)的運(yùn)維工作。比如, 需要對(duì)服務(wù)實(shí)例進(jìn)行遷移,那么我們不得不修改web hook中的配置等。所以要盡可能地讓服務(wù)集群中的各個(gè)節(jié)點(diǎn)是對(duì)等的。
因此, 我們將之前的架構(gòu)做了 一些調(diào)整, 如下圖所示:
主要做了以下這些改動(dòng):
- 在configserver中也引入springcloud bus,將配置服務(wù)端也加入到消息總線(xiàn)中來(lái)。
- /bus/refresh請(qǐng)求不再發(fā)送到具體服務(wù)實(shí)例上,而是發(fā)送給config server,并通過(guò)des巨nation參數(shù)來(lái)指定需要更新配置的服務(wù)或?qū)嵗?/li>
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。
原文鏈接:https://blog.csdn.net/smartdt/article/details/79073765