一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - ThreadPoolExecutor線程池原理及其execute方法(詳解)

ThreadPoolExecutor線程池原理及其execute方法(詳解)

2020-11-19 10:47Java教程網 Java教程

下面小編就為大家帶來一篇ThreadPoolExecutor線程池原理及其execute方法(詳解)。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

jdk1.7.0_79

對于線程池大部分人可能會用,也知道為什么用。無非就是任務需要異步執行,再者就是線程需要統一管理起來。對于從線程池中獲取線程,大部分人可能只知道,我現在需要一個線程來執行一個任務,那我就把任務丟到線程池里,線程池里有空閑的線程就執行,沒有空閑的線程就等待。實際上對于線程池的執行原理遠遠不止這么簡單。

Java并發包中提供了線程池類——ThreadPoolExecutor,實際上更多的我們可能用到的是Executors工廠類為我們提供的線程池newFixedThreadPool、newSingleThreadPool、newCachedThreadPool,這三個線程池并不是ThreadPoolExecutor的子類,關于這幾者之間的關系,我們先來查看ThreadPoolExecutor,查看源碼發現其一共有4個構造方法。

?
1
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

首先就從這幾個參數開始來了解線程池ThreadPoolExecutor的執行原理

corePoolSize:核心線程池的線程數量

maximumPoolSize:最大的線程池線程數量

keepAliveTime:線程活動保持時間,線程池的工作線程空閑后,保持存活的時間。

unit:線程活動保持時間的單位。

workQueue:指定任務隊列所使用的阻塞隊列

corePoolSizemaximumPoolSize都在指定線程池中的線程數量,好像平時用到線程池的時候最多就只需要傳遞一個線程池大小的參數就能創建一個線程池啊,Java為我們提供了一些常用的線程池類就是上面提到的newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,當然如果我們想要自己發揮創建自定義的線程池就得自己來“配置”有關線程池的一些參數。

當把一個任務交給線程池來處理的時候,線程池的執行原理如下圖所示參考自《Java并發編程的藝術》

ThreadPoolExecutor線程池原理及其execute方法(詳解)

首先會判斷核心線程池里是否有線程可執行,有空閑線程則創建一個線程來執行任務。

②當核心線程池里已經沒有線程可執行的時候,此時將任務丟到任務隊列中去。

③如果任務隊列(有界)也已經滿了的話,但運行的線程數小于最大線程池的數量的時候,此時將會新建一個線程用于執行任務,但如果運行的線程數已經達到最大線程池的數量的時候,此時將無法創建線程執行任務。

所以實際上對于線程池不僅是單純地將任務丟到線程池,線程池中有線程就執行任務,沒線程就等待。

為鞏固一下線程池的原理,現在再來了解上面提到的常用的3個線程池:

Executors.newFixedThreadPool創建一個固定數量線程的線程池。

?
1
2
3
4
// Executors#newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

可以看到newFixedThreadPool中調用的是ThreadPoolExecutor類,傳遞的參數corePoolSize= maximumPoolSize=nThread。回顧線程池的執行原理,當一個任務提交到線程池中,首先判斷核心線程池里有沒有空閑線程,有則創建線程,沒有則將任務放到任務隊列(這里是有界阻塞隊列LinkedBlockingQueue)中,如果任務隊列已經滿了的話,對于newFixedThreadPool來說,它的最大線程池數量=核心線程池數量,此時任務隊列也滿了,將不能擴展創建新的線程來執行任務。

Executors.newSingleThreadExecutor:創建只包含一個線程的線程池。  

?
1
2
3
4
//Executors# newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegateExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

只有一個線程的線程池好像有點奇怪,并且并沒有直接將返回ThreadPoolExecutor,甚至也沒有直接將線程池數量1傳遞給newFixedThreadPool返回。那就說明這個只含有一個線程的線程池,或許并沒有只包含一個線程那么簡單。在其源碼注釋中這么寫到:創建只有一個工作線程的線程池用于操作一個無界隊列(如果由于前驅節點的執行被終止結束了,一個新的線程將會繼續執行后繼節點線程)任務得以繼續執行,不同于newFixedThreadPool(1)不會有額外的線程來重新繼續執行后繼節點。也就是說newSingleThreadExecutor自始至終都只有一個線程在執行,這和newFixedThreadPool一樣,但如果線程終止結束過后newSingleThreadExecutor則會重新創建一個新的線程來繼續執行任務隊列中的線程,而newFixedThreaPool則不會。

Executors.newCachedThreadPool:根據需要創建新線程的線程池。

?
1
2
3
4
//Executors#newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
  return new ThreadPooExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

可以看到newCachedThread返回的是ThreadPoolExecutor,其參數核心線程池corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE,這也就是說當任務被提交到newCachedThread線程池時,將會直接把任務放到SynchronousQueue任務隊列中,maximumPool從任務隊列中獲取任務。注意SynchronousQueue是一個沒有容量的隊列,也就是說每個入隊操作必須等待另一個線程的對應出隊操作,如果主線程提交任務的速度高于maximumPool中線程處理任務的速度時,newCachedThreadPool會不斷創建線程,線程多并不是一件好事,嚴重會耗盡CPU和內存資源。

題外話:newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,這三者都直接或間接調用了ThreadPoolExecutor,為什么它們三者沒有直接是其子類,而是通過Executors來實例化呢?這是所采用的靜態工廠方法,在java.util.Connections接口中同樣也是采用的靜態工廠方法來創建相關的類。這樣有很多好處,靜態工廠方法是用來產生對象的,產生什么對象沒關系,只要返回原返回類型或原返回類型的子類型都可以,降低API數目和使用難度,在《Effective Java》中的第1條就是靜態工廠方法。

回到ThreadPoolExecutor,首先來看它的繼承關系:

ThreadPoolExecutor線程池原理及其execute方法(詳解)

ThreadPoolExecutor它的頂級父類是Executor接口,只包含了一個方法——execute,這個方法也就是線程池的“執行”。

?
1
2
3
4
//Executor#execute
public interface Executor {
 void execute(Runnable command);
}

Executor#execute的實現則是在ThreadPoolExecutor中實現的:

?
1
2
3
4
5
6
7
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
  if (command == null)
  throw new NullPointerException();
  int c = ctl.get();
 
}

一來就碰到個不知所云的ctl變量它的定義:

private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));

這個變量使用來干嘛的呢?它的作用有點類似我們在《ReadWriteLock接口及其實現ReentrantReadWriteLock》中提到的讀寫鎖有讀、寫兩個同步狀態,而AQS則只提供了state一個int型變量,此時將state高16位表示為讀狀態,低16位表示為寫狀態。這里的clt同樣也是,它表示了兩個概念:

workerCount:當前有效的線程數

runState:當前線程池的五種狀態,Running、Shutdown、Stop、Tidying、Terminate。

int型變量一共有32位,線程池的五種狀態runState至少需要3位來表示,故workCount只能有29位,所以代碼中規定線程池的有效線程數最多為229-1。

?
1
2
3
4
5
6
7
8
9
//ThreadPoolExecutor
private static final int COUNT_BITS = Integer.SIZE – 3//32-3=29,線程數量所占位數
private static final int CAPACITY = (1 << COUNT_BITS) – 1; //低29位表示最大線程數,229-1
//五種線程池狀態
private static final int RUNNING = -1 << COUNT_BITS; /int型變量高3位(含符號位)101表RUNING
private static final int SHUTDOWN = 0 << COUNT_BITS; //高3位000
private static final int STOP = 1 << COUNT_BITS; //高3位001
private static final int TIDYING = 2 << COUNT_BITS; //高3位010
private static final int TERMINATED = 3 << COUNT_BITS; //高3位011

再次回到ThreadPoolExecutor#execute方法:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
 if (command == null)
  throw new NullPointerException();
   int c = ctl.get(); //由它可以獲取到當前有效的線程數和線程池的狀態
/*1.獲取當前正在運行線程數是否小于核心線程池,是則新創建一個線程執行任務,否則將任務放到任務隊列中*/
 if (workerCountOf(c) < corePoolSize){
  if (addWorker(command, tre))  //在addWorker中創建工作線程執行任務
   return ;
  c = ctl.get();
 }
/*2.當前核心線程池中全部線程都在運行workerCountOf(c) >= corePoolSize,所以此時將線程放到任務隊列中*/
 if (isRunning(c) && workQueue.offer(command)) { //線程池是否處于運行狀態,且是否任務插入任務隊列成功
  int recheck = ctl.get();
     if (!isRunning(recheck) && remove(command))  //線程池是否處于運行狀態,如果不是則使剛剛的任務出隊
       reject(command); //拋出RejectedExceptionException異常
     else if (workerCountOf(recheck) == 0)
       addWorker(null, false);
  }
/*3.插入隊列不成功,且當前線程數數量小于最大線程池數量,此時則創建新線程執行任務,創建失敗拋出異常*/
  else if (!addWorker(command, false)){
    reject(command); //拋出RejectedExceptionException異常
  }
}

上面代碼注釋第7行的即判斷當前核心線程池里是否有空閑線程,有則通過addWorker方法創建工作線程執行任務。addWorker方法較長,篩選出重要的代碼來解析。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
/*首先會再次檢查線程池是否處于運行狀態,核心線程池中是否還有空閑線程,都滿足條件過后則會調用compareAndIncrementWorkerCount先將正在運行的線程數+1,數量自增成功則跳出循環,自增失敗則繼續從頭繼續循環*/
  ...
  if (compareAndIncrementWorkerCount(c))
    break retry;
  ...
/*正在運行的線程數自增成功后則將線程封裝成工作線程Worker*/
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    final ReentrantLock mainLock = this.mainLock;  //全局鎖
    w = new Woker(firstTask);  //將線程封裝為Worker工作線程
    final Thread t = w.thread;
    if (t != null) {
      mainLock.lock(); //獲取全局鎖
/*當持有了全局鎖的時候,還需要再次檢查線程池的運行狀態等*/
      try {
        int c = clt.get();
        int rs = runStateOf(c);  //線程池運行狀態
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){  //線程池處于運行狀態,或者線程池關閉且任務線程為空
          if (t.isAlive()) //線程處于活躍狀態,即線程已經開始執行或者還未死亡,正確的應線程在這里應該是還未開始執行的
            throw new IllegalThreadStateException();
          workers.add(w); //private final HashSet<Worker> wokers = new HashSet<Worker>();包含線程池中所有的工作線程,只有在獲取了全局的時候才能訪問它。將新構造的工作線程加入到工作線程集合中
          int s = worker.size(); //工作線程數量
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true; //新構造的工作線程加入成功
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        t.start(); //在被構造為Worker工作線程,且被加入到工作線程集合中后,執行線程任務,注意這里的start實際上執行Worker中run方法,所以接下來分析Worker的run方法
        workerStarted = true;
      }
    }
  } finally {
    if (!workerStarted) //未能成功創建執行工作線程
      addWorkerFailed(w); //在啟動工作線程失敗后,將工作線程從集合中移除
  }
  return workerStarted;
}

在上面第35代碼中,工作線程被成功添加到工作線程集合中后,則開始start執行,這里start執行的是Worker工作線程中的run方法。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
//ThreadPoolExecutor$Worker,它繼承了AQS,同時實現了Runnable,所以它具備了這兩者的所有特性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  final Thread thread;
  Runnable firstTask;
  public Worker(Runnable firstTask) {
    setState(-1); //設置AQS的同步狀態為-1,禁止中斷,直到調用runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this); //通過線程工廠來創建一個線程,將自身作為Runnable傳遞傳遞
  }
  public void run() {
    runWorker(this); //運行工作線程
  }
}

ThreadPoolExecutor#runWorker,在此方法中,Worker在執行完任務后,還會循環獲取任務隊列里的任務執行(其中的getTask方法),也就是說Worker不僅僅是在執行完給它的任務就釋放或者結束,它不會閑著,而是繼續從任務隊列中獲取任務,直到任務隊列中沒有任務可執行時,它才退出循環完成任務。理解了以上的源碼過后,往后線程池執行原理的第二步、第三步的理解實則水到渠成。

以上這篇ThreadPoolExecutor線程池原理及其execute方法(詳解)就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持服務器之家。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 国产精品www视频免费看 | freesex 18 19处xx | 国产成人啪精品视频站午夜 | 欧美8x8x | 婷婷综合缴情亚洲五月伊 | 羲义嫁密着中出交尾gvg794 | 亚洲免费视频播放 | 欧美亚洲欧美 | 婷婷色综合网 | 国产主播99| 村妇超级乱淫伦小说全集 | 91久久精品青青草原伊人 | 日本色频 | 亚洲精品在线免费观看视频 | 韩国一大片a毛片女同 | chinese国产人妖videos | 青草久久伊人 | 日韩影院在线观看 | 肉文高h调教 | 欧美贵妇videos办公室360 | 国产18在线 | 日本五十路六十30人8时间 | 精品成人一区二区三区免费视频 | 精品国产自在现线久久 | 欧美成人精品第一区二区三区 | 九九在线精品视频 | 性做久久久久久久 | 精品久久成人免费第三区 | 亚洲香蕉伊在人在线观婷婷 | 日韩国产欧美一区二区三区 | 成年看片免费高清观看 | 添逼逼视频 | 亚洲日韩中文字幕一区 | 星球大战成人h无删减版 | 国产精品福利久久2020 | 99视频九九精品视频在线观看 | 国产一区二区在线看 | 韩日视频在线 | 免费观看无遮挡www的小视频 | 欧美日韩看看2015永久免费 | 男人综合网 |