一、簡介
多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力,但頻繁的創建線程的開銷是很大的,那么如何來減少這部分的開銷了,那么就要考慮使用線程池了。線程池就是一個線程的容器,每次只執行額定數量的線程,線程池就是用來管理這些額定數量的線程。
二、涉及線程池的類結構圖
其中供我們使用的,主要是threadpoolexecutor類。
三、如何創建線程池
我們創建線程池一般有以下幾種方法:
1、使用executors工廠類
executors主要提供了下面幾種創建線程池的方法:
下面來看下使用示例:
1)newfixedthreadpool(固定大小的線程池)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public class fixedthreadpool { public static void main(string[] args) { executorservice pool = executors.newfixedthreadpool( 5 ); // 創建一個固定大小為5的線程池 for ( int i = 0 ; i < 10 ; i++) { pool.submit( new mythread()); } pool.shutdown(); } } public class mythread extends thread { @override public void run() { system.out.println(thread.currentthread().getname() + "正在執行。。。" ); } } |
測試結果如下:
pool-1-thread-1正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-5正在執行。。。
pool-1-thread-4正在執行。。。
固定大小的線程池:每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程線。
2)newsinglethreadexecutor(單線程的線程池)
1
2
3
4
5
6
7
8
9
|
public class singlethreadpool { public static void main(string[] args) { executorservice pool=executors.newsinglethreadexecutor(); //創建一個單線程池 for ( int i= 0 ;i< 100 ;i++){ pool.submit( new mythread()); } pool.shutdown(); } } |
測試結果如下:
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
單線程的線程池:這個線程池只有一個線程在工作,也就是相當于單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。
3)newscheduledthreadpool
1
2
3
4
5
6
7
8
9
10
11
12
|
public class scheduledthreadpool { public static void main(string[] args) { scheduledexecutorservice pool=executors.newscheduledthreadpool( 6 ); for ( int i= 0 ;i< 10000 ;i++){ pool.submit( new mythread()); } pool.schedule( new mythread(), 1000 , timeunit.milliseconds); pool.schedule( new mythread(), 1000 , timeunit.milliseconds); pool.shutdown(); } } |
測試結果如下:
pool-1-thread-1正在執行。。。
pool-1-thread-6正在執行。。。
pool-1-thread-5正在執行。。。
pool-1-thread-4正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-4正在執行。。。
pool-1-thread-5正在執行。。。
pool-1-thread-6正在執行。。。
pool-1-thread-1正在執行。。。
…………此處會延時1s…………
pool-1-thread-4正在執行。。。
pool-1-thread-1正在執行。。。
測試結果的最后兩個線程都是在延時1s之后,才開始執行的。此線程池支持定時以及周期性執行任務的需求
4)newcachedthreadpool(可緩存的線程池)
1
2
3
4
5
6
7
8
9
|
public class cachedthreadpool { public static void main(string[] args) { executorservice pool=executors.newcachedthreadpool(); for ( int i= 0 ;i< 100 ;i++){ pool.submit( new mythread()); } pool.shutdown(); } } |
測試結果如下:
pool-1-thread-5正在執行。。。
pool-1-thread-7正在執行。。。
pool-1-thread-5正在執行。。。
pool-1-thread-16正在執行。。。
pool-1-thread-17正在執行。。。
pool-1-thread-16正在執行。。。
pool-1-thread-5正在執行。。。
pool-1-thread-7正在執行。。。
pool-1-thread-16正在執行。。。
pool-1-thread-18正在執行。。。
pool-1-thread-10正在執行。。。
可緩存的線程池:如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(或者說jvm)能夠創建的最大線程大小。
官方建議程序員使用較為方便的executors工廠方法executors.newcachedthreadpool()(無界線程池,可以進行自動線程回收)、executors.newfixedthreadpool(int)(固定大小線程池)executors.newsinglethreadexecutor()(單個后臺線程),這幾種線程池均為大多數使用場景預定義了默認配置。
2、繼承threadpoolexecutor類,并復寫父類的構造方法。
在介紹這種方式之前,我們來分析下前面幾個創建線程池的底層代碼是怎樣的?
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public class executors { public static executorservice newfixedthreadpool( int nthreads) { return new threadpoolexecutor(nthreads, nthreads, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()); } public static executorservice newsinglethreadexecutor() { return new finalizabledelegatedexecutorservice ( new threadpoolexecutor( 1 , 1 , 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>())); } } |
從executors工廠類的底層代碼可以看出,工廠類提供的創建線程池的方法,其實都是通過構造threadpoolexecutor來實現的。threadpoolexecutor構造方法代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public threadpoolexecutor( int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler) { if (corepoolsize < 0 || maximumpoolsize <= 0 || maximumpoolsize < corepoolsize || keepalivetime < 0 ) throw new illegalargumentexception(); if (workqueue == null || threadfactory == null || handler == null ) throw new nullpointerexception(); this .corepoolsize = corepoolsize; this .maximumpoolsize = maximumpoolsize; this .workqueue = workqueue; this .keepalivetime = unit.tonanos(keepalivetime); this .threadfactory = threadfactory; this .handler = handler; } |
那么接下來,我們就來談談這個threadpoolexecutor構造方法。在這個構造方法中,主要有以下幾個參數:
corepoolsize--池中所保存的線程數,包括空閑線程。
maximumpoolsize--池中允許的最大線程數。
keepalivetime--當線程數大于corepoolsize時,此為終止空閑線程等待新任務的最長時間。
unit--keepalivetime 參數的時間單位。
workqueue--執行前用于保持任務的隊列。此隊列僅保持由 execute方法提交的 runnable任務。
threadfactory--執行程序創建新線程時使用的工廠。
handler--由于超出線程范圍和隊列容量而使執行被阻塞時所使用的處理程序。
接下來,咋們來說下這幾個參數之間的關系。當線程池剛創建的時候,線程池里面是沒有任何線程的(注意,并不是線程池一創建,里面就創建了一定數量的線程),當調用execute()方法添加一個任務時,線程池會做如下的判斷:
1)如果當前正在運行的線程數量小于corepoolsize,那么立刻創建一個新的線程,執行這個任務。
2)如果當前正在運行的線程數量大于或等于corepoolsize,那么這個任務將會放入隊列中。
3)如果線程池的隊列已經滿了,但是正在運行的線程數量小于maximumpoolsize,那么還是會創建新的線程,執行這個任務。
4)如果隊列已經滿了,且當前正在運行的線程數量大于或等于maximumpoolsize,那么線程池會根據拒絕執行策略來處理當前的任務。
5)當一個任務執行完后,線程會從隊列中取下一個任務來執行,如果隊列中沒有需要執行的任務,那么這個線程就會處于空閑狀態,如果超過了keepalivetime存活時間,則這個線程會被線程池回收(注:回收線程是有條件的,如果當前運行的線程數量大于corepoolsize的話,這個線程就會被銷毀,如果不大于corepoolsize,是不會銷毀這個線程的,線程的數量必須保持在corepoolsize數量內).為什么不是線程一空閑就回收,而是需要等到超過keepalivetime才進行線程的回收了,原因很簡單:因為線程的創建和銷毀消耗很大,更不能頻繁的進行創建和銷毀,當超過keepalivetime后,發現確實用不到這個線程了,才會進行銷毀。這其中unit表示keepalivetime的時間單位,unit的定義如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public enum timeunit { nanoseconds { // keepalivetime以納秒為單位 }, microseconds { // keepalivetime以微秒為單位 }, milliseconds { // keepalivetime以毫秒為單位 }, seconds { // keepalivetime以秒為單位 }, minutes { // keepalivetime以分鐘為單位 }, hours { // keepalivetime以小時為單位 }, days { // keepalivetime以天為單位 }; |
下面從源碼來分析一下,對于上面的幾種情況,主要涉及到的源碼有以下幾塊:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private boolean addifundercorepoolsize(runnable firsttask) { thread t = null ; final reentrantlock mainlock = this .mainlock; mainlock.lock(); try { if (poolsize < corepoolsize && runstate == running) t = addthread(firsttask); } finally { mainlock.unlock(); } if (t == null ) return false ; t.start(); return true ; } |
其實,這段代碼很簡單,主要描述的就是,如果當前的線程池小于corepoolsize的時候,是直接新建一個線程來處理任務。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private boolean addifundermaximumpoolsize(runnable firsttask) { thread t = null ; final reentrantlock mainlock = this .mainlock; mainlock.lock(); try { if (poolsize < maximumpoolsize && runstate == running) t = addthread(firsttask); } finally { mainlock.unlock(); } if (t == null ) return false ; t.start(); return true ; } |
上面這段代碼描述的是,如果當前線程池的數量小于maximumpoolsize的時候,也會創建一個線程,來執行任務
四、線程池的隊列
線程池的隊列,總的來說有3種:
直接提交:工作隊列的默認選項是 synchronousqueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用于立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumpoolsizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
無界隊列:使用無界隊列(例如,不具有預定義容量的 linkedblockingqueue)將導致在所有 corepoolsize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corepoolsize。(因此,maximumpoolsize的值也就無效了。)當每個任務完全獨立于其他任務,即任務執行互不影響時,適合于使用無界隊列;例如,在 web頁服務器中。這種排隊可用于處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
有界隊列:當使用有限的 maximumpoolsizes時,有界隊列(如 arrayblockingqueue)有助于防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 cpu 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 i/o邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,cpu使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。
下面就來說下線程池的隊列,類結構圖如下:
1)synchronousqueue
該隊列對應的就是上面所說的直接提交,首先synchronousqueue是無界的,也就是說他存數任務的能力是沒有限制的,但是由于該queue本身的特性,在某次添加元素后必須等待其他線程取走后才能繼續添加。
2)linkedblockingqueue
該隊列對應的就是上面的無界隊列。
3)arrayblockingqueue
該隊列對應的就是上面的有界隊列。arrayblockingqueue有以下3中構造方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public arrayblockingqueue( int capacity) { this (capacity, false ); } public arrayblockingqueue( int capacity, boolean fair) { if (capacity <= 0 ) throw new illegalargumentexception(); this .items = (e[]) new object[capacity]; lock = new reentrantlock(fair); notempty = lock.newcondition(); notfull = lock.newcondition(); } public arrayblockingqueue( int capacity, boolean fair, collection<? extends e> c) { this (capacity, fair); if (capacity < c.size()) throw new illegalargumentexception(); for (iterator<? extends e> it = c.iterator(); it.hasnext();) add(it.next()); } |
下面我們重點來說下這個fair,fair表示隊列訪問線程的競爭策略,當為true的時候,任務插入隊列遵從fifo的規則,如果為false,則可以“插隊”。舉個例子,假如現在有很多任務在排隊,這個時候正好一個線程執行完了任務,同時又新來了一個任務,如果為false的話,這個任務不用在隊列中排隊,可以直接插隊,然后執行。如下圖所示:
五、線程池的拒絕執行策略
當線程的數量達到最大值時,這個時候,任務還在不斷的來,這個時候,就只好拒絕接受任務了。
threadpoolexecutor 允許自定義當添加任務失敗后的執行策略。你可以調用線程池的 setrejectedexecutionhandler() 方法,用自定義的rejectedexecutionhandler 對象替換現有的策略,threadpoolexecutor提供的默認的處理策略是直接丟棄,同時拋異常信息,threadpoolexecutor 提供 4 個現有的策略,分別是:
threadpoolexecutor.abortpolicy:表示拒絕任務并拋出異常,源碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public static class abortpolicy implements rejectedexecutionhandler { /** * creates an <tt>abortpolicy</tt>. */ public abortpolicy() { } /** * always throws rejectedexecutionexception. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws rejectedexecutionexception always. */ public void rejectedexecution(runnable r, threadpoolexecutor e) { throw new rejectedexecutionexception(); //拋異常 } } |
threadpoolexecutor.discardpolicy:表示拒絕任務但不做任何動作,源碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public static class discardpolicy implements rejectedexecutionhandler { /** * creates a <tt>discardpolicy</tt>. */ public discardpolicy() { } /** * does nothing, which has the effect of discarding task r. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedexecution(runnable r, threadpoolexecutor e) { } // 直接拒絕,但不做任何操作 } |
threadpoolexecutor.callerrunspolicy:表示拒絕任務,并在調用者的線程中直接執行該任務,源碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public static class callerrunspolicy implements rejectedexecutionhandler { /** * creates a <tt>callerrunspolicy</tt>. */ public callerrunspolicy() { } /** * executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedexecution(runnable r, threadpoolexecutor e) { if (!e.isshutdown()) { r.run(); // 直接執行任務 } } } |
threadpoolexecutor.discardoldestpolicy:表示先丟棄任務隊列中的第一個任務,然后把這個任務加進隊列。源碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
public static class discardoldestpolicy implements rejectedexecutionhandler { /** * creates a <tt>discardoldestpolicy</tt> for the given executor. */ public discardoldestpolicy() { } public void rejectedexecution(runnable r, threadpoolexecutor e) { if (!e.isshutdown()) { e.getqueue().poll(); // 丟棄隊列中的第一個任務 e.execute(r); // 執行新任務 } } } |
當任務源源不斷到來的時候,會從queue中poll一個任務出來,然后執行新的任務
總結
以上所述是小編給大家介紹的jdk自帶線程池詳解,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!
原文鏈接:http://blog.csdn.net/liuchuanhong1/article/details/52042182