一、基本概念
reactive x中有幾個核心的概念,先來簡單介紹一下。
1.1、observable和observer(可觀察對象和觀察者)
首先是observable和observer,它們分別是可觀察對象和觀察者。observable可以理解為一個異步的數(shù)據(jù)源,會發(fā)送一系列的值。observer則類似于消費者,需要先訂閱observable,然后才可以接收到其發(fā)射的值。可以說這組概念是設(shè)計模式中的觀察者模式和生產(chǎn)者-消費者模式的綜合體。
1.2、operator(操作符)
另外一個非常重要的概念就是操作符了。操作符作用于observable的數(shù)據(jù)流上,可以對其施加各種各樣的操作。更重要的是,操作符還可以鏈式組合起來。這樣的鏈式函數(shù)調(diào)用不僅將數(shù)據(jù)和操作分隔開來,而且代碼更加清晰可讀。一旦熟練掌握之后,你就會愛上這種感覺的。
1.3、single(單例)
在rxjava和其變體中,還有一個比較特殊的概念叫做single,它是一種只會發(fā)射同一個值的observable,說白了就是單例。當然如果你對java等語言比較熟悉,那么單例想必也很熟悉。
1.4、subject(主體)
主體這個概念非常特殊,它既是observable又是observer。正是因為這個特點,所以subject可以訂閱其他observable,也可以將發(fā)射對象給其他observer。在某些場景中,subject會有很大的作用。
1.5、scheduler(調(diào)度器)
默認情況下reactive x只運行在當前線程下,但是如果有需要的話,也可以用調(diào)度器來讓reactive x運行在多線程環(huán)境下。有很多調(diào)度器和對應(yīng)的操作符,可以處理多線程場景下的各種要求。
1.6、observer和observable
先來看看一個最簡單的例子,運行的結(jié)果會依次打印這些數(shù)字。這里的of是一個操作符,可以根據(jù)給定的參數(shù)創(chuàng)建一個新的observable。創(chuàng)建之后,就可以訂閱observable,三個回調(diào)方法在對應(yīng)的時機執(zhí)行。一旦observer訂閱了observable,就會接收到后續(xù)observable發(fā)射的各項值。
1
2
3
4
5
6
7
8
9
|
from rx import of ob = of( 1 , 2 , 34 , 5 , 6 , 7 , 7 ) ob.subscribe( on_next = lambda i: print (f 'received: {i}' ), on_error = lambda e: print (f 'error: {e}' ), on_completed = lambda : print ( 'completed' ) ) |
這個例子看起來好像很簡單,并且看起來沒什么用。但是當你了解了rx的一些核心概念,就會理解到這是一個多么強大的工具。更重要的是,observable生成數(shù)據(jù)和訂閱的過程是異步的,如果你熟悉的話,就可以利用這個特性做很多事情。
1.7、操作符
在rxpy中另一個非常重要的概念就是操作符了,甚至可以說操作符就是最重要的一個概念了。幾乎所有的功能都可以通過組合各個操作符來實現(xiàn)。熟練掌握操作符就是學好rxpy的關(guān)鍵了。操作符之間也可以用pipe函數(shù)連接起來,構(gòu)成復(fù)雜的操作鏈。
1
2
3
4
5
6
7
8
|
from rx import of, operators as op import rx ob = of( 1 , 2 , 34 , 5 , 6 , 7 , 7 ) ob.pipe( op. map ( lambda i: i * * 2 ), op. filter ( lambda i: i > = 10 ) ).subscribe( lambda i: print (f 'received: {i}' )) |
在rxpy中有大量操作符,可以完成各種各樣的功能。我們來簡單看看其中一些常用的操作符。如果你熟悉java8的流類庫或者其他函數(shù)式編程類庫的話,應(yīng)該對這些操作符感到非常親切。
1.8、創(chuàng)建型操作符
首先是創(chuàng)建observable的操作符,列舉了一些比較常用的創(chuàng)建型操作符。
1.9、過濾型操作符
過濾型操作符的主要作用是對observable進行篩選和過濾。
1.10、轉(zhuǎn)換型操作符
1.11、算術(shù)操作符
1.12、subject
subject是一種特殊的對象,它既是observer又是observable。不過這個對象一般不太常用,但是假如某些用途還是很有用的。所以還是要介紹一下。下面的代碼,因為訂閱的時候第一個值已經(jīng)發(fā)射出去了,所以只會打印訂閱之后才發(fā)射的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from rx.subject import subject, asyncsubject, behaviorsubject, replaysubject # subject同時是observer和observable print ( '--------subject---------' ) subject = subject() subject.on_next( 1 ) subject.subscribe( lambda i: print (i)) subject.on_next( 2 ) subject.on_next( 3 ) subject.on_next( 4 ) subject.on_completed() # 2 3 4 |
另外還有幾個特殊的subject,下面來介紹一下。
1.13、replaysubject
replaysubject是一個特殊的subject,它會記錄所有發(fā)射過的值,不論什么時候訂閱的。所以它可以用來當做緩存來使用。replaysubject還可以接受一個buffersize參數(shù),指定可以緩存的最近數(shù)據(jù)數(shù),默認情況下是全部。
下面的代碼和上面的代碼幾乎完全一樣,但是因為使用了replaysubject,所以所有的值都會被打印。當然大家也可以試試把訂閱語句放到其他位置,看看輸出是否會產(chǎn)生變化。
1
2
3
4
5
6
7
8
9
10
|
# replaysubject會緩存所有值,如果指定參數(shù)的話只會緩存最近的幾個值 print ( '--------replaysubject---------' ) subject = replaysubject() subject.on_next( 1 ) subject.subscribe( lambda i: print (i)) subject.on_next( 2 ) subject.on_next( 3 ) subject.on_next( 4 ) subject.on_completed() # 1 2 3 4 |
1.14、behaviorsubject
behaviorsubject是一個特殊的subject,它只會記錄最近一次發(fā)射的值。而且在創(chuàng)建它的時候,必須指定一個初始值,所有訂閱它的對象都可以接收到這個初始值。當然如果訂閱的晚了,這個初始值同樣會被后面發(fā)射的值覆蓋,這一點要注意。
1
2
3
4
5
6
7
8
9
10
|
# behaviorsubject會緩存上次發(fā)射的值,除非observable已經(jīng)關(guān)閉 print ( '--------behaviorsubject---------' ) subject = behaviorsubject( 0 ) subject.on_next( 1 ) subject.on_next( 2 ) subject.subscribe( lambda i: print (i)) subject.on_next( 3 ) subject.on_next( 4 ) subject.on_completed() # 2 3 4 |
1.15、asyncsubject
asyncsubject是一個特殊的subject,顧名思義它是一個異步的subject,它只會在observer完成的時候發(fā)射數(shù)據(jù),而且只會發(fā)射最后一個數(shù)據(jù)。因此下面的代碼僅僅會輸出4.假如注釋掉最后一行co_completed調(diào)用,那么什么也不會輸出。
1
2
3
4
5
6
7
8
9
10
|
# asyncsubject會緩存上次發(fā)射的值,而且僅會在observable關(guān)閉后開始發(fā)射 print ( '--------asyncsubject---------' ) subject = asyncsubject() subject.on_next( 1 ) subject.on_next( 2 ) subject.subscribe( lambda i: print (i)) subject.on_next( 3 ) subject.on_next( 4 ) subject.on_completed() # 4 |
1.16、scheduler
雖然rxpy算是異步的框架,但是其實它默認還是運行在單個線程之上的,因此如果使用了某些會阻礙線程運行的操作,那么程序就會卡死。當然針對這些情況,我們就可以使用其他的scheduler來調(diào)度任務(wù),保證程序能夠高效運行。
下面的例子創(chuàng)建了一個threadpoolscheduler,它是基于線程池的調(diào)度器。兩個observable用subscribe_on方法指定了調(diào)度器,因此它們會使用不同的線程來工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import rx from rx.scheduler import threadpoolscheduler from rx import operators as op import multiprocessing import time import threading import random def long_work(value): time.sleep(random.randint( 5 , 20 ) / 10 ) return value pool_schedular = threadpoolscheduler(multiprocessing.cpu_count()) rx. range ( 5 ).pipe( op. map ( lambda i: long_work(i + 1 )), op.subscribe_on(pool_schedular) ).subscribe( lambda i: print (f 'work 1: {threading.current_thread().name}, {i}' )) rx.of( 1 , 2 , 3 , 4 , 5 ).pipe( op. map ( lambda i: i * 2 ), op.subscribe_on(pool_schedular) ).subscribe( lambda i: print (f 'work 2: {threading.current_thread().name}, {i}' )) |
如果你觀察過各個操作符的api的話,可以發(fā)現(xiàn)大部分操作符都支持可選的scheduler參數(shù),為操作符指定一個調(diào)度器。如果操作符上指定了調(diào)度器的話,會優(yōu)先使用這個調(diào)度器;其次的話,會使用subscribe方法上指定的調(diào)度器;如果以上都沒有指定的話,就會使用默認的調(diào)度器。
二、應(yīng)用場景
好了,介紹了一些reactive x的知識之后,下面來看看如何來使用reactive x。在很多應(yīng)用場景下,都可以利用reactive x來抽象數(shù)據(jù)處理,把概念簡單化。
2.1、防止重復(fù)發(fā)送
很多情況下我們都需要控制事件的發(fā)生間隔,比如有一個按鈕不小心按了好幾次,只希望第一次按鈕生效。這種情況下可以使用debounce操作符,它會過濾observable,小于指定時間間隔的數(shù)據(jù)會被過濾掉。debounce操作符會等待一段時間,直到過了間隔時間,才會發(fā)射最后一次的數(shù)據(jù)。如果想要過濾后面的數(shù)據(jù),發(fā)送第一次的數(shù)據(jù),則要使用throttle_first操作符。
下面的代碼可以比較好的演示這個操作符,快速按回車鍵發(fā)送數(shù)據(jù),注意觀察按鍵和數(shù)據(jù)顯示之間的關(guān)系,還可以把throttle_first操作符換成debounce操作符,然后再看看輸出會發(fā)生什么變化,還可以完全注釋掉pipe中的操作符,再看看輸出會有什么變化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import rx from rx import operators as op from rx.subject import subject import datetime # debounce操作符,僅在時間間隔之外的可以發(fā)射 ob = subject() ob.pipe( op.throttle_first( 3 ) # op.debounce(3) ).subscribe( on_next = lambda i: print (i), on_completed = lambda : print ( 'completed' ) ) print ( 'press enter to print, press other key to exit' ) while true: s = input () if s = = '': ob.on_next(datetime.datetime.now().time()) else : ob.on_completed() break |
2.2、操作數(shù)據(jù)流
如果需要對一些數(shù)據(jù)進行操作,那么同樣有一大堆操作符可以滿足需求。當然這部分功能并不是reactive x獨有的,如果你對java 8的流類庫有所了解,會發(fā)現(xiàn)這兩者這方面的功能幾乎是完全一樣的。
下面是個簡單的例子,將兩個數(shù)據(jù)源結(jié)合起來,然后找出來其中所有的偶數(shù)。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import rx from rx import operators as op from rx.subject import subject import datetime # 操作數(shù)據(jù)流 some_data = rx.of( 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 ) some_data2 = rx.from_iterable( range ( 10 , 20 )) some_data.pipe( op.merge(some_data2), op. filter ( lambda i: i % 2 = = 0 ), # op.map(lambda i: i * 2) ).subscribe( lambda i: print (i)) |
再或者一個利用reduce的簡單例子,求1-100的整數(shù)和。
1
2
3
4
5
6
7
8
|
import rx from rx import operators as op from rx.subject import subject import datetime rx. range ( 1 , 101 ).pipe( op. reduce ( lambda acc, i: acc + i, 0 ) ).subscribe( lambda i: print (i)) |
以上就是淺談python響應(yīng)式類庫rxpy的詳細內(nèi)容,更多關(guān)于python響應(yīng)式類庫rxpy的資料請關(guān)注服務(wù)器之家其它相關(guān)文章!
原文鏈接:https://blog.csdn.net/u011054333/article/details/107240111