一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Spring Boot與RabbitMQ結(jié)合實現(xiàn)延遲隊列的示例

Spring Boot與RabbitMQ結(jié)合實現(xiàn)延遲隊列的示例

2021-02-14 22:55夜有所思,日有所夢 Java教程

本篇文章主要介紹了Spring Boot與RabbitMQ結(jié)合實現(xiàn)延遲隊列的示例,具有一定的參考價值,感興趣的小伙伴們可以參考一下

背景

何為延遲隊列

顧名思義,延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而一般的隊列,消息一旦入隊了之后就會被消費者馬上消費。

場景一:在訂單系統(tǒng)中,一個用戶下單之后通常有30分鐘的時間進行支付,如果30分鐘之內(nèi)沒有支付成功,那么這個訂單將進行一場處理。這是就可以使用延時隊列將訂單信息發(fā)送到延時隊列。

場景二:用戶希望通過手機遠程遙控家里的智能設備在指定的時間進行工作。這時候就可以將用戶指令發(fā)送到延時隊列,當指令設定的時間到了再將指令推送到只能設備。

延遲隊列能做什么?

延遲隊列多用于需要延遲工作的場景。最常見的是以下兩種場景:

1、延遲消費。比如:

  1. 用戶生成訂單之后,需要過一段時間校驗訂單的支付狀態(tài),如果訂單仍未支付則需要及時地關閉訂單。
  2. 用戶注冊成功之后,需要過一段時間比如一周后校驗用戶的使用情況,如果發(fā)現(xiàn)用戶活躍度較低,則發(fā)送郵件或者短信來提醒用戶使用。

2、延遲重試。比如消費者從隊列里消費消息時失敗了,但是想要延遲一段時間后自動重試。

如果不使用延遲隊列,那么我們只能通過一個輪詢掃描程序去完成。這種方案既不優(yōu)雅,也不方便做成統(tǒng)一的服務便于開發(fā)人員使用。但是使用延遲隊列的話,我們就可以輕而易舉地完成。

如何實現(xiàn)?

別急,在下文中,我們將詳細介紹如何利用spring boot加rabbitmq來實現(xiàn)延遲隊列。

本文出現(xiàn)的示例代碼都已push到github倉庫中:https://github.com/lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue

實現(xiàn)思路

在介紹具體的實現(xiàn)思路之前,我們先來介紹一下rabbitmq的兩個特性,一個是time-to-live extensions,另一個是dead letter exchanges。

time-to-live extensions

rabbitmq允許我們?yōu)橄⒒蛘哧犃性O置ttl(time to live),也就是過期時間。ttl表明了一條消息可在隊列中存活的最大時間,單位為毫秒。也就是說,當某條消息被設置了ttl或者當某條消息進入了設置了ttl的隊列時,這條消息會在經(jīng)過ttl秒后“死亡”,成為dead letter。如果既配置了消息的ttl,又配置了隊列的ttl,那么較小的那個值會被取用。更多資料請查閱官方文檔。

dead letter exchange

剛才提到了,被設置了ttl的消息在過期后會成為dead letter。其實在rabbitmq中,一共有三種消息的“死亡”形式:

  1. 消息被拒絕。通過調(diào)用basic.reject或者basic.nack并且設置的requeue參數(shù)為false。
  2. 消息因為設置了ttl而過期。
  3. 消息進入了一條已經(jīng)達到最大長度的隊列。

如果隊列設置了dead letter exchange(dlx),那么這些dead letter就會被重新publish到dead letter exchange,通過dead letter exchange路由到其他隊列。更多資料請查閱官方文檔。

流程圖

聰明的你肯定已經(jīng)想到了,如何將rabbitmq的ttl和dlx特性結(jié)合在一起,實現(xiàn)一個延遲隊列。

針對于上述的延遲隊列的兩個場景,我們分別有以下兩種流程圖:

延遲消費

延遲消費是延遲隊列最為常用的使用模式。如下圖所示,生產(chǎn)者產(chǎn)生的消息首先會進入緩沖隊列(圖中紅色隊列)。通過rabbitmq提供的ttl擴展,這些消息會被設置過期時間,也就是延遲消費的時間。等消息過期之后,這些消息會通過配置好的dlx轉(zhuǎn)發(fā)到實際消費隊列(圖中藍色隊列),以此達到延遲消費的效果。

 Spring Boot與RabbitMQ結(jié)合實現(xiàn)延遲隊列的示例

延遲重試

延遲重試本質(zhì)上也是延遲消費的一種,但是這種模式的結(jié)構(gòu)與普通的延遲消費的流程圖較為不同,所以單獨拎出來介紹。

如下圖所示,消費者發(fā)現(xiàn)該消息處理出現(xiàn)了異常,比如是因為網(wǎng)絡波動引起的異常。那么如果不等待一段時間,直接就重試的話,很可能會導致在這期間內(nèi)一直無法成功,造成一定的資源浪費。那么我們可以將其先放在緩沖隊列中(圖中紅色隊列),等消息經(jīng)過一段的延遲時間后再次進入實際消費隊列中(圖中藍色隊列),此時由于已經(jīng)過了“較長”的時間了,異常的一些波動通常已經(jīng)恢復,這些消息可以被正常地消費。

Spring Boot與RabbitMQ結(jié)合實現(xiàn)延遲隊列的示例  

代碼實現(xiàn)

接下來我們將介紹如何在spring boot中實現(xiàn)基于rabbitmq的延遲隊列。我們假設讀者已經(jīng)擁有了spring boot與rabbitmq的基本知識。

初始化工程

首先我們在intellij中創(chuàng)建一個spring boot工程,并且添加spring-boot-starter-amqp擴展。

配置隊列

從上述的流程圖中我們可以看到,一個延遲隊列的實現(xiàn),需要一個緩沖隊列以及一個實際的消費隊列。又由于在rabbitmq中,我們擁有兩種消息過期的配置方式,所以在代碼中,我們一共配置了三條隊列:

  1. delay_queue_per_message_ttl:ttl配置在消息上的緩沖隊列。
  2. delay_queue_per_queue_ttl:ttl配置在隊列上的緩沖隊列。
  3. delay_process_queue:實際消費隊列。

我們通過java config的方式將上述的隊列配置為bean。由于我們添加了spring-boot-starter-amqp擴展,spring boot在啟動時會根據(jù)我們的配置自動創(chuàng)建這些隊列。為了方便接下來的測試,我們將delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的dlx配置為同一個,且過期的消息都會通過dlx轉(zhuǎn)發(fā)到delay_process_queue。

delay_queue_per_message_ttl

首先介紹delay_queue_per_message_ttl的配置代碼:

?
1
2
3
4
5
6
7
@bean
queue delayqueuepermessagettl() {
  return queuebuilder.durable(delay_queue_per_message_ttl_name)
            .withargument("x-dead-letter-exchange", delay_exchange_name) // dlx,dead letter發(fā)送到的exchange
            .withargument("x-dead-letter-routing-key", delay_process_queue_name) // dead letter攜帶的routing key
            .build();
}

其中,x-dead-letter-exchange聲明了隊列里的死信轉(zhuǎn)發(fā)到的dlx名稱,x-dead-letter-routing-key聲明了這些死信在轉(zhuǎn)發(fā)時攜帶的routing-key名稱。

delay_queue_per_queue_ttl

類似地,delay_queue_per_queue_ttl的配置代碼:

?
1
2
3
4
5
6
7
8
@bean
queue delayqueueperqueuettl() {
  return queuebuilder.durable(delay_queue_per_queue_ttl_name)
            .withargument("x-dead-letter-exchange", delay_exchange_name) // dlx
            .withargument("x-dead-letter-routing-key", delay_process_queue_name) // dead letter攜帶的routing key
            .withargument("x-message-ttl", queue_expiration) // 設置隊列的過期時間
            .build();
}

delay_queue_per_queue_ttl隊列的配置比delay_queue_per_message_ttl隊列的配置多了一個x-message-ttl,該配置用來設置隊列的過期時間。

delay_process_queue

delay_process_queue的配置最為簡單:

?
1
2
3
4
5
@bean
queue delayprocessqueue() {
  return queuebuilder.durable(delay_process_queue_name)
            .build();
}

配置exchange

配置dlx

首先,我們需要配置dlx,代碼如下:

?
1
2
3
4
@bean
directexchange delayexchange() {
  return new directexchange(delay_exchange_name);
}

然后再將該dlx綁定到實際消費隊列即delay_process_queue上。這樣所有的死信都會通過dlx被轉(zhuǎn)發(fā)到delay_process_queue:

?
1
2
3
4
5
6
@bean
binding dlxbinding(queue delayprocessqueue, directexchange delayexchange) {
  return bindingbuilder.bind(delayprocessqueue)
             .to(delayexchange)
             .with(delay_process_queue_name);
}

配置延遲重試所需的exchange

從延遲重試的流程圖中我們可以看到,消息處理失敗之后,我們需要將消息轉(zhuǎn)發(fā)到緩沖隊列,所以緩沖隊列也需要綁定一個exchange。在本例中,我們將delay_process_per_queue_ttl作為延遲重試里的緩沖隊列。具體代碼是如何配置的,這里就不贅述了,大家可以查閱我github中的代碼。

定義消費者

我們創(chuàng)建一個最簡單的消費者processreceiver,這個消費者監(jiān)聽delay_process_queue隊列,對于接受到的消息,他會:

  1. 如果消息里的消息體不等于fail_message,那么他會輸出消息體。
  2. 如果消息里的消息體恰好是fail_message,那么他會模擬拋出異常,然后將該消息重定向到緩沖隊列(對應延遲重試場景)。

另外,我們還需要新建一個監(jiān)聽容器用于存放消費者,代碼如下:

?
1
2
3
4
5
6
7
8
@bean
simplemessagelistenercontainer processcontainer(connectionfactory connectionfactory, processreceiver processreceiver) {
  simplemessagelistenercontainer container = new simplemessagelistenercontainer();
  container.setconnectionfactory(connectionfactory);
  container.setqueuenames(delay_process_queue_name); // 監(jiān)聽delay_process_queue
  container.setmessagelistener(new messagelisteneradapter(processreceiver));
  return container;
}

至此,我們前置的配置代碼已經(jīng)全部編寫完成,接下來我們需要編寫測試用例來測試我們的延遲隊列。

編寫測試用例

延遲消費場景

首先我們編寫用于測試ttl設置在消息上的測試代碼。

我們借助spring-rabbit包下提供的rabbittemplate類來發(fā)送消息。由于我們添加了spring-boot-starter-amqp擴展,spring boot會在初始化時自動地將rabbittemplate當成bean加載到容器中。

解決了消息的發(fā)送問題,那么又該如何為每個消息設置ttl呢?這里我們需要借助messagepostprocessor。

messagepostprocessor通常用來設置消息的header以及消息的屬性。我們新建一個expirationmessagepostprocessor類來負責設置消息的ttl屬性: 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * 設置消息的失效時間
 */
public class expirationmessagepostprocessor implements messagepostprocessor {
  private final long ttl; // 毫秒
  public expirationmessagepostprocessor(long ttl) {
    this.ttl = ttl;
  }
  @override
  public message postprocessmessage(message message) throws amqpexception {
    message.getmessageproperties()
        .setexpiration(ttl.tostring()); // 設置per-message的失效時間
    return message;
  }
}

然后在調(diào)用rabbittemplate的convertandsend方法時,傳入expirationmessagepostporcessor即可。我們向緩沖隊列中發(fā)送3條消息,過期時間依次為1秒,2秒和3秒。具體的代碼如下所示:

?
1
2
3
4
5
6
7
8
9
10
@test
public void testdelayqueuepermessagettl() throws interruptedexception {
  processreceiver.latch = new countdownlatch(3);
  for (int i = 1; i <= 3; i++) {
    long expiration = i * 1000;
    rabbittemplate.convertandsend(queueconfig.delay_queue_per_message_ttl_name,
        (object) ("message from delay_queue_per_message_ttl with expiration " + expiration), new expirationmessagepostprocessor(expiration));
  }
  processreceiver.latch.await();
}

細心的朋友一定會問,為什么要在代碼中加一個countdownlatch呢?這是因為如果沒有l(wèi)atch阻塞住測試方法的話,測試用例會直接結(jié)束,程序退出,我們就看不到消息被延遲消費的表現(xiàn)了。

那么類似地,測試ttl設置在隊列上的代碼如下:

?
1
2
3
4
5
6
7
8
9
@test
public void testdelayqueueperqueuettl() throws interruptedexception {
  processreceiver.latch = new countdownlatch(3);
  for (int i = 1; i <= 3; i++) {
    rabbittemplate.convertandsend(queueconfig.delay_queue_per_queue_ttl_name,
        "message from delay_queue_per_queue_ttl with expiration " + queueconfig.queue_expiration);
  }
  processreceiver.latch.await();
}

我們向緩沖隊列中發(fā)送3條消息。理論上這3條消息會在4秒后同時過期。

延遲重試場景

我們同樣還需測試延遲重試場景。

?
1
2
3
4
5
6
7
8
@test
public void testfailmessage() throws interruptedexception {
  processreceiver.latch = new countdownlatch(6);
  for (int i = 1; i <= 3; i++) {
    rabbittemplate.convertandsend(queueconfig.delay_process_queue_name, processreceiver.fail_message);
  }
  processreceiver.latch.await();
}

我們向delay_process_queue發(fā)送3條會觸發(fā)fail的消息,理論上這3條消息會在4秒后自動重試。

查看測試結(jié)果

延遲消費場景

延遲消費的場景測試我們分為了ttl設置在消息上和ttl設置在隊列上兩種。首先,我們先看一下ttl設置在消息上的測試結(jié)果:

Spring Boot與RabbitMQ結(jié)合實現(xiàn)延遲隊列的示例

從上圖中我們可以看到,processreceiver分別經(jīng)過1秒、2秒、3秒收到消息。測試結(jié)果表明消息不僅被延遲消費了,而且每條消息的延遲時間是可以被個性化設置的。ttl設置在消息上的延遲消費場景測試成功。

然后,ttl設置在隊列上的測試結(jié)果如下圖:

Spring Boot與RabbitMQ結(jié)合實現(xiàn)延遲隊列的示例

從上圖中我們可以看到,processreceiver經(jīng)過了4秒的延遲之后,同時收到了3條消息。測試結(jié)果表明消息不僅被延遲消費了,同時也證明了當ttl設置在隊列上的時候,消息的過期時間是固定的。ttl設置在隊列上的延遲消費場景測試成功。

延遲重試場景

接下來,我們再來看一下延遲重試的測試結(jié)果:

Spring Boot與RabbitMQ結(jié)合實現(xiàn)延遲隊列的示例

processreceiver首先收到了3條會觸發(fā)fail的消息,然后將其移動到緩沖隊列之后,過了4秒,又收到了剛才的那3條消息。延遲重試場景測試成功。

總結(jié)

本文首先介紹了延遲隊列的概念以及用途,并且通過代碼詳細講解了如何通過spring boot和rabbitmq實現(xiàn)一個延遲隊列。希望本文能夠?qū)Υ蠹移綍r的學習和工作能有所啟發(fā)和幫助。也希望大家多多支持服務器之家。

原文鏈接:http://www.kissyu.org/

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 98免费视频| 双性受合不垅腿攻np | 关晓彤被调教出奶水 | 白丝校花掀起短裙呻吟小说 | 欧美亚洲国产一区二区三区 | 亚洲网站在线 | free白嫩性hd| 欧美x×x| 99视频久久精品久久 | 91在线视频免费观看 | 国产成人盗拍精品免费视频 | 免费永久观看美女视频网站网址 | 91精品啪在线观看国产日本 | 双性总裁(h) | 国产aⅴ一区二区三区 | 操碰97| 天天噜| 和两个男人玩3p好爽视频 | 欧美精品一二三区 | chinese456老年gay china外卖员gay帮口 | 四色6677最新永久网站 | futa百合文 | 91精品国产91热久久久久福利 | 夫承子液by免费阅读 | 色综合久久日韩国产 | 亚洲品质自拍视频 | 久久精品国产免费播高清无卡 | 国产日韩免费视频 | 特黄特黄一级片 | 色综合伊人色综合网站中国 | 99精品视频一区在线观看miya | 日本亚欧乱色视频在线观看 | 青青青手机在线观看 | 调教老师肉色丝袜的故事 | 国产a免费观看 | 欧美日韩在线一区二区三区 | 欧美日韩久久中文字幕 | 日本免费v片一二三区 | 999精品视频这里只有精品 | 成人丁香乱小说 | 91高清国产视频 |