用過Java并發包的朋友或許對Future (interface) 已經比較熟悉了,其實Future 本身是一種被廣泛運用的并發設計模式,可在很大程度上簡化需要數據流同步的并發應用開發。在一些領域語言(如Alice ML )中甚至直接于語法層面支持Future。
這里就以java.util.concurrent.Future 為例簡單說一下Future的具體工作方式。Future對象本身可以看作是一個顯式的引用,一個對異步處理結果的引用。由于其異步性質,在創建之初,它所引用的對象可能還并不可用(比如尚在運算中,網絡傳輸中或等待中)。這時,得到Future的程序流程如果并不急于使用Future所引用的對象,那么它可以做其它任何想做的事兒,當流程進行到需要Future背后引用的對象時,可能有兩種情況:
希望能看到這個對象可用,并完成一些相關的后續流程。如果實在不可用,也可以進入其它分支流程。
“沒有你我的人生就會失去意義,所以就算海枯石爛,我也要等到你。”(當然,如果實在沒有毅力枯等下去,設一個超時也是可以理解的)
對于前一種情況,可以通過調用Future.isDone()判斷引用的對象是否就緒,并采取不同的處理;而后一種情況則只需調用get()或
get(long timeout, TimeUnit unit)通過同步阻塞方式等待對象就緒。實際運行期是阻塞還是立即返回就取決于get()的調用時機和對象就緒的先后了。
簡單而言,Future模式可以在連續流程中滿足數據驅動的并發需求,既獲得了并發執行的性能提升,又不失連續流程的簡潔優雅。
與其它并發設計模式的對比
除了Future外,其它比較常見的并發設計模式還包括“回調驅動(多線程環境下)”、“消息/事件驅動(Actor模型中)”等。
回調是最常見的異步并發模式,它有即時性高、接口設計簡單等有點。但相對于Future,其缺點也非常明顯。首先,多線程環境下的回調一般是在觸發回調的模塊線程中執行的,這就意味著編寫回調方法時通常必須考慮線程互斥問題;其次,回調方式接口的提供者在本模塊的線程中執行用戶應用的回調也是相對不安全的,因為你無法確定它會花費多長時間或出現什么異常,從而可能間接導致本模塊的即時性和可靠性受影響;再者,使用回調接口不利于順序流程的開發,因為回調方法的執行是孤立的,要與正常流程匯合是比較困難的。因此回調接口適合于在回調中只需要完成簡單任務,并且不必與其它流程匯合的場景。
上述這些回調模式的缺點恰恰正是Future的長項。由于Future的使用是將異步的數據驅動天然的融入順序流程中,因此你完全不必考慮線程互斥問題,Future甚至可以在單線程的程序模型(例如協程)中實現(參見下文將要提到的“Lazy Future”)。另一方面,提供Future接口的模塊完全不必擔心像回調接口那樣的可靠性問題和可能對本模塊的即時性影響。
另一類常見的并發設計模式是“消息(事件)驅動”,它一般運用在Actor模型中:服務請求者向服務提供者發送消息,然后繼續進行后續不依賴服務處理結果的任務,在需要依賴結果前終止當前流程并記錄狀態;在等到回應消息后根據記錄的狀態觸發后續流程。這種基于狀態機的并發控制雖然比回調更適合于有延續性的順序流程,但開發者卻不得不將連續流程按照異步服務的調用切斷為多個按狀態區分的子流程,這樣又人為的增加了開發的復雜性。運用Future模式可以避免這個問題,不必為了異步調用而打碎連續的流程。但是有一點應當特別注意:Future.get()方法可能會阻塞線程的執行,所以它通常無法直接融入常規的Actor模型中。(基于協程的Actor模型可以較好的解決這個沖突)
Future的靈活性還體現在其同步和異步的自由取舍,開發者可以根據流程的需要自由決定是否需要等待[Future.isDone()],何時等待[Future.get()],等待多久[Future.get(timeout)]。比如可以根據數據是否就緒而決定要不要借這個空檔完成點其它任務,這對于實現“異步分支預測”機制是相當方便的。
Future的衍生
除了上面提到的基礎形態之外,Future還有豐富的衍生變化,這里就列舉幾個常見的。
Lazy Future
與一般的Future不同,Lazy Future在創建之初不會主動開始準備引用的對象,而是等到請求對象時才開始相應的工作。因此,Lazy Future本身并不是為了實現并發,而是以節約不必要的運算資源為出發點,效果上與Lambda/Closure類似。例如設計某些API時,你可能需要返回一組信息,而其中某些信息的計算可能會耗費可觀的資源。但調用者不一定都關心所有的這些信息,因此將那些需要耗費較多資源的對象以Lazy Future的形式提供,可以在調用者不需要用到特定的信息時節省資源。
另外Lazy Future也可以用于避免過早的獲取或鎖定資源而產生的不必要的互斥。
Promise
Promise可以看作是Future的一個特殊分支,常見的Future一般是由服務調用者直接觸發異步處理流程,比如調用服務時立即觸發處理或 Lazy Future的取值時觸發處理。但Promise則用于顯式表示那些異步流程并不直接由服務調用者觸發的情景。例如Future接口的定時控制,其異步流程不是由調用者,而是由系統時鐘觸發,再比如淘寶的分布式訂閱框架提供的Future式訂閱接口,其等待數據的可用性不是由訂閱者決定,而在于發布者何時發布或更新數據。因此,相對于標準的Future,Promise接口一般會多出一個set()或fulfill()接口。
可復用的Future
常規的Future是一次性的,也就是說當你獲得了異步的處理結果后,Future對象本身就失去意義了。但經過特殊設計的Future也可以實現復用,這對于可多次變更的數據顯得非常有用。例如前面提到的淘寶分布式訂閱框架所提供的Future式接口,它允許多次調用waitNext()方法(相當于Future.get()),每次調用時是否阻塞取決于在上次調用后是否又有數據發布,如果尚無更新,則阻塞直到下一次的數據發布。這樣設計的好處是,接口的使用者可以在其任何合適的時機,或者直接簡單的在獨立的線程中通過一個無限循環響應訂閱數據的變化,同時還可兼顧其它定時任務,甚至同時等待多個Future。簡化的例子如下:
1
2
3
4
5
6
7
8
9
10
|
for (;;) { schedule = getNextScheduledTaskTime(); while (schedule > now()) { try { data = subscription.waitNext(schedule - now()); processData(data); } catch (Exception e) {...} } doScheduledTask(); } |
Future的使用
首先列舉一段Thinking in Java中的代碼,這是出在并發中的Callable使用的一個例子,代碼,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
//: concurrency/CallableDemo.java import java.util.concurrent.*; import java.util.*; class TaskWithResult implements Callable<String> { private int id; public TaskWithResult( int id) { this .id = id; } public String call() { return "result of TaskWithResult " + id; } } public class CallableDemo { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); ArrayList<Future<String>> results = new ArrayList<Future<String>>(); for ( int i = 0 ; i < 10 ; i++) results.add(exec.submit( new TaskWithResult(i))); for (Future<String> fs : results) try { // get() blocks until completion: System.out.println(fs.get()); } catch (InterruptedException e) { System.out.println(e); return ; } catch (ExecutionException e) { System.out.println(e); } finally { exec.shutdown(); } } } /* Output: result of TaskWithResult 0 result of TaskWithResult 1 result of TaskWithResult 2 result of TaskWithResult 3 result of TaskWithResult 4 result of TaskWithResult 5 result of TaskWithResult 6 result of TaskWithResult 7 result of TaskWithResult 8 result of TaskWithResult 9 */ //:~ |
解釋一下Future的使用過程,首先ExecutorService對象exec調用submit()方法會產生Future對象,他用Callable返回結果的特定類型進行了參數化。你可以使用isDone()方法來查詢Future是否已經完成。當任務完成時,他具有一個結果,你可以調用get()方法獲取結果。你也可以不用isDone()進行檢查直接調用get(),在這種情況下,get()將會阻塞知道結果準備就緒。你可以在調用具有超時的get()函數,或者先調用isDone()來查看任務是否完成,然后再調用get()。