一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|JavaScript|易語言|

服務器之家 - 編程語言 - JAVA教程 - springboot整合rabbitmq的示例代碼

springboot整合rabbitmq的示例代碼

2021-03-03 13:59胡運凡 JAVA教程

本篇文章主要介紹了springboot整合rabbitmq的示例代碼,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

概述

  1. rabbitmq是一個開源的消息代理和隊列服務器,用來通過普通協議在完全不同的應用之間共享數據,或者簡單地將作業隊列以便讓分布式服務器進行處理。
  2. 它現實了amqp協議,并且遵循mozilla public license開源協議,它支持多種語言,可以方便的和spring集成。
  3. 消息隊列使用消息將應用程序連接起來,這些消息通過像rabbitmq這樣的消息代理服務器在應用程序之間路由。

基本概念

broker

用來處理數據的消息隊列服務器實體

vhost

由rabbitmq服務器創建的虛擬消息主機,擁有自己的權限機制,一個broker里可以開設多個vhost,用于不同用戶的權限隔離,vhost之間是也完全隔離的。

productor

產生用于消息通信的數據

channel

消息通道,在amqp中可以建立多個channel,每個channel代表一個會話任務。

exchange

direct

轉發消息到routing-key指定的隊列

springboot整合rabbitmq的示例代碼fanout

fanout

轉發消息到所有綁定的隊列,類似于一種廣播發送的方式。

springboot整合rabbitmq的示例代碼topic

topic

按照規則轉發消息,這種規則多為模式匹配,也顯得更加靈活

springboot整合rabbitmq的示例代碼queue

queue

  1.  隊列是rabbitmq的內部對象,存儲消息
  2. 以動態的增加消費者,隊列將接受到的消息以輪詢(round-robin)的方式均勻的分配給多個消費者。

binding

表示交換機和隊列之間的關系,在進行綁定時,帶有一個額外的參數binding-key,來和routing-key相匹配。

consumer

監聽消息隊列來進行消息數據的讀取

springboot下三種exchange模式(fanout,direct,topic)實現

pom.xml中引用spring-boot-starter-amqp

?
1
2
3
4
<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

增加rabbitmq配置

?
1
2
3
4
5
6
spring:
 rabbitmq:
 host: localhost
 port: 5672
 username: guest
 password: guest

direct

direct模式一般情況下只需要定義queue 使用自帶交換機(defaultexchange)無需綁定交換機

?
1
2
3
4
5
6
7
8
9
  @configuration
public class rabbitp2pconfigure { 
 public static final string queue_name = "p2p-queue";
  @bean
  public queue queue() {
    return new queue(queue_name, true);
  }
 
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@runwith(springrunner.class)
@springboottest(classes = bootcoretestapplication.class)
@slf4j
public class rabbittest {
  @autowired
  private amqptemplate amqptemplate;
 
  /**
  * 發送
  */
  @test
  public void sendlazy() throws interruptedexception {
    city city = new city(234556666l, "direct_name", "direct_code");
    amqptemplate.convertandsend(rabbitlazyconfigure.queue_name, city);
  }
  
  /**
  * 領取
  */
  @test
  public void receive() throws interruptedexception {
    object obj = amqptemplate.receiveandconvert(rabbitlazyconfigure.queue_name);
    assert.notnull(obj, "");
    log.debug(obj.tostring());
  }
}

適用場景:點對點

fanout

fanout則模式需要將多個queue綁定在同一個交換機上

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@configuration
public class rabbitfanoutconfigure {
  public static final string exchange_name = "fanout-exchange";
  public static final string fanout_a = "fanout.a";
  public static final string fanout_b = "fanout.b";
  public static final string fanout_c = "fanout.c";
 
  @bean
  public queue amessage() {
    return new queue(fanout_a);
  }
 
  @bean
  public queue bmessage() {
    return new queue(fanout_b);
  }
 
  @bean
  public queue cmessage() {
    return new queue(fanout_c);
  }
 
  @bean
  public fanoutexchange fanoutexchange() {
    return new fanoutexchange(exchange_name);
  }
 
  @bean
  public binding bindingexchangea(queue amessage, fanoutexchange fanoutexchange) {
    return bindingbuilder.bind(amessage).to(fanoutexchange);
  }
 
  @bean
  public binding bindingexchangeb(queue bmessage, fanoutexchange fanoutexchange) {
    return bindingbuilder.bind(bmessage).to(fanoutexchange);
  }
 
  @bean
  public binding bindingexchangec(queue cmessage, fanoutexchange fanoutexchange) {
    return bindingbuilder.bind(cmessage).to(fanoutexchange);
  }
 
}

發送者

?
1
2
3
4
5
6
7
8
9
10
11
12
@slf4j
public class sender {
 
  @autowired
  private amqptemplate rabbittemplate;
 
  public void sendfanout(object message) {
    log.debug("begin send fanout message<" + message + ">");
    rabbittemplate.convertandsend(rabbitfanoutconfigure.exchange_name, "", message);
  }
 
}

我們可以通過@rabbitlistener監聽多個queue來進行消費

?
1
2
3
4
5
6
7
8
9
10
11
12
13
@slf4j
@rabbitlistener(queues = {
    rabbitfanoutconfigure.fanout_a,
    rabbitfanoutconfigure.fanout_b,
    rabbitfanoutconfigure.fanout_c
})
public class receiver {
 
  @rabbithandler
  public void receivemessage(string message) {
    log.debug("received <" + message + ">");
  }
}

適用場景
- 大規模多用戶在線(mmo)游戲可以使用它來處理排行榜更新等全局事件
- 體育新聞網站可以用它來近乎實時地將比分更新分發給移動客戶端
- 分發系統使用它來廣播各種狀態和配置更新
- 在群聊的時候,它被用來分發消息給參與群聊的用戶

topic

這種模式較為復雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“標題”,exchange會將消息轉發到所有關注主題能與routekey模糊匹配的隊列。

在進行綁定時,要提供一個該隊列關心的主題,如“topic.# (“#”表示0個或若干個關鍵字,“*”表示一個關鍵字。 )

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@configuration
public class rabbittopicconfigure {
  public static final string exchange_name = "topic-exchange";
  public static final string topic = "topic";
  public static final string topic_a = "topic.a";
  public static final string topic_b = "topic.b";
 
  @bean
  public queue queuetopic() {
    return new queue(rabbittopicconfigure.topic);
  }
 
  @bean
  public queue queuetopica() {
    return new queue(rabbittopicconfigure.topic_a);
  }
 
  @bean
  public queue queuetopicb() {
    return new queue(rabbittopicconfigure.topic_b);
  }
 
  @bean
  public topicexchange exchange() {
    topicexchange topicexchange = new topicexchange(exchange_name);
    topicexchange.setdelayed(true);
    return new topicexchange(exchange_name);
  }
 
  @bean
  public binding bindingexchangetopic(queue queuetopic, topicexchange exchange) {
    return bindingbuilder.bind(queuetopic).to(exchange).with(rabbittopicconfigure.topic);
  }
 
  @bean
  public binding bindingexchangetopics(queue queuetopica, topicexchange exchange) {
    return bindingbuilder.bind(queuetopica).to(exchange).with("topic.#");
  }
}

同時去監聽三個queue

?
1
2
3
4
5
6
7
8
9
10
11
12
@slf4j
@rabbitlistener(queues = {
    rabbittopicconfigure.topic,
    rabbittopicconfigure.topic_a,
    rabbittopicconfigure.topic_b
})
public class receiver {
  @rabbithandler
  public void receivemessage(string message) {
    log.debug("received <" + message + ">");
  }
}

通過測試我們可以發現

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@runwith(springrunner.class)
@springboottest(classes = bootcoretestapplication.class)
public class rabbittest {
  @autowired
  private amqptemplate rabbittemplate;
 
  @test
  public void sendall() {
    rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, "topic.test", "send all");
  }
 
  @test
  public void sendtopic() {
    rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic, "send topic");
  }
 
  @test
  public void sendtopica() {
    rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic_a, "send topica");
  }
}

適用場景
- 分發有關于特定地理位置的數據,例如銷售點
- 由多個工作者(workers)完成的后臺任務,每個工作者負責處理某些特定的任務
- 股票價格更新(以及其他類型的金融數據更新)
- 涉及到分類或者標簽的新聞更新(例如,針對特定的運動項目或者隊伍)
- 云端的不同種類服務的協調
- 分布式架構/基于系統的軟件封裝,其中每個構建者僅能處理一個特定的架構或者系統。

延遲隊列

延遲消費:

  1. 如用戶生成訂單之后,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。
  2. 用戶注冊成功之后,需要過一段時間比如一周后校驗用戶的使用情況,如果發現用戶活躍度較低,則發送郵件或者短信來提醒用戶使用。

延遲重試:

  1. 如消費者從隊列里消費消息時失敗了,但是想要延遲一段時間后自動重試。
  2. 如果不使用延遲隊列,那么我們只能通過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便做成統一的服務便于開發人員使用。但是使用延遲隊列的話,我們就可以輕而易舉地完成。

設置交換機延遲屬性為true

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@configuration
public class rabbitlazyconfigure {
  public static final string queue_name = "lazy-queue-t";
  public static final string exchange_name = "lazy-exchange-t";
 
  @bean
  public queue queue() {
    return new queue(queue_name, true);
  }
 
  @bean
  public directexchange defaultexchange() {
    directexchange directexchange = new directexchange(exchange_name, true, false);
    directexchange.setdelayed(true);
    return directexchange;
  }
 
  @bean
  public binding binding() {
    return bindingbuilder.bind(queue()).to(defaultexchange()).with(queue_name);
  }
 
}

發送時設置延遲時間即可

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@slf4j
public class sender {
  @autowired
  private amqptemplate rabbittemplate;
  public void sendlazy(object msg) {
    log.debug("begin send lazy message<" + msg + ">");
    rabbittemplate.convertandsend(rabbitlazyconfigure.exchange_name,
        rabbitlazyconfigure.queue_name, msg, message -> {
          message.getmessageproperties().setheader("x-delay", 10000);
          return message;
        }
    );
  }
}

結束

各種使用案例請直接查看 官方文檔

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://www.cnblogs.com/huyunfan/p/8024131.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 国产特黄a级在线视频 | 思敏1一5集国语版免费观看 | 久久热这里只有 精品 | 欧美男人的天堂 | 亚洲国产日韩欧美在线vip1区 | 跪趴好紧h | 操bb| 乌克兰黄色录像 | 无码一区二区三区视频 | 欧美成人午夜片一一在线观看 | 国模大胆一区二区三区 | 国产青草视频在线观看免费影院 | 三极黄色| 久久WWW免费人成一看片 | 欧美整片完整片视频在线 | 女主被当众调教虐np | 国产小视频在线免费观看 | 91日本 | japan孕妇孕交 | 色婷婷在线视频 | 蜜桃成熟3在线观看 | 精品久久香蕉国产线看观看麻豆 | 日本一区二区三区久久 | 精品国产乱码久久久久久人妻 | 鬼吹灯之天星术免费观看 | 99pao在线视频精品免费 | 人禽l交免费视频观看+视频 | 国产精品1 | 欧美日韩高清完整版在线观看免费 | 亚州精品永久观看视频 | 卫生间被教官做好爽HH视频 | 香蕉久久久久久狠狠色 | 国产精品边做边接电话在线观看 | 色综合综合 | ffyybb免费福利视频 | 青青草在线播放 | 传说之下羊妈挤羊奶网站 | 精品久久国产 | 厨房里摸着乳丰满在线观看 | 无套内射在线观看THEPORN | 婷婷中文|