一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - futuretask源碼分析(推薦)

futuretask源碼分析(推薦)

2021-01-16 11:35孤客_ Java教程

這篇文章主要介紹了futuretask源碼分析(推薦),小編覺得還是挺不錯的,這里給大家分享下,供各位參考。

FutureTask只實現(xiàn)RunnableFuture接口:

該接口繼承了java.lang.Runnable和Future接口,也就是繼承了這兩個接口的特性。

1.可以不必直接繼承Thread來生成子類,只要實現(xiàn)run方法,且把實例傳入到Thread構造函數(shù),Thread就可以執(zhí)行該實例的run方法了( Thread(Runnable) )。

2.可以讓任務獨立執(zhí)行,get獲取任務執(zhí)行結果時,可以阻塞直至執(zhí)行結果完成。也可以中斷執(zhí)行,判斷執(zhí)行狀態(tài)等。

FutureTask是一個支持取消行為的異步任務執(zhí)行器。該類實現(xiàn)了Future接口的方法。

如: 1. 取消任務執(zhí)行

2. 查詢任務是否執(zhí)行完成

3. 獲取任務執(zhí)行結果(”get“任務必須得執(zhí)行完成才能獲取結果,否則會阻塞直至任務完成)。

注意:一旦任務執(zhí)行完成,則不能執(zhí)行取消任務或者重新啟動任務。(除非一開始就使用runAndReset模式運行任務)
FutureTask支持執(zhí)行兩種任務, Callable 或者 Runnable的實現(xiàn)類。且可把FutureTask實例交由Executor執(zhí)行。

源碼部分(很簡單):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
public class FutureTask<V> implements RunnableFuture<V> {
  /*
   * Revision notes: This differs from previous versions of this
   * class that relied on AbstractQueuedSynchronizer, mainly to
   * avoid surprising users about retaining interrupt status during
   * cancellation races. Sync control in the current design relies
   * on a "state" field updated via CAS to track completion, along
   * with a simple Treiber stack to hold waiting threads.
   *
   * Style note: As usual, we bypass overhead of using
   * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
   */
  /**
   * The run state of this task, initially NEW. The run state
   * transitions to a terminal state only in methods set,
   * setException, and cancel. During completion, state may take on
   * transient values of COMPLETING (while outcome is being set) or
   * INTERRUPTING (only while interrupting the runner to satisfy a
   * cancel(true)). Transitions from these intermediate to final
   * states use cheaper ordered/lazy writes because values are unique
   * and cannot be further modified.
   *
   * Possible state transitions:
   * NEW -> COMPLETING -> NORMAL
   * NEW -> COMPLETING -> EXCEPTIONAL
   * NEW -> CANCELLED
   * NEW -> INTERRUPTING -> INTERRUPTED
   */
  private volatile int state;
  private static final int NEW     = 0;
  private static final int COMPLETING  = 1;
  private static final int NORMAL    = 2;
  private static final int EXCEPTIONAL = 3;
  private static final int CANCELLED  = 4;
  private static final int INTERRUPTING = 5;
  private static final int INTERRUPTED = 6;
  /** The underlying callable; nulled out after running */
  private Callable<V> callable;
  /** 用來存儲任務執(zhí)行結果或者異常對象,根據任務state在get時候選擇返回執(zhí)行結果還是拋出異常 */
  private Object outcome; // non-volatile, protected by state reads/writes
  /** 當前運行Run方法的線程 */
  private volatile Thread runner;
  /** Treiber stack of waiting threads */
  private volatile WaitNode waiters;
  /**
   * Returns result or throws exception for completed task.
   *
   * @param s completed state value
   */
  @SuppressWarnings("unchecked")
  private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
      return (V)x;
    if (s >= CANCELLED)
      throw new CancellationException();
    throw new ExecutionException((Throwable)x);
  }
  /**
   * Creates a {@code FutureTask} that will, upon running, execute the
   * given {@code Callable}.
   *
   * @param callable the callable task
   * @throws NullPointerException if the callable is null
   */
  public FutureTask(Callable<V> callable) {
    if (callable == null)
      throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;    // ensure visibility of callable
  }
  /**
   * Creates a {@code FutureTask} that will, upon running, execute the
   * given {@code Runnable}, and arrange that {@code get} will return the
   * given result on successful completion.
   *
   * @param runnable the runnable task
   * @param result the result to return on successful completion. If
   * you don't need a particular result, consider using
   * constructions of the form:
   * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
   * @throws NullPointerException if the runnable is null
   */
  public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;    // ensure visibility of callable
  }
  //判斷任務是否已取消(異常中斷、取消等)
  public boolean isCancelled() {
    return state >= CANCELLED;
  }
  /**
  判斷任務是否已結束(取消、異常、完成、NORMAL都等于結束)
  **
  public boolean isDone() {
    return state != NEW;
  }
  /**
  mayInterruptIfRunning用來決定任務的狀態(tài)。
          true : 任務狀態(tài)= INTERRUPTING = 5。如果任務已經運行,則強行中斷。如果任務未運行,那么則不會再運行
          false:CANCELLED  = 4。如果任務已經運行,則允許運行完成(但不能通過get獲取結果)。如果任務未運行,那么則不會再運行
  **/
  public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
      return false;
    if (mayInterruptIfRunning) {
      if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
        return false;
      Thread t = runner;
      if (t != null)
        t.interrupt();
      UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
      return false;
    finishCompletion();
    return true;
  }
  /**
   * @throws CancellationException {@inheritDoc}
   */
  public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //如果任務未徹底完成,那么則阻塞直至任務完成后喚醒該線程
    if (s <= COMPLETING)
      s = awaitDone(false, 0L);
    return report(s);
  }
  /**
   * @throws CancellationException {@inheritDoc}
   */
  public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
      throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
      (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
      throw new TimeoutException();
    return report(s);
  }
  /**
   * Protected method invoked when this task transitions to state
   * {@code isDone} (whether normally or via cancellation). The
   * default implementation does nothing. Subclasses may override
   * this method to invoke completion callbacks or perform
   * bookkeeping. Note that you can query status inside the
   * implementation of this method to determine whether this task
   * has been cancelled.
   */
  protected void done() { }
  /**
  該方法在FutureTask里只有run方法在任務完成后調用。
  主要保存任務執(zhí)行結果到成員變量outcome 中,和切換任務執(zhí)行狀態(tài)。
  由該方法可以得知:
  COMPLETING : 任務已執(zhí)行完成(也可能是異常完成),但還未設置結果到成員變量outcome中,也意味著還不能get
  NORMAL  : 任務徹底執(zhí)行完成
  **/
  protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = v;
      UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
      finishCompletion();
    }
  }
  /**
   * Causes this future to report an {@link ExecutionException}
   * with the given throwable as its cause, unless this future has
   * already been set or has been cancelled.
   *
   * <p>This method is invoked internally by the {@link #run} method
   * upon failure of the computation.
   *
   * @param t the cause of failure
   */
  protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = t;
      UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
      finishCompletion();
    }
  }
  /**
  由于實現(xiàn)了Runnable接口的緣故,該方法可由執(zhí)行線程所調用。
  **/
  public void run() {
    //只有當任務狀態(tài)=new時才被運行繼續(xù)執(zhí)行
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                     null, Thread.currentThread()))
      return;
    try {
      Callable<V> c = callable;
      if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
          //調用Callable的Call方法
          result = c.call();
          ran = true;
        } catch (Throwable ex) {
          result = null;
          ran = false;
          setException(ex);
        }
        if (ran)
          set(result);
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
    }
  }
  /**
  如果該任務在執(zhí)行過程中不被取消或者異常結束,那么該方法不記錄任務的執(zhí)行結果,且不修改任務執(zhí)行狀態(tài)。
  所以該方法可以重復執(zhí)行N次。不過不能直接調用,因為是protected權限。
  **/
  protected boolean runAndReset() {
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                     null, Thread.currentThread()))
      return false;
    boolean ran = false;
    int s = state;
    try {
      Callable<V> c = callable;
      if (c != null && s == NEW) {
        try {
          c.call(); // don't set result
          ran = true;
        } catch (Throwable ex) {
          setException(ex);
        }
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      s = state;
      if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
  }
  /**
   * Ensures that any interrupt from a possible cancel(true) is only
   * delivered to a task while in run or runAndReset.
   */
  private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us. Let's spin-wait patiently.
    if (s == INTERRUPTING)
      while (state == INTERRUPTING)
        Thread.yield(); // wait out pending interrupt
    // assert state == INTERRUPTED;
    // We want to clear any interrupt we may have received from
    // cancel(true). However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
  }
  /**
   * Simple linked list nodes to record waiting threads in a Treiber
   * stack. See other classes such as Phaser and SynchronousQueue
   * for more detailed explanation.
   */
  static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
  }
  /**
  該方法在任務完成(包括異常完成、取消)后調用。刪除所有正在get獲取等待的節(jié)點且喚醒節(jié)點的線程。和調用done方法和置空callable.
  **/
  private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
      if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
        for (;;) {
          Thread t = q.thread;
          if (t != null) {
            q.thread = null;
            LockSupport.unpark(t);
          }
          WaitNode next = q.next;
          if (next == null)
            break;
          q.next = null; // unlink to help gc
          q = next;
        }
        break;
      }
    }
    done();
    callable = null;    // to reduce footprint
  }
  /**
  阻塞等待任務執(zhí)行完成(中斷、正常完成、超時)
  **/
  private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
      /**
      這里的if else的順序也是有講究的。
      1.先判斷線程是否中斷,中斷則從隊列中移除(也可能該線程不存在于隊列中)
      2.判斷當前任務是否執(zhí)行完成,執(zhí)行完成則不再阻塞,直接返回。
      3.如果任務狀態(tài)=COMPLETING,證明該任務處于已執(zhí)行完成,正在切換任務執(zhí)行狀態(tài),CPU讓出片刻即可
      4.q==null,則證明還未創(chuàng)建節(jié)點,則創(chuàng)建節(jié)點
      5.q節(jié)點入隊
      6和7.阻塞
      **/
      if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
      }
      int s = state;
      if (s > COMPLETING) {
        if (q != null)
          q.thread = null;
        return s;
      }
      else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
      else if (q == null)
        q = new WaitNode();
      else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                           q.next = waiters, q);
      else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
          removeWaiter(q);
          return state;
        }
        LockSupport.parkNanos(this, nanos);
      }
      else
        LockSupport.park(this);
    }
  }
  /**
   * Tries to unlink a timed-out or interrupted wait node to avoid
   * accumulating garbage. Internal nodes are simply unspliced
   * without CAS since it is harmless if they are traversed anyway
   * by releasers. To avoid effects of unsplicing from already
   * removed nodes, the list is retraversed in case of an apparent
   * race. This is slow when there are a lot of nodes, but we don't
   * expect lists to be long enough to outweigh higher-overhead
   * schemes.
   */
  private void removeWaiter(WaitNode node) {
    if (node != null) {
      node.thread = null;
      retry:
      for (;;) {     // restart on removeWaiter race
        for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
          s = q.next;
          if (q.thread != null)
            pred = q;
          else if (pred != null) {
            pred.next = s;
            if (pred.thread == null) // check for race
              continue retry;
          }
          else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                             q, s))
            continue retry;
        }
        break;
      }
    }
  }
  // Unsafe mechanics
  private static final sun.misc.Unsafe UNSAFE;
  private static final long stateOffset;
  private static final long runnerOffset;
  private static final long waitersOffset;
  static {
    try {
      UNSAFE = sun.misc.Unsafe.getUnsafe();
      Class<?> k = FutureTask.class;
      stateOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("state"));
      runnerOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("runner"));
      waitersOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("waiters"));
    } catch (Exception e) {
      throw new Error(e);
    }
  }
}

總結

以上就是本文關于futuretask源碼分析(推薦)的全部內容,希望對大家有所幫助。有什么問題可以隨時留言,歡迎大家一起交流討論。

原文鏈接:http://blog.csdn.net/wojiaolinaaa/article/details/50434817

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 欧美日韩国产手机在线观看视频 | 亚洲第一色网 | 国产综合图区 | www.福利| 国色天香视频资源网 | 特级av毛片免费观看 | 久久强奷乱码老熟女 | 日本特级a禁片在线播放 | 禁止的爱善良的未删减版hd | 欧美精品一国产成人性影视 | 欧美一级片免费看 | 久久精品热在线观看85 | 亚洲系列国产系列 | 97青草香蕉依人在线播放 | 99热成人精品热久久669 | 87影院在线观看视频在线观看 | ckinese中国男同gay男男 | 日韩无砖2021特黄 | 奇米影视7777久久精品 | babes性欧美30| 精品亚洲国产一区二区 | 欧美s级人做人爱c视频 | 黑人巨大初黑人解禁作品 | 国内精品视频一区二区三区 | 亚洲国产精久久久久久久 | 图片专区亚洲欧美另类 | 99久久精品国产综合一区 | 亚洲干综合 | 免费在线视频观看 | 扒开双腿疯狂进出爽爽动态图 | naruto堂同人本子汉化gg | 欧美专区在线观看 | 国内精品视频一区二区三区 | 数学老师扒开腿让我爽快 | 高h扶她文肉 | 日韩欧美中文字幕出 | 99热久久这里只有精品23 | 亚洲国产成人在线视频 | 色漫在线观看 | 无遮挡h肉动漫在线观看电车 | 精品无人区麻豆乱码无限制 |