一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|

服務器之家 - 編程語言 - JAVA教程 - Java多線程之異步Future機制的原理和實現

Java多線程之異步Future機制的原理和實現

2020-06-07 13:12java教程網 JAVA教程

這篇文章主要為大家詳細介紹了Java多線程之異步Future機制的原理和實現,感興趣的小伙伴們可以參考一下

項目中經常有些任務需要異步(提交到線程池中)去執行,而主線程往往需要知道異步執行產生的結果,這時我們要怎么做呢?用runnable是無法實現的,我們需要用callable看下面的代碼:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
public class AddTask implements Callable<Integer> {
 
 private int a,b;
 
 public AddTask(int a, int b) {
 this.a = a;
 this.b = b;
 }
 
 @Override
 public Integer call throws Exception {
 Integer result = a + b;
 return result;
 }
 
 public static void main(String[] args) throws InterruptedException, ExecutionException {
 ExecutorService executor = Executors.newSingleThreadExecutor;
 //JDK目前為止返回的都是FutureTask的實例
 Future<Integer> future = executor.submit(new AddTask(1, 2));
 Integer result = future.get;// 只有當future的狀態是已完成時(future.isDone = true),get方法才會返回
 }
}

雖然可以實現獲取異步執行結果的需求,但是我們發現這個Future其實很不好用,因為它沒有提供通知的機制,也就是說我們不知道future什么時候完成(如果我們需要輪詢isDone()來判斷的話感覺就沒有用這個的必要了)。看下java.util.concurrent.future.Future 的接口方法:

?
1
2
3
4
5
6
7
8
public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isCancelled;
  boolean isDone;
  V get throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
}

由此可見JDK的Future機制其實并不好用,如果能給這個future加個監聽器,讓它在完成時通知監聽器的話就比較好用了,就像下面這個IFuture:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package future;
 
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
 
/**
 * The result of an asynchronous operation.
 *
 * @author lixiaohui
 * @param <V> 執行結果的類型參數
 */
public interface IFuture<V> extends Future<V> {
 boolean isSuccess; // 是否成功
 V getNow; //立即返回結果(不管Future是否處于完成狀態)
 Throwable cause; //若執行失敗時的原因
    boolean isCancellable; //是否可以取消
 IFuture<V> await throws InterruptedException; //等待future的完成
 boolean await(long timeoutMillis) throws InterruptedException; // 超時等待future的完成
 boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;
    IFuture<V> awaitUninterruptibly; //等待future的完成,不響應中斷
    boolean awaitUninterruptibly(long timeoutMillis);//超時等待future的完成,不響應中斷
 boolean awaitUninterruptibly(long timeout, TimeUnit timeunit);
 IFuture<V> addListener(IFutureListener<V> l); //當future完成時,會通知這些加進來的監聽器
 IFuture<V> removeListener(IFutureListener<V> l);
 
}

接下來就一起來實現這個IFuture,在這之前要說明下Object.wait,Object.notifyAll方法,因為整個Future實現的原???的核心就是這兩個方法.看看JDK里面的解釋:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Object {
  /**
   * Causes the current thread to wait until another thread invokes the
   * {@link java.lang.Object#notify} method or the
   * {@link java.lang.Object#notifyAll} method for this object.
   * In other words, this method behaves exactly as if it simply
   * performs the call {@code wait(0)}.
   * 調用該方法后,當前線程會釋放對象監視器鎖,并讓出CPU使用權。直到別的線程調用notify/notifyAll
   */
  public final void wait throws InterruptedException {
    wait(0);
  }
 
  /**
   * Wakes up all threads that are waiting on this object's monitor. A
   * thread waits on an object's monitor by calling one of the
   * {@code wait} methods.
   * <p>
   * The awakened threads will not be able to proceed until the current
   * thread relinquishes the lock on this object. The awakened threads
   * will compete in the usual manner with any other threads that might
   * be actively competing to synchronize on this object; for example,
   * the awakened threads enjoy no reliable privilege or disadvantage in
   * being the next thread to lock this object.
   */
  public final native void notifyAll;
}

知道這個后,我們要自己實現Future也就有了思路,當線程調用了IFuture.await等一系列的方法時,如果Future還未完成,那么就調用future.wait 方法使線程進入WAITING狀態。而當別的線程設置Future為完成狀態(注意這里的完成狀態包括正常結束和異常結束)時,就需要調用future.notifyAll方法來喚醒之前因為調用過wait方法而處于WAITING狀態的那些線程。完整的實現如下(代碼應該沒有很難理解的地方,我是參考netty的Future機制的。有興趣的可以去看看netty的源碼):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
package future;
 
import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
 
/**
 * <pre>
 * 正常結束時, 若執行的結果不為null, 則result為執行結果; 若執行結果為null, 則result = {@link AbstractFuture#SUCCESS_SIGNAL}
 * 異常結束時, result為 {@link CauseHolder} 的實例;若是被取消而導致的異常結束, 則result為 {@link CancellationException} 的實例, 否則為其它異常的實例
 * 以下情況會使異步操作由未完成狀態轉至已完成狀態, 也就是在以下情況發生時調用notifyAll方法:
 * <ul>
 * <li>異步操作被取消時(cancel方法)</li>
 * <li>異步操作正常結束時(setSuccess方法)</li>
 * <li>異步操作異常結束時(setFailure方法)</li>
 * </ul>
 * </pre>
 *
 * @author lixiaohui
 *
 * @param <V>
 * 異步執行結果的類型
 */
public class AbstractFuture<V> implements IFuture<V> {
 
 protected volatile Object result; // 需要保證其可見性
    /**
     * 監聽器集
     */
 protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>;
 
 /**
 * 當任務正常執行結果為null時, 即客戶端調用{@link AbstractFuture#setSuccess(null)}時,
 * result引用該對象
 */
 private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal;
 
 @Override
 public boolean cancel(boolean mayInterruptIfRunning) {
 if (isDone) { // 已完成了不能取消
  return false;
 }
 
 synchronized (this) {
  if (isDone) { // double check
  return false;
  }
  result = new CauseHolder(new CancellationException);
  notifyAll; // isDone = true, 通知等待在該對象的wait的線程
 }
 notifyListeners; // 通知監聽器該異步操作已完成
 return true;
 }
 
 @Override
 public boolean isCancellable {
 return result == null;
 }
 
 @Override
 public boolean isCancelled {
 return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
 }
 
 @Override
 public boolean isDone {
 return result != null;
 }
 
 @Override
 public V get throws InterruptedException, ExecutionException {
 await; // 等待執行結果
 
 Throwable cause = cause;
 if (cause == null) { // 沒有發生異常,異步操作正常結束
  return getNow;
 }
 if (cause instanceof CancellationException) { // 異步操作被取消了
  throw (CancellationException) cause;
 }
 throw new ExecutionException(cause); // 其他異常
 }
 
 @Override
 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 if (await(timeout, unit)) {// 超時等待執行結果
  Throwable cause = cause;
  if (cause == null) {// 沒有發生異常,異步操作正常結束
  return getNow;
  }
  if (cause instanceof CancellationException) {// 異步操作被取消了
  throw (CancellationException) cause;
  }
  throw new ExecutionException(cause);// 其他異常
 }
 // 時間到了異步操作還沒有結束, 拋出超時異常
 throw new TimeoutException;
 }
 
 @Override
 public boolean isSuccess {
 return result == null ? false : !(result instanceof CauseHolder);
 }
 
 @SuppressWarnings("unchecked")
 @Override
 public V getNow {
 return (V) (result == SUCCESS_SIGNAL ? null : result);
 }
 
 @Override
 public Throwable cause {
 if (result != null && result instanceof CauseHolder) {
  return ((CauseHolder) result).cause;
 }
 return null;
 }
 
 @Override
 public IFuture<V> addListener(IFutureListener<V> listener) {
 if (listener == null) {
  throw new NullPointerException("listener");
 }
 if (isDone) { // 若已完成直接通知該監聽器
  notifyListener(listener);
  return this;
 }
 synchronized (this) {
  if (!isDone) {
  listeners.add(listener);
  return this;
  }
 }
 notifyListener(listener);
 return this;
 }
 
 @Override
 public IFuture<V> removeListener(IFutureListener<V> listener) {
 if (listener == null) {
  throw new NullPointerException("listener");
 }
 
 if (!isDone) {
  listeners.remove(listener);
 }
 
 return this;
 }
 
 @Override
 public IFuture<V> await throws InterruptedException {
 return await0(true);
 }
 
 
 private IFuture<V> await0(boolean interruptable) throws InterruptedException {
 if (!isDone) { // 若已完成就直接返回了
  // 若允許終端且被中斷了則拋出中斷異常
  if (interruptable && Thread.interrupted) {
  throw new InterruptedException("thread " + Thread.currentThread.getName + " has been interrupted.");
  }
 
  boolean interrupted = false;
  synchronized (this) {
  while (!isDone) {
   try {
   wait; // 釋放鎖進入waiting狀態,等待其它線程調用本對象的notify/notifyAll方法
   } catch (InterruptedException e) {
   if (interruptable) {
    throw e;
   } else {
    interrupted = true;
   }
   }
  }
  }
  if (interrupted) {
  // 為什么這里要設中斷標志位?因為從wait方法返回后, 中斷標志是被clear了的,
  // 這里重新設置以便讓其它代碼知道這里被中斷了。
  Thread.currentThread.interrupt;
  }
 }
 return this;
 }
 
 @Override
 public boolean await(long timeoutMillis) throws InterruptedException {
 return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);
 }
 
 @Override
 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
 return await0(unit.toNanos(timeout), true);
 }
 
 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
 if (isDone) {
  return true;
 }
 
 if (timeoutNanos <= 0) {
  return isDone;
 }
 
 if (interruptable && Thread.interrupted) {
  throw new InterruptedException(toString);
 }
 
 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime;
 long waitTime = timeoutNanos;
 boolean interrupted = false;
 
 try {
  synchronized (this) {
  if (isDone) {
   return true;
  }
 
  if (waitTime <= 0) {
   return isDone;
  }
 
  for (;;) {
   try {
   wait(waitTime / 1000000, (int) (waitTime % 1000000));
   } catch (InterruptedException e) {
   if (interruptable) {
    throw e;
   } else {
    interrupted = true;
   }
   }
 
   if (isDone) {
   return true;
   } else {
   waitTime = timeoutNanos - (System.nanoTime - startTime);
   if (waitTime <= 0) {
    return isDone;
   }
   }
  }
  }
 } finally {
  if (interrupted) {
  Thread.currentThread.interrupt;
  }
 }
 }
 
 @Override
 public IFuture<V> awaitUninterruptibly {
 try {
  return await0(false);
 } catch (InterruptedException e) { // 這里若拋異常了就無法處理了
  throw new java.lang.InternalError;
 }
 }
 
 @Override
 public boolean awaitUninterruptibly(long timeoutMillis) {
 try {
  return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);
 } catch (InterruptedException e) {
  throw new java.lang.InternalError;
 }
 }
 
 @Override
 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
 try {
  return await0(unit.toNanos(timeout), false);
 } catch (InterruptedException e) {
  throw new java.lang.InternalError;
 }
 }
 
 protected IFuture<V> setFailure(Throwable cause) {
 if (setFailure0(cause)) {
  notifyListeners;
  return this;
 }
 throw new IllegalStateException("complete already: " + this);
 }
 
 private boolean setFailure0(Throwable cause) {
 if (isDone) {
  return false;
 }
 
 synchronized (this) {
  if (isDone) {
  return false;
  }
  result = new CauseHolder(cause);
  notifyAll;
 }
 
 return true;
 }
 
 protected IFuture<V> setSuccess(Object result) {
 if (setSuccess0(result)) { // 設置成功后通知監聽器
  notifyListeners;
  return this;
 }
 throw new IllegalStateException("complete already: " + this);
 }
 
 private boolean setSuccess0(Object result) {
 if (isDone) {
  return false;
 }
 
 synchronized (this) {
  if (isDone) {
  return false;
  }
  if (result == null) { // 異步操作正常執行完畢的結果是null
  this.result = SUCCESS_SIGNAL;
  } else {
  this.result = result;
  }
  notifyAll;
 }
 return true;
 }
 
 private void notifyListeners {
 for (IFutureListener<V> l : listeners) {
  notifyListener(l);
 }
 }
 
 private void notifyListener(IFutureListener<V> l) {
 try {
  l.operationCompleted(this);
 } catch (Exception e) {
  e.printStackTrace;
 }
 }
 
 private static class SuccessSignal {
 
 }
 
 private static final class CauseHolder {
 final Throwable cause;
 
 CauseHolder(Throwable cause) {
  this.cause = cause;
 }
 }
}

那么要怎么使用這個呢,有了上面的骨架實現,我們就可以定制各種各樣的異步結果了。下面模擬一下一個延時的任務:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package future.test;
 
import future.IFuture;
import future.IFutureListener;
 
/**
 * 延時加法
 * @author lixiaohui
 *
 */
public class DelayAdder {
 
 public static void main(String[] args) {
 new DelayAdder.add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer> {
  
  @Override
  public void operationCompleted(IFuture<Integer> future) throws Exception {
  System.out.println(future.getNow);
  }
  
 });
 }
 /**
 * 延遲加
 * @param delay 延時時長 milliseconds
 * @param a 加數
 * @param b 加數
 * @return 異步結果
 */
 public DelayAdditionFuture add(long delay, int a, int b) {
 DelayAdditionFuture future = new DelayAdditionFuture;
 new Thread(new DelayAdditionTask(delay, a, b, future)).start;
 return future;
 }
 
 private class DelayAdditionTask implements Runnable {
 
 private long delay;
 
 private int a, b;
 
 private DelayAdditionFuture future;
 
 public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {
  super;
  this.delay = delay;
  this.a = a;
  this.b = b;
  this.future = future;
 }
 
 @Override
 public void run {
  try {
  Thread.sleep(delay);
  Integer i = a + b;
  // TODO 這里設置future為完成狀態(正常執行完畢)
  future.setSuccess(i);
  } catch (InterruptedException e) {
  // TODO 這里設置future為完成狀態(異常執行完畢)
  future.setFailure(e.getCause);
  }
 }
 
 }
} package future.test;
 
import future.AbstractFuture;
import future.IFuture;
//只是把兩個方法對外暴露
public class DelayAdditionFuture extends AbstractFuture<Integer> {
 
 @Override
 public IFuture<Integer> setSuccess(Object result) {
 return super.setSuccess(result);
 }
 
 @Override
 public IFuture<Integer> setFailure(Throwable cause) {
 return super.setFailure(cause);
 }
 
}

可以看到客戶端不用主動去詢問future是否完成,而是future完成時自動回調operationcompleted方法,客戶端只需在回調里實現邏輯即可。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 精品国产自在在线在线观看 | 好男人天堂网 | a∨79成人网 | 天堂日韩 | 精品国产精品国产偷麻豆 | 国产视频久久 | 我的妹妹最近有点怪免费播放 | 国产一区国产二区国产三区 | 九九精品国产兔费观看久久 | 国产专区亚洲欧美另类在线 | 美女张开腿黄网站免费精品动漫 | 秋霞理论最新三级理论最 | 精品久久99麻豆蜜桃666 | 日本高清视频网站 | 娇妻与公陈峰姚瑶小说在线阅读 | 精品久久久久久亚洲精品 | 国产精品亚洲综合第一区 | 亚洲高清在线天堂精品 | 日本一区二区免费在线 | 手机在线观看精品国产片 | 91制片厂果冻传媒首页 | 99视频免费在线观看 | 国产在线观看99 | 免费在线看片网站 | 日本理论片中文在线观看2828 | 很黄的孕妇a级黄毛片 | 欧美艳星kagneyiynn高清 | www.色小妹 | sese在线| 边摸边操| 日韩一品在线播放视频一品免费 | 大肚孕妇的高h辣文 | 1024毛片| 国产成人福利色视频 | 青草视频网址 | 娇喘嗯嗯 轻点啊视频福利 九九九九在线精品免费视频 | b站免费 | yjsp妖精视频在线观看免费 | 五月天91| 91精品国产高清久久久久久91 | 成人男女网免费 |