一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - 用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

2020-12-01 23:18JavaEdge Java教程

RabbitMQ雖可將消息落地磁盤,即使MQ異常消息數(shù)據(jù)也不會(huì)丟失,但異步流程在消息發(fā)送、傳輸、處理等環(huán)節(jié),都可能發(fā)生消息丟失。所有MQ都無法確保百分百可用,業(yè)務(wù)設(shè)計(jì)都需考慮不可用時(shí)異步流程將如何繼續(xù)。

優(yōu)秀的項(xiàng)目都由同步、異步和定時(shí)任務(wù)三種處理模式相輔相成。當(dāng)屬異步編程充滿坑點(diǎn)。

1 適用場(chǎng)景

1.1 服務(wù)于主流程的分支流程

在注冊(cè)流程中,數(shù)據(jù)寫DB是主流程,但注冊(cè)后給用戶發(fā)優(yōu)惠券或歡迎短信是分支流程,時(shí)效性也不強(qiáng)。

1.2 用戶無需實(shí)時(shí)看到結(jié)果

比如外賣下單后的配貨、送貨流程完全可異步處理,每個(gè)階段處理完成后,再給用戶發(fā)推送或短信讓用戶知曉即可。

1.3 MQ

任務(wù)的緩沖的分發(fā),流量削峰、服務(wù)解耦和消息廣播。

當(dāng)然了異步處理不僅僅是通過 MQ 來實(shí)現(xiàn),還有其他方式

比如開新線程執(zhí)行,返回 Future

還有各種異步框架,比如 Vertx,它是通過 callback 的方式實(shí)現(xiàn)

2 異步處理之坑

2.1 異步處理需做消息補(bǔ)償以閉環(huán)

RabbitMQ雖可將消息落地磁盤,即使MQ異常消息數(shù)據(jù)也不會(huì)丟失,但異步流程在消息發(fā)送、傳輸、處理等環(huán)節(jié),都可能發(fā)生消息丟失。所有MQ都無法確保百分百可用,業(yè)務(wù)設(shè)計(jì)都需考慮不可用時(shí)異步流程將如何繼續(xù)。

因此,對(duì)于異步處理流程,必須考慮補(bǔ)償或建立主備雙活流程。

2.1.1 案例

用戶注冊(cè)后異步發(fā)送歡迎消息。

  • 用戶注冊(cè)落DB為同步流程
  • 會(huì)員服務(wù)收到消息后發(fā)送歡迎消息為異步流程

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

  • 藍(lán)線

         MQ異步處理(主線),消息可能丟失(虛線代表異步調(diào)用)

  • 綠線

        補(bǔ)償Job定期消息補(bǔ)償(備線),以補(bǔ)償主線丟失的消息

  • 考慮極端的MQ中間件失效場(chǎng)景

        要求備線的處理吞吐能力達(dá)到主線性能

代碼示例

UserController 注冊(cè)+發(fā)送異步消息。注冊(cè)方法,一次性注冊(cè)10個(gè)用戶,用戶注冊(cè)消息不能發(fā)送出去的概率為50%。

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

MemberService 會(huì)員服務(wù)監(jiān)聽用戶注冊(cè)成功的消息,并發(fā)送歡迎短信。使用ConcurrentHashMap存放那些發(fā)過短信的用戶ID實(shí)現(xiàn)冪等,避免相同的用戶補(bǔ)償時(shí)重復(fù)發(fā)短信

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

對(duì)于MQ消費(fèi)程序,處理邏輯須考慮去重(支持冪等):

  • MQ消息可能會(huì)因中間件本身配置錯(cuò)誤、穩(wěn)定性等原因出現(xiàn)重復(fù)
  • 自動(dòng)補(bǔ)償重復(fù)
  • 比如本例,同一消息可能既走M(jìn)Q也走補(bǔ)償,肯定會(huì)出現(xiàn)重復(fù),而且考慮到高內(nèi)聚,補(bǔ)償Job本身不會(huì)做去重
  • 人工補(bǔ)償重復(fù)

出現(xiàn)消息堆積時(shí),異步處理流程必然延遲。若提供補(bǔ)償功能,則在處理遇到延遲時(shí),很可能會(huì)先人工補(bǔ)償,過段時(shí)間后處理程序又收到消息了,重復(fù)處理。

有次MQ故障,MQ中堆積了幾十萬條發(fā)放資金消息,導(dǎo)致業(yè)務(wù)無法及時(shí)處理,運(yùn)營(yíng)以為程序出錯(cuò),就先通過后臺(tái)進(jìn)行人工處理,結(jié)果MQ系統(tǒng)恢復(fù)后消息又被重復(fù)處理一次,造成大量資金重復(fù)發(fā)放。

異步處理須考慮消息重復(fù)可能性,因此處理邏輯須實(shí)現(xiàn)冪等,防止重復(fù)處理。

接著定義補(bǔ)償Job即備線操作。

定時(shí)任務(wù),5秒做一次補(bǔ)償,因Job并不知道哪些用戶注冊(cè)的消息可能丟失,所以是全量補(bǔ)償。

  • 補(bǔ)償邏輯

         每5秒補(bǔ)償一次,按順序一次補(bǔ)償5個(gè)用戶,下一次補(bǔ)償操作從上一次補(bǔ)償?shù)淖詈笠粋€(gè)用戶ID開始

         補(bǔ)償任務(wù)提交到線程池以“異步”處理,提高處理能力

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

為實(shí)現(xiàn)高內(nèi)聚,主線和備線處理消息,最好使用同一方法。本案例的MemberService監(jiān)聽到MQ消息和CompensationJob補(bǔ)償,調(diào)用的都是welcome。

這里的補(bǔ)償邏輯簡(jiǎn)單僅為 demo,實(shí)際生產(chǎn)代碼須:

  • 考慮配置補(bǔ)償?shù)念l次、每次處理數(shù)量,以及補(bǔ)償線程池大小等參數(shù)為合適值,以滿足補(bǔ)償?shù)耐掏铝?/li>
  • 考慮備線補(bǔ)償數(shù)據(jù)進(jìn)行適當(dāng)延遲
  • 比如,對(duì)注冊(cè)時(shí)間在30s前的用戶再進(jìn)行補(bǔ)償,以方便和主線MQ實(shí)時(shí)流程錯(cuò)開,避免沖突
  • 諸如當(dāng)前補(bǔ)償?shù)侥膫€(gè)用戶的offset數(shù)據(jù),需要落地DB
  • 補(bǔ)償Job本身須高可用,可使用類似xxl-job或ElasticJob等任務(wù)系統(tǒng)。

運(yùn)行程序,執(zhí)行注冊(cè)方法注冊(cè)10個(gè)用戶,查看日志

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

可見

  • 共10個(gè)用戶,MQ發(fā)送成功的用戶有四個(gè):1、5、7、8
  • 補(bǔ)償任務(wù)第一次運(yùn)行,補(bǔ)償了用戶2、3、4,第二次運(yùn)行補(bǔ)償了用戶6、9,第三次運(yùn)行補(bǔ)充了用戶10

消息補(bǔ)償閉環(huán)的最高標(biāo)準(zhǔn)

能夠達(dá)到補(bǔ)償全量數(shù)據(jù)的吞吐量。即若補(bǔ)償備線足夠完善,即使直接停機(jī)MQ,雖會(huì)稍微影響處理及時(shí)性,但至少確保流程都能正常執(zhí)行。

小結(jié)

實(shí)際開發(fā)要考慮異步流程丟消息或處理中斷場(chǎng)景。

異步流程需有備線以補(bǔ)償,比如這里的全量補(bǔ)償方式,即便異步流程徹底失效,通過補(bǔ)償也能讓業(yè)務(wù)繼續(xù)進(jìn)行。

2.2 RabbitMQ廣播、工作隊(duì)列模式坑

消息模式是廣播 Or 工作隊(duì)列

  • 消息廣播

        同一消息,不同消費(fèi)者都能分別消費(fèi)

  • 隊(duì)列模式

        不同消費(fèi)者共享消費(fèi)同一個(gè)隊(duì)列的數(shù)據(jù),相同消息只能被某一個(gè)消費(fèi)者消費(fèi)一次。

比如同一用戶的注冊(cè)消息

  • 會(huì)員服務(wù)需監(jiān)聽以發(fā)送歡迎短信
  • 營(yíng)銷服務(wù)需監(jiān)聽以發(fā)送新用戶小禮物

但會(huì)員、營(yíng)銷服務(wù)都可能有多實(shí)例,業(yè)務(wù)需求同一用戶的消息,可同時(shí)廣播給不同的服務(wù)(廣播模式),但對(duì)同一服務(wù)的不同實(shí)例(比如會(huì)員服務(wù)1和會(huì)員服務(wù)2),不管哪個(gè)實(shí)例來處理,處理一次即可(工作隊(duì)列模式):

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

實(shí)現(xiàn)代碼時(shí)務(wù)必確認(rèn)MQ系統(tǒng)的機(jī)制,確保消息的路由按期望。

RocketMQ實(shí)現(xiàn)類似功能比較簡(jiǎn)單直白:若消費(fèi)者屬于一個(gè)組,那么消息只會(huì)由同組的一個(gè)消費(fèi)者消費(fèi);若消費(fèi)者屬不同組,每個(gè)組都能消費(fèi)一遍消息。

而RabbitMQ的消息路由模式采用隊(duì)列+交換器,隊(duì)列是消息載體,交換器決定消息路由到隊(duì)列的方式。

step1:會(huì)員服務(wù)-監(jiān)聽用戶服務(wù)發(fā)出的新用戶注冊(cè)消息

若啟動(dòng)倆會(huì)員服務(wù),那么同一用戶的注冊(cè)消息應(yīng)只能被其中一個(gè)實(shí)例消費(fèi)。

分別實(shí)現(xiàn)RabbitMQ隊(duì)列、交換器、綁定三件套。

  • 隊(duì)列使用匿名隊(duì)列
  • 交換器使用DirectExchange,交換器綁定到匿名隊(duì)列的路由Key是空字符串

收到消息之后,打印所在實(shí)例使用的端口。

  • 消息發(fā)布者、消費(fèi)者、以及MQ的配置

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

使用12345和45678兩個(gè)端口啟動(dòng)倆程序?qū)嵗螅l(fā)條消息,輸出的日志,顯示同一會(huì)員服務(wù)兩個(gè)實(shí)例都收到了消息:

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

所以問題在于不明

RabbitMQ直接交換器和隊(duì)列的綁定關(guān)系

RabbitMQ的直接交換器根據(jù)routingKey路由消息。而程序每次啟動(dòng)都會(huì)創(chuàng)建匿名(隨機(jī)命名)隊(duì)列,所以每個(gè)會(huì)員服務(wù)實(shí)例都對(duì)應(yīng)獨(dú)立的隊(duì)列,以空routingKey綁定到直接交換器。

用戶服務(wù)發(fā)消息時(shí)也設(shè)置了空routingKey,所以直接交換器收到消息后,發(fā)現(xiàn)匹配倆隊(duì)列,于是都轉(zhuǎn)發(fā)消息

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

修復(fù)

對(duì)會(huì)員服務(wù)不要使用匿名隊(duì)列,而使用同一隊(duì)列。

將上面代碼中的匿名隊(duì)列換做普通隊(duì)列:

private static final String QUEUE = "newuserQueue";@Beanpublic Queue queue() { return new Queue(QUEUE);}

這樣對(duì)同一消息,倆實(shí)例中只有一個(gè)實(shí)例可收到,不同消息被輪詢發(fā)給不同實(shí)例。

現(xiàn)在的交換器和隊(duì)列關(guān)系

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

step2:用戶服務(wù)-廣播消息給會(huì)員、營(yíng)銷服務(wù)

期望會(huì)員、營(yíng)銷服務(wù)都能收到廣播消息,但會(huì)員/營(yíng)銷服務(wù)中的每個(gè)實(shí)例只需收到一次消息。

聲明一個(gè)隊(duì)列和一個(gè)FanoutExchange,然后模擬倆用戶服務(wù)和倆營(yíng)銷服務(wù):

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

注冊(cè)四個(gè)用戶。日志發(fā)現(xiàn)一條用戶注冊(cè)的消息,要么被會(huì)員服務(wù)收到,要么被營(yíng)銷服務(wù)收到,這不是廣播。可使用的明明是FanoutExchange,為什么沒起效呢?

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

因?yàn)閺V播交換器會(huì)忽略routingKey,廣播消息到所有綁定的隊(duì)列。該案例的倆會(huì)員服務(wù)和兩個(gè)營(yíng)銷服務(wù)都綁定了同一隊(duì)列,所以四服務(wù)只能收到一次消息:

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

修復(fù)

拆分隊(duì)列,會(huì)員和營(yíng)銷兩組服務(wù)分別使用一條獨(dú)立隊(duì)列綁定到廣播交換器

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

現(xiàn)在的交換器和隊(duì)列結(jié)構(gòu)

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

從日志輸出可以驗(yàn)證,對(duì)每條MQ消息,會(huì)員服務(wù)和營(yíng)銷服務(wù)分別都會(huì)收到一次,一條消息廣播到兩個(gè)服務(wù)同時(shí),在每一個(gè)服務(wù)的兩個(gè)實(shí)例中通過輪詢接收:

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

異步的消息路由模式一旦配置出錯(cuò),輕則可能導(dǎo)致消息重復(fù)處理,重則可能導(dǎo)致重要的服務(wù)無法接收到消息,最終造成業(yè)務(wù)邏輯錯(cuò)誤。

小結(jié)

微服務(wù)場(chǎng)景下不同服務(wù)多個(gè)實(shí)例監(jiān)聽消息的情況,一般不同服務(wù)需要同時(shí)收到相同的消息,而相同服務(wù)的多個(gè)實(shí)例只需要輪詢接收消息。我們需要確認(rèn)MQ的消息路由配置是否滿足需求,以避免消息重復(fù)或漏發(fā)問題。

2.3 死信堵塞MQ之坑

始終無法處理的死信消息,可能會(huì)引發(fā)堵塞MQ。

若線程池的任務(wù)隊(duì)列無上限,最終可能導(dǎo)致OOM,類似的MQ也要注意任務(wù)堆積問題。對(duì)于突發(fā)流量引起的MQ堆積,問題并不大,適當(dāng)調(diào)整消費(fèi)者的消費(fèi)能力應(yīng)該就可以解決。但在很多時(shí)候,消息隊(duì)列的堆積堵塞,是因?yàn)橛写罅渴冀K無法處理的消息。

2.3.1 案例

用戶服務(wù)在用戶注冊(cè)后發(fā)出一條消息,會(huì)員服務(wù)監(jiān)聽到消息后給用戶派發(fā)優(yōu)惠券,但因用戶并沒有保存成功,會(huì)員服務(wù)處理消息始終失敗,消息重新進(jìn)入隊(duì)列,然后還是處理失敗。這種在MQ中回蕩的同一條消息,就是死信。

隨著MQ被越來越多的死信填滿,消費(fèi)者需花費(fèi)大量時(shí)間反復(fù)處理死信,導(dǎo)致正常消息的消費(fèi)受阻,最終MQ可能因數(shù)據(jù)量過大而崩潰。

定義一個(gè)隊(duì)列、一個(gè)直接交換器,然后把隊(duì)列綁定到交換器

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

sendMessage發(fā)送消息到MQ,訪問一次提交一條消息,使用自增標(biāo)識(shí)作為消息內(nèi)容

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

收到消息后,直接NPE,模擬處理出錯(cuò)

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

調(diào)用sendMessage接口發(fā)送兩條消息,然后來到RabbitMQ管理臺(tái),可以看到這兩條消息始終在隊(duì)列,不斷被重新投遞,導(dǎo)致重新投遞QPS達(dá)到1063。

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

在日志中也可看到大量異常信息。

修復(fù)方案

  • 解決死信無限重復(fù)進(jìn)入隊(duì)列最簡(jiǎn)單方案

        程序處理出錯(cuò)時(shí),直接拋AmqpRejectAndDontRequeueException,避免消息重新進(jìn)入隊(duì)列

throw new AmqpRejectAndDontRequeueException("error"); 

但更希望對(duì)同一消息,能夠先進(jìn)行幾次重試,解決因?yàn)榫W(wǎng)絡(luò)問題導(dǎo)致的偶發(fā)消息處理失敗,若依舊失敗,再把消息投遞到專門設(shè)置的DLX。對(duì)于來自DLX的數(shù)據(jù),可能只是記錄日志發(fā)送報(bào)警,即使出現(xiàn)異常也不會(huì)再重復(fù)投遞。

邏輯如下

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

針對(duì)該問題,我們來看

Spring AMQP的簡(jiǎn)便解決方案

  1. 定義死信交換器、死信隊(duì)列。其實(shí)都是普通交換器和隊(duì)列,只不過專門用于處理死信消息
  2. 通過RetryInterceptorBuilder構(gòu)建一個(gè)RetryOperationsInterceptor以處理失敗時(shí)候的重試。策略是最多嘗試5次(重試4次);并且采取指數(shù)退避重試,首次重試延遲1秒,第二次2秒,以此類推,最大延遲是10秒;如果第4次重試還是失敗,則使用RepublishMessageRecoverer把消息重新投入一個(gè)DLX
  3. 定義死信隊(duì)列的處理程序。本案例只記錄日志

代碼

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

執(zhí)行程序,發(fā)送兩條消息,查看日志:

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!
  • msg2的4次重試間隔分別是1秒、2秒、4秒、8秒,再加上首次的失敗,所以最大嘗試次數(shù)是5
  • 4次重試后,RepublishMessageRecoverer把消息發(fā)往DLX
  • 死信處理程序輸出了got dead message msg2。

雖然幾乎同時(shí)發(fā)倆消息,但msg2在msg1四次重試全部結(jié)束后才開始處理,因?yàn)槟J(rèn)SimpleMessageListenerContainer只有一個(gè)消費(fèi)線程。可通過增加消費(fèi)線程避免性能問題:

直接設(shè)置concurrentConsumers參數(shù)為10,來增加到10個(gè)工作線程

用了這么久的RabbitMQ異步編程竟然都是錯(cuò)的!

也可設(shè)置maxConcurrentConsumers參數(shù),讓SimpleMessageListenerContainer動(dòng)態(tài)調(diào)整消費(fèi)者線程數(shù)。

小結(jié)

一般在遇到消息處理失敗的時(shí)候,可設(shè)置重試。若重試還是不行,可把該消息扔到專門的死信隊(duì)列處理,不要讓死信影響到正常消息處理。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 亚洲色图第一页 | 丝袜老师好湿好紧我要进去了 | 西西人体大胆啪啪私拍色约约 | 男人猛激烈吃奶gif动态图 | 精品国产乱码久久久久久免费流畅 | 四虎国产精品免费久久麻豆 | 国产精品视频免费看 | 亚洲免费视频在线观看 | 欧美国产在线视频 | 欧美区一区 | 情侣宾馆愉拍自拍视频 | 欧美亚洲高清日韩成人 | 国产成人精品曰本亚洲77美色 | 99热er | 免费一看一级欧美 | 天天做天天爱天天综合网 | 视频免费观看在线播放高清 | 免费特黄视频 | 午夜桃色剧场 | 亚洲色图影院 | 99re这里都是精品 | 欧美区在线 | 久久青青草原 | 手机看片自拍自自拍日韩免费 | 日本肥熟| 视频在线观看一区二区三区 | 国产精品四虎在线观看免费 | 五花大绑esebdsm国产 | 久久r视频 | 朝鲜女人性猛交 | 精品久久免费视频 | 513热点网深夜影院影院诶 | 男人影院天堂网址 | 99国产精品久久久久久久... | 亚洲成人综合在线 | 国产在线视频福利 | 久久电影午夜 | 香蕉久久久久久狠狠色 | 国内会所按摩推拿国产 | 丁香久久婷婷 | 麻豆视频入口 |