1. 前言
最近寫關于響應式編程的東西有點多,很多同學反映對flux
和mono
這兩個reactor中的概念有點懵逼。但是目前java響應式編程中我們對這兩個對象的接觸又最多,諸如spring webflux、rsocket、r2dbc。我開始也對這兩個對象頭疼,所以今天我們就簡單來探討一下它們。
2. 響應流的特點
要搞清楚這兩個概念,必須說一下響應流規范。它是響應式編程的基石。他具有以下特點:
響應流必須是無阻塞的。響應流必須是一個數據流。它必須可以異步執行。并且它也應該能夠處理背壓。
背壓是反應流中的一個重要概念,可以理解為,生產者可以感受到消費者反饋的消費壓力,并根據壓力進行動態調整生產速率。形象點可以按照下面理解:
3. publisher
由于響應流的特點,我們不能再返回一個簡單的pojo對象來表示結果了。必須返回一個類似java中的future
的概念,在有結果可用時通知消費者進行消費響應。
reactive stream規范中這種被定義為publisher<t>
,publisher<t>
是一個可以提供0-n個序列元素的提供者,并根據其訂閱者subscriber<? super t>
的需求推送元素。一個publisher<t>
可以支持多個訂閱者,并可以根據訂閱者的邏輯進行推送序列元素。下面這個excel計算就能說明一些publisher<t>
的特點。
a1-a9就可以看做publisher<t>
及其提供的元素序列。a10-a13分別是求和函數sum(a1:a9)
、平均函數average(a1:a9)
、最大值函數max(a1:a9)
、最小值函數min(a1:a9)
,可以看作訂閱者subscriber
。假如說我們沒有a10-a13,那么a1-a9就沒有實際意義,它們并不產生計算。這也是響應式的一個重要特點:當沒有訂閱時發布者什么也不做。
而flux
和mono
都是publisher<t>
在reactor 3實現。publisher<t>
提供了subscribe
方法,允許消費者在有結果可用時進行消費。如果沒有消費者publisher<t>
不會做任何事情,他根據消費情況進行響應。 publisher<t>
可能返回零或者多個,甚至可能是無限的,為了更加清晰表示期待的結果就引入了兩個實現模型mono
和flux
。
4. flux
flux
是一個發出(emit)0-n
個元素組成的異步序列的publisher<t>
,可以被oncomplete
信號或者onerror
信號所終止。在響應流規范中存在三種給下游消費者調用的方法 onnext
, oncomplete
, 和onerror
。下面這張圖表示了flux的抽象模型:
以上的的講解對于初次接觸反應式編程的依然是難以理解的,所以這里有一個循序漸進的理解過程。
有些類比并不是很妥當,但是對于你循序漸進的理解這些新概念還是有幫助的。
傳統數據處理
我們在平常是這么寫的:
1
2
3
4
|
public list<clientuser> allusers() { return arrays.aslist( new clientuser( "felord.cn" , "reactive" ), new clientuser( "felordcn" , "reactor" )); } |
我們通過迭代返回值list
來get
這些元素進行再處理(消費),這種方式有點類似廚師做了很多菜,吃不吃在于食客。需要食客主動去來吃就行了(pull的方式),至于喜歡吃什么不喜歡吃什么自己隨意,怎么吃也自己隨意。
流式數據處理
在java 8中我們可以改寫為流的表示:
1
2
3
4
|
public stream<clientuser> allusers() { return stream.of( new clientuser( "felord.cn" , "reactive" ), new clientuser( "felordcn" , "reactor" )); } |
依然是廚師做了很多菜,但是這種就更加高級了一些,提供了菜品的搭配方式(不包含具體細節),食客可以按照說明根據自己的習慣搭配著去吃,一但開始概不退換,吃完為止,過期不候。
反應式數據處理
在reactor中我們又可以改寫為flux
表示:
1
2
3
4
|
public flux<clientuser> allusers(){ return flux.just( new clientuser( "felord.cn" , "reactive" ), new clientuser( "felordcn" , "reactor" )); } |
這時候食客只需要訂餐就行了,做好了自然就呈上來,而且可以隨時根據食客的飯量進行調整。如果沒有食客訂餐那么廚師就什么都不用做。當然不止有這么點特性,不過對于方便我們理解來說這就夠了。
5. mono
mono
是一個發出(emit)0-1
個元素的publisher<t>
,可以被oncomplete
信號或者onerror
信號所終止。
這里就不翻譯了,整體和flux
差不多,只不過這里只會發出0-1個元素。也就是說不是有就是沒有。象flux
一樣,我們來看看mono
的演化過程以幫助理解。
傳統數據處理
1
2
3
|
public clientuser currentuser () { return isauthenticated ? new clientuser( "felord.cn" , "reactive" ) : null ; } |
直接返回符合條件的對象或者null
。
optional的處理方式
1
2
3
4
|
public optional<clientuser> currentuser () { return isauthenticated ? optional.of( new clientuser( "felord.cn" , "reactive" )) : optional.empty(); } |
這個optional
我覺得就有反應式的那種味兒了,當然它并不是反應式。當我們不從返回值optional
取其中具體的對象時,我們不清楚里面到底有沒有,但是optional
是一定客觀存在的,不會出現npe問題。
反應式數據處理
1
2
3
4
|
public mono<clientuser> currentuser () { return isauthenticated ? mono.just( new clientuser( "felord.cn" , "reactive" )) : mono.empty(); } |
和optional
有點類似的機制,當然mono
不是為了解決npe問題的,它是為了處理響應流中單個值(也可能是void
)而存在的。
6. 總結
flux
和mono
是java反應式中的重要概念,但是很多同學包括我在開始都難以理解它們。這其實是規定了兩種流式范式,這種范式讓數據具有一些新的特性,比如基于發布訂閱的事件驅動,異步流、背壓等等。另外數據是推送(push)給消費者的以區別于平時我們的拉(pull)模式。同時我們可以像stream api一樣使用類似map
、flatmap
等操作符(operator)來操作它們。對flux
和mono
這兩個概念需要花一些時間去理解它們,不能操之過急。
到此這篇關于java反應式框架reactor中的mono和flux的文章就介紹到這了,更多相關java框架 reactor中的mono和flux內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://www.cnblogs.com/felordcn/p/13747262.html