一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|

服務(wù)器之家 - 編程語言 - JAVA教程 - RxJava中多種場景的實(shí)現(xiàn)總結(jié)

RxJava中多種場景的實(shí)現(xiàn)總結(jié)

2020-06-21 15:11lluo2010 JAVA教程

這篇文章給大家詳細(xì)介紹了RxJava中多種場景的實(shí)現(xiàn),對大家學(xué)習(xí)使用RxJava具有一定的參考借鑒價(jià)值,有需要的朋友們可以參考學(xué)習(xí),下面來一起看看吧。

一、推遲執(zhí)行動作

可以使用timer+map方法實(shí)現(xiàn).代碼如下:

?
1
2
3
4
Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{
   return doSomething();
  }).subscribe(System.out::println);
 }

二、推遲發(fā)送執(zhí)行的結(jié)果

這種場景要求產(chǎn)生數(shù)據(jù)的動作是馬上執(zhí)行,但是結(jié)果推遲發(fā)送.這和上面場景的是不一樣的.

這種場景可以使用Observable.zip來實(shí)現(xiàn).

zip操作符將多個(gè)Observable發(fā)射的數(shù)據(jù)按順序組合起來,每個(gè)數(shù)據(jù)只能組合一次,而且都是有序的。最終組合的數(shù)據(jù)的數(shù)量由發(fā)射數(shù)據(jù)最少的Observable來決定。

對于各個(gè)observable相同位置的數(shù)據(jù),需要相互等待,也就說,第一個(gè)observable第一個(gè)位置的數(shù)據(jù)產(chǎn)生后,要等待第二個(gè)observable第一個(gè)位置的數(shù)據(jù)產(chǎn)生,等各個(gè)Observable相同位置的數(shù)據(jù)都產(chǎn)生后,才能按指定規(guī)則進(jìn)行組合.這真是我們要利用的.

zip有很多種聲明,但大致上是一樣的,就是傳入幾個(gè)observable,然后指定一個(gè)規(guī)則,對每個(gè)observable對應(yīng)位置的數(shù)據(jù)進(jìn)行處理,產(chǎn)生一個(gè)新的數(shù)據(jù), 下面是其中一個(gè)最簡單的:

?
1
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);

用zip實(shí)現(xiàn)推送發(fā)送執(zhí)行結(jié)果如下:

?
1
2
3
Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS)
        ,Observable.just(doSomething()), (x,y)->y)
  .subscribe(System.out::println));

三、使用defer在指定線程里執(zhí)行某種動作

如下面的代碼,雖然我們指定了線程的運(yùn)行方式,但是doSomething()這個(gè)函數(shù)還是在當(dāng)前代碼調(diào)用的線程中執(zhí)行的.

?
1
2
3
4
Observable.just(doSomething())
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v->Utils.printlnWithThread(v.toString()););

通常我們采用下面的方法達(dá)到目的:

?
1
2
3
4
5
6
Observable.create(s->{s.onNext(doSomething());})
   .subscribeOn(Schedulers.io())
   .observeOn(Schedulers.computation())
   .subscribe(v->{
    Utils.printlnWithThread(v.toString());
 });

但其實(shí)我們采用defer也能達(dá)到相同的目的.

關(guān)于defer

defer 操作符與create、just、from等操作符一樣,是創(chuàng)建類操作符,不過所有與該操作符相關(guān)的數(shù)據(jù)都是在訂閱是才生效的。

聲明:

?
1
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);

defer的Func0里的Observable是在訂閱(subscribe)的時(shí)候才創(chuàng)建的.

作用:

Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.

也就說observable是在訂閱的時(shí)候才創(chuàng)建的.

上面的問題用defer實(shí)現(xiàn):

?
1
2
3
4
5
Observable.defer(()->Observable.just(doSomething()))
   .subscribeOn(Schedulers.io())
   .observeOn(Schedulers.computation())
   .subscribe(v->{Utils.printlnWithThread(v.toString());
 });

四、使用compose不要打斷鏈?zhǔn)浇Y(jié)構(gòu)

我們經(jīng)??吹较旅娴拇a:

?
1
2
3
4
Observable.just(doSomething())
   .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
   .subscribe(v->{Utils.printlnWithThread(v.toString());

上面的代碼中,subscribeOn(xxx).observeOn(xxx)可能在很多地方都是一樣的, 如果我們打算把它統(tǒng)一在某一個(gè)地方實(shí)現(xiàn), 我們可以這么寫:

?
1
2
3
4
private static <T> Observable<T> applySchedulers(Observable<T> observable) {
 return observable.subscribeOn(Schedulers.io())
   .observeOn(Schedulers.computation());
}

但是這樣每次我們需要調(diào)用上面的方法, 大致會像下面這樣,最外面是一個(gè)函數(shù),等于打破了鏈接結(jié)構(gòu):

?
1
2
3
4
5
6
7
8
9
10
applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() {
  @Override public Data call(Data data) {
  return manipulate(data);
  }
 })
).subscribe(new Action1<Data>() {
 @Override public void call(Data data) {
 doSomething(data);
 }
});

可以使用compose操作符達(dá)到不打破鏈接結(jié)構(gòu)的目的.

compose的申明如下:

?
1
public Observable compose(Transformer<? super T, ? extends R> transformer);

它的入?yún)⑹且粋€(gè)Transformer接口,輸出是一個(gè)Observable. 而Transformer實(shí)際上就是一個(gè)Func1<Observable<T>, Observable<R>> ,換言之就是:可以通過它將一種類型的Observable轉(zhuǎn)換成另一種類型的Observable.

簡單的說,compose可以通過指定的轉(zhuǎn)化方式(輸入?yún)?shù)transformer),將原來的observable轉(zhuǎn)化為另外一種Observable.

通過compose, 采用下面方式指定線程方式:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
private static <T> Transformer<T, T> applySchedulers() {
  return new Transformer<T, T>() {
   @Override
   public Observable<T> call(Observable<T> observable) {
    return observable.subscribeOn(Schedulers.io())
      .observeOn(Schedulers.computation());
   }
  };
 }
 
Observable.just(doSomething()).compose(applySchedulers())
   .subscribe(v->{Utils.printlnWithThread(v.toString());
  });

函數(shù)applySchedulers可以使用lambda表達(dá)式進(jìn)一步簡化為下面為:

?
1
2
3
4
private static <T> Transformer<T, T> applySchedulers() {
 return observable->observable.subscribeOn(Schedulers.io())
   .observeOn(Schedulers.computation());
}

五、按優(yōu)先級使用不同的執(zhí)行結(jié)果

上面這個(gè)標(biāo)題估計(jì)沒表達(dá)清楚我想表達(dá)的場景. 其實(shí)我想表達(dá)的場景類似于平常的獲取網(wǎng)絡(luò)數(shù)據(jù)場景:如果緩存有,從緩存獲取,如果沒有,再從網(wǎng)絡(luò)獲取.

這里要求,如果緩存有,不會做從網(wǎng)絡(luò)獲取數(shù)據(jù)的動作.

這個(gè)可以采用concat+first實(shí)現(xiàn).

concat將幾個(gè)Observable合并成一個(gè)Observable,返回最終的一個(gè)Observable. 而那些數(shù)據(jù)就像從一個(gè)Observable發(fā)出來一樣. 參數(shù)可以是多個(gè)Observable,也可以是包含Observalbe的Iterator.

新的observable內(nèi)的數(shù)據(jù)排列按原來concat里的observable順序排列,即新結(jié)果內(nèi)的數(shù)據(jù)是按原來的順序排序的.

下面是上述需求的實(shí)現(xiàn):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.concat(getDataFromCache(),getDataFromNetwork()).first()
   .subscribe(v->System.out.println("result:"+v));
//從緩存獲取數(shù)據(jù)
private static Observable<String> getDataFromCache(){
 return Observable.create(s -> {
  //dosomething to get data
  int value = new Random().nextInt();
  value = value%2;
  if (value!=0){
   s.onNext("data from cache:"+value); //產(chǎn)生數(shù)據(jù)
  }
  //s.onError(new Throwable("none"));
  s.onCompleted();
 }
   );
}
//從網(wǎng)絡(luò)獲取數(shù)據(jù)
private static Observable<String> getDataFromNetwork(){
 return Observable.create(s -> {
  for (int i = 0; i < 10; i++) {
   Utils.println("obs2 generate "+i);
   s.onNext("data from network:" + i); //產(chǎn)生數(shù)據(jù)
  }
  s.onCompleted();
 }
   );
}

上面的實(shí)現(xiàn),如果getDataFromCache有數(shù)據(jù), getDataFromNetwork這里的代碼是不會執(zhí)行的, 這正是我們想要的.

上面實(shí)現(xiàn)有幾個(gè)需要注意:

    1、有可能從兩個(gè)地方都獲取不到數(shù)據(jù), 這種場景下使用first會拋出異常NoSuchElementException,如果是這樣的場景,需要用firstOrDefault替換上面的first.

    2、上面getDataFromCache()里,如果沒有數(shù)據(jù),我們直接調(diào)用onCompleted,如果不調(diào)用onCompleted,而是調(diào)用onError,則上述采用concat是得不到任何結(jié)果的.因?yàn)閏oncat在收到任何一個(gè)error,合并就會停止.所以,如果要用onError, 則需要用concatDelayError替代concat.concatDelayError會先忽略error,將error推遲到最后在處理.

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作能帶來一定的幫助,如果有疑問大家可以留言交流。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 性free非洲老妇 | 全弄乱纶小说 | 久久机热免费视频 | 亚洲欧美日韩一区成人 | 日韩精品视频在线播放 | 国产精自产拍久久久久久 | 久久久久久久国产精品视频 | 欧美一级特黄aaa大片 | www.男人的天堂.com | 好紧水好多 | 国产精品视频在这里有精品 | 国产人成激情视频在线观看 | 美日毛片 | 狠狠燥 | 国产综合成人久久大片91 | 手机av影院 | 暖暖免费高清完整版观看日本 | 久久AV喷吹AV高潮欧美 | 偷拍综合网| 人人爽人人看 | 日韩欧美不卡片 | 国产精品久久久久毛片 | 极品美女aⅴ高清在线观看 极品ts赵恩静和直男激战啪啪 | 国产精品自在线拍 | 亚洲欧美综合一区 | 91九色视频无限观看免费 | 亚洲波多野结衣日韩在线 | 国产日韩精品一区二区在线观看播放 | 校草太大了h | 美女脱一净二净不带胸罩 | 9久热这里只有精品免费 | 国产成人综合网亚洲欧美在线 | a亚洲天堂| 国产99视频精品免费视频免里 | 久久99热狠狠色一区二区 | 日本不卡在线观看免费v | 调教校花浣肠开菊 | 精品无人区乱码1区2区3区免费 | 精品国产一区二区三区在线观看 | 牛人国产偷窥女洗浴在线观看 | 四虎e234hcom |