一、推遲執(zhí)行動作
可以使用timer+map方法實(shí)現(xiàn).代碼如下:
1
2
3
4
|
Observable.timer( 5 , TimeUnit.MILLISECONDS).map(value->{ return doSomething(); }).subscribe(System.out::println); } |
二、推遲發(fā)送執(zhí)行的結(jié)果
這種場景要求產(chǎn)生數(shù)據(jù)的動作是馬上執(zhí)行,但是結(jié)果推遲發(fā)送.這和上面場景的是不一樣的.
這種場景可以使用Observable.zip
來實(shí)現(xiàn).
zip操作符將多個(gè)Observable發(fā)射的數(shù)據(jù)按順序組合起來,每個(gè)數(shù)據(jù)只能組合一次,而且都是有序的。最終組合的數(shù)據(jù)的數(shù)量由發(fā)射數(shù)據(jù)最少的Observable來決定。
對于各個(gè)observable相同位置的數(shù)據(jù),需要相互等待,也就說,第一個(gè)observable第一個(gè)位置的數(shù)據(jù)產(chǎn)生后,要等待第二個(gè)observable第一個(gè)位置的數(shù)據(jù)產(chǎn)生,等各個(gè)Observable相同位置的數(shù)據(jù)都產(chǎn)生后,才能按指定規(guī)則進(jìn)行組合.這真是我們要利用的.
zip有很多種聲明,但大致上是一樣的,就是傳入幾個(gè)observable,然后指定一個(gè)規(guī)則,對每個(gè)observable對應(yīng)位置的數(shù)據(jù)進(jìn)行處理,產(chǎn)生一個(gè)新的數(shù)據(jù), 下面是其中一個(gè)最簡單的:
1
|
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction); |
用zip實(shí)現(xiàn)推送發(fā)送執(zhí)行結(jié)果如下:
1
2
3
|
Observable.zip(Observable.timer( 5 ,TimeUnit.MILLISECONDS) ,Observable.just(doSomething()), (x,y)->y) .subscribe(System.out::println)); |
三、使用defer在指定線程里執(zhí)行某種動作
如下面的代碼,雖然我們指定了線程的運(yùn)行方式,但是doSomething()
這個(gè)函數(shù)還是在當(dāng)前代碼調(diào)用的線程中執(zhí)行的.
1
2
3
4
|
Observable.just(doSomething()) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->Utils.printlnWithThread(v.toString());); |
通常我們采用下面的方法達(dá)到目的:
1
2
3
4
5
6
|
Observable.create(s->{s.onNext(doSomething());}) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->{ Utils.printlnWithThread(v.toString()); }); |
但其實(shí)我們采用defer也能達(dá)到相同的目的.
關(guān)于defer
defer 操作符與create、just、from等操作符一樣,是創(chuàng)建類操作符,不過所有與該操作符相關(guān)的數(shù)據(jù)都是在訂閱是才生效的。
聲明:
1
|
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory); |
defer的Func0里的Observable是在訂閱(subscribe)的時(shí)候才創(chuàng)建的.
作用:
Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.
也就說observable是在訂閱的時(shí)候才創(chuàng)建的.
上面的問題用defer實(shí)現(xiàn):
1
2
3
4
5
|
Observable.defer(()->Observable.just(doSomething())) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->{Utils.printlnWithThread(v.toString()); }); |
四、使用compose不要打斷鏈?zhǔn)浇Y(jié)構(gòu)
我們經(jīng)??吹较旅娴拇a:
1
2
3
4
|
Observable.just(doSomething()) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->{Utils.printlnWithThread(v.toString()); |
上面的代碼中,subscribeOn(xxx).observeOn(xxx)
可能在很多地方都是一樣的, 如果我們打算把它統(tǒng)一在某一個(gè)地方實(shí)現(xiàn), 我們可以這么寫:
1
2
3
4
|
private static <T> Observable<T> applySchedulers(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()); } |
但是這樣每次我們需要調(diào)用上面的方法, 大致會像下面這樣,最外面是一個(gè)函數(shù),等于打破了鏈接結(jié)構(gòu):
1
2
3
4
5
6
7
8
9
10
|
applySchedulers(Observable.from(someSource).map( new Func1<Data, Data>() { @Override public Data call(Data data) { return manipulate(data); } }) ).subscribe( new Action1<Data>() { @Override public void call(Data data) { doSomething(data); } }); |
可以使用compose操作符達(dá)到不打破鏈接結(jié)構(gòu)的目的.
compose的申明如下:
1
|
public Observable compose(Transformer<? super T, ? extends R> transformer); |
它的入?yún)⑹且粋€(gè)Transformer接口,輸出是一個(gè)Observable. 而Transformer實(shí)際上就是一個(gè)Func1<Observable<T>
, Observable<R>>
,換言之就是:可以通過它將一種類型的Observable轉(zhuǎn)換成另一種類型的Observable.
簡單的說,compose可以通過指定的轉(zhuǎn)化方式(輸入?yún)?shù)transformer),將原來的observable轉(zhuǎn)化為另外一種Observable.
通過compose, 采用下面方式指定線程方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
private static <T> Transformer<T, T> applySchedulers() { return new Transformer<T, T>() { @Override public Observable<T> call(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()); } }; } Observable.just(doSomething()).compose(applySchedulers()) .subscribe(v->{Utils.printlnWithThread(v.toString()); }); |
函數(shù)applySchedulers可以使用lambda表達(dá)式進(jìn)一步簡化為下面為:
1
2
3
4
|
private static <T> Transformer<T, T> applySchedulers() { return observable->observable.subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()); } |
五、按優(yōu)先級使用不同的執(zhí)行結(jié)果
上面這個(gè)標(biāo)題估計(jì)沒表達(dá)清楚我想表達(dá)的場景. 其實(shí)我想表達(dá)的場景類似于平常的獲取網(wǎng)絡(luò)數(shù)據(jù)場景:如果緩存有,從緩存獲取,如果沒有,再從網(wǎng)絡(luò)獲取.
這里要求,如果緩存有,不會做從網(wǎng)絡(luò)獲取數(shù)據(jù)的動作.
這個(gè)可以采用concat+first實(shí)現(xiàn).
concat將幾個(gè)Observable合并成一個(gè)Observable,返回最終的一個(gè)Observable. 而那些數(shù)據(jù)就像從一個(gè)Observable發(fā)出來一樣. 參數(shù)可以是多個(gè)Observable,也可以是包含Observalbe的Iterator.
新的observable內(nèi)的數(shù)據(jù)排列按原來concat里的observable順序排列,即新結(jié)果內(nèi)的數(shù)據(jù)是按原來的順序排序的.
下面是上述需求的實(shí)現(xiàn):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
Observable.concat(getDataFromCache(),getDataFromNetwork()).first() .subscribe(v->System.out.println( "result:" +v)); //從緩存獲取數(shù)據(jù) private static Observable<String> getDataFromCache(){ return Observable.create(s -> { //dosomething to get data int value = new Random().nextInt(); value = value% 2 ; if (value!= 0 ){ s.onNext( "data from cache:" +value); //產(chǎn)生數(shù)據(jù) } //s.onError(new Throwable("none")); s.onCompleted(); } ); } //從網(wǎng)絡(luò)獲取數(shù)據(jù) private static Observable<String> getDataFromNetwork(){ return Observable.create(s -> { for ( int i = 0 ; i < 10 ; i++) { Utils.println( "obs2 generate " +i); s.onNext( "data from network:" + i); //產(chǎn)生數(shù)據(jù) } s.onCompleted(); } ); } |
上面的實(shí)現(xiàn),如果getDataFromCache有數(shù)據(jù), getDataFromNetwork這里的代碼是不會執(zhí)行的, 這正是我們想要的.
上面實(shí)現(xiàn)有幾個(gè)需要注意:
1、有可能從兩個(gè)地方都獲取不到數(shù)據(jù), 這種場景下使用first會拋出異常NoSuchElementException,如果是這樣的場景,需要用firstOrDefault替換上面的first.
2、上面getDataFromCache()
里,如果沒有數(shù)據(jù),我們直接調(diào)用onCompleted,如果不調(diào)用onCompleted,而是調(diào)用onError,則上述采用concat是得不到任何結(jié)果的.因?yàn)閏oncat在收到任何一個(gè)error,合并就會停止.所以,如果要用onError, 則需要用concatDelayError替代concat.concatDelayError
會先忽略error,將error推遲到最后在處理.
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作能帶來一定的幫助,如果有疑問大家可以留言交流。