executors
executors 是一個java中的工具類. 提供工廠方法來創建不同類型的線程池.
從上圖中也可以看出, executors的創建線程池的方法, 創建出來的線程池都實現了 executorservice
接口. 常用方法有以下幾個:
-
newfixedthreadpool(int threads)
: 創建固定數目線程的線程池, 超出的線程會在隊列中等待. -
newcachedthreadpool()
: 創建一個可緩存線程池, 如果線程池長度超過處理需要, 可靈活回收空閑線程(60秒), 若無可回收,則新建線程. -
newsinglethreadexecutor()
: 創建一個單線程化的線程池, 它只會用唯一的工作線程來執行任務, 保證所有任務按照指定順序(fifo, lifo, 優先級)執行. 如果某一個任務執行出錯, 將有另一個線程來繼續執行. -
newscheduledthreadpool(int corepoolsize)
: 創建一個支持定時及周期性的任務執行的線程池, 多數情況下可用來替代timer類.
executors 例子
newcachedthreadpool
線程最大數為 integer.max_value
, 當我們往線程池添加了 n 個任務, 這 n 個任務都是一起執行的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
executorservice cachedthreadpool = executors.newcachedthreadpool(); cachedthreadpool.execute( new runnable() { @override public void run() { for (;;) { try { thread.currentthread().sleep( 1000 ); system.out.println(thread.currentthread().getname()); } catch (interruptedexception e) { e.printstacktrace(); } } } }); cachedthreadpool.execute( new runnable() { @override public void run() { for (;;) { try { thread.currentthread().sleep( 1000 ); system.out.println(thread.currentthread().getname()); } catch (interruptedexception e) { e.printstacktrace(); } } } }); |
newfixedthreadpool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
executorservice cachedthreadpool = executors.newfixedthreadpool( 1 ); cachedthreadpool.execute( new runnable() { @override public void run() { for (;;) { try { thread.currentthread().sleep( 1000 ); system.out.println(thread.currentthread().getname()); } catch (interruptedexception e) { e.printstacktrace(); } } } }); cachedthreadpool.execute( new runnable() { @override public void run() { for (;;) { try { thread.currentthread().sleep( 1000 ); system.out.println(thread.currentthread().getname()); } catch (interruptedexception e) { e.printstacktrace(); } } } }); |
newscheduledthreadpool
三秒執行一次, 只有執行完這一次后, 才會執行.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
scheduledexecutorservice scheduledexecutorservice = executors.newscheduledthreadpool( 5 ); scheduledexecutorservice.schedule( new runnable() { @override public void run() { for (;;) { try { thread.currentthread().sleep( 2000 ); system.out.println(thread.currentthread().getname()); } catch (interruptedexception e) { e.printstacktrace(); } } } }, 3 , timeunit.seconds); |
newsinglethreadexecutor
順序執行各個任務, 第一個任務執行完, 才會執行下一個.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
executorservice executorservice = executors.newsinglethreadexecutor(); executorservice.execute( new runnable() { @override public void run() { for (;;) { try { system.out.println(thread.currentthread().getname()); thread.currentthread().sleep( 10000 ); } catch (interruptedexception e) { e.printstacktrace(); } } } }); executorservice.execute( new runnable() { @override public void run() { for (;;) { try { system.out.println(thread.currentthread().getname()); thread.currentthread().sleep( 2 ); } catch (interruptedexception e) { e.printstacktrace(); } } } }); |
executors存在什么問題
在阿里巴巴java開發手冊中提到,使用executors創建線程池可能會導致oom(outofmemory ,內存溢出),但是并沒有說明為什么,那么接下來我們就來看一下到底為什么不允許使用executors?
我們先來一個簡單的例子,模擬一下使用executors導致oom的情況.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
/** * @author hollis */ public class executorsdemo { private static executorservice executor = executors.newfixedthreadpool( 15 ); public static void main(string[] args) { for ( int i = 0 ; i < integer.max_value; i++) { executor.execute( new subthread()); } } } class subthread implements runnable { @override public void run() { try { thread.sleep( 10000 ); } catch (interruptedexception e) { //do nothing } } } |
通過指定jvm參數:-xmx8m -xms8m
運行以上代碼,會拋出oom:
exception in thread "main" java.lang.outofmemoryerror: gc overhead limit exceeded
at java.util.concurrent.linkedblockingqueue.offer(linkedblockingqueue.java:416)
at java.util.concurrent.threadpoolexecutor.execute(threadpoolexecutor.java:1371)
at com.hollis.executorsdemo.main(executorsdemo.java:16)
以上代碼指出,executorsdemo.java 的第16行,就是代碼中的 executor.execute(new subthread());
java中的 blockingqueue
主要有兩種實現, 分別是 arrayblockingqueue
和 linkedblockingqueue
.
arrayblockingqueue
是一個用數組實現的有界阻塞隊列, 必須設置容量.
1
2
3
4
5
6
7
8
|
public arrayblockingqueue( int capacity, boolean fair) { if (capacity <= 0 ) throw new illegalargumentexception(); this .items = new object[capacity]; lock = new reentrantlock(fair); notempty = lock.newcondition(); notfull = lock.newcondition(); } |
linkedblockingqueue
是一個用鏈表實現的有界阻塞隊列, 容量可以選擇進行設置, 不設置的話, 將是一個無邊界的阻塞隊列, 最大長度為 integer.max_value
.
1
2
3
|
public linkedblockingqueue() { this (integer.max_value); } |
這里的問題就出在如果我們不設置 linkedblockingqueue
的容量的話, 其默認容量將會是 integer.max_value
.
而 newfixedthreadpool
中創建 linkedblockingqueue
時, 并未指定容量. 此時, linkedblockingqueue
就是一個無邊界隊列, 對于一個無邊界隊列來說, 是可以不斷的向隊列中加入任務的, 這種情況下就有可能因為任務過多而導致內存溢出問題.
newcachedthreadpool
和 newscheduledthreadpool
這兩種方式創建的最大線程數可能是integer.max_value
, 而創建這么多線程, 必然就有可能導致oom.
threadpoolexecutor 創建線程池
避免使用 executors
創建線程池, 主要是避免使用其中的默認實現, 那么我們可以自己直接調用 threadpoolexecutor
的構造函數來自己創建線程池. 在創建的同時, 給 blockqueue
指定容量就可以了.
1
2
3
|
executorservice executor = new threadpoolexecutor( 10 , 10 , 60l, timeunit.seconds, new arrayblockingqueue( 10 )); |
這種情況下, 一旦提交的線程數超過當前可用線程數時, 就會拋出 java.util.concurrent.rejectedexecutionexception
, 這是因為當前線程池使用的隊列是有邊界隊列, 隊列已經滿了便無法繼續處理新的請求.
除了自己定義 threadpoolexecutor
外. 還有其他方法. 如apache和guava等.
四個構造函數
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public threadpoolexecutor( int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue) public threadpoolexecutor( int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory) public threadpoolexecutor( int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, rejectedexecutionhandler handler) public threadpoolexecutor( int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler) |
int corepoolsize => 該線程池中核心線程數最大值
線程池新建線程的時候,如果當前線程總數小于corepoolsize, 則新建的是核心線程, 如果超過corepoolsize, 則新建的是非核心線程
核心線程默認情況下會一直存活在線程池中, 即使這個核心線程啥也不干(閑置狀態).
如果指定 threadpoolexecutor 的 allowcorethreadtimeout
這個屬性為 true
, 那么核心線程如果不干活(閑置狀態)的話, 超過一定時間(時長下面參數決定), 就會被銷毀掉
很好理解吧, 正常情況下你不干活我也養你, 因為我總有用到你的時候, 但有時候特殊情況(比如我自己都養不起了), 那你不干活我就要把你干掉了
int maximumpoolsize
該線程池中線程總數最大值
線程總數 = 核心線程數 + 非核心線程數.
long keepalivetime
該線程池中非核心線程閑置超時時長
一個非核心線程, 如果不干活(閑置狀態)的時長超過這個參數所設定的時長, 就會被銷毀掉
如果設置 allowcorethreadtimeout = true
, 則會作用于核心線程
timeunit unit
keepalivetime的單位, timeunit是一個枚舉類型, 其包括:
1
2
3
4
5
6
7
|
timeunit.days; //天 timeunit.hours; //小時 timeunit.minutes; //分鐘 timeunit.seconds; //秒 timeunit.milliseconds; //毫秒 timeunit.microseconds; //微妙 timeunit.nanoseconds; //納秒 |
blockingqueue workqueue
一個阻塞隊列, 用來存儲等待執行的任務. 也就是說現在有10個任務, 核心線程 有四個, 非核心線程有六個, 那么這六個線程會被添加到 workqueue
中, 等待執行.
這個參數的選擇也很重要, 會對線程池的運行過程產生重大影響, 一般來說, 這里的阻塞隊列有以下幾種選擇:
synchronousqueue
: 這個隊列接收到任務的時候, 會直接提交給線程處理, 而不保留它, 如果所有線程都在工作怎么辦? 那就*新建一個線程來處理這個任務!所以為了保證不出現<線程數達到了maximumpoolsize而不能新建線程>的錯誤, 使用這個類型隊列的時候, maximumpoolsize
一般指定成 integer.max_value
, 即無限大.
linkedblockingqueue
: 這個隊列接收到任務的時候, 如果當前線程數小于核心線程數, 則核心線程處理任務; 如果當前線程數等于核心線程數, 則進入隊列等待. 由于這個隊列最大值為 integer.max_value
, 即所有超過核心線程數的任務都將被添加到隊列中,這也就導致了 maximumpoolsize
的設定失效, 因為總線程數永遠不會超過 corepoolsize.
arrayblockingqueue
: 可以限定隊列的長度, 接收到任務的時候, 如果沒有達到 corepoolsize 的值, 則核心線程執行任務, 如果達到了, 則入隊等候, 如果隊列已滿, 則新建線程(非核心線程)執行任務, 又如果總線程數到了maximumpoolsize, 并且隊列也滿了, 則發生錯誤.
delayqueue
: 隊列內元素必須實現 delayed 接口, 這就意味著你傳進去的任務必須先實現delayed接口. 這個隊列接收到任務時, 首先先入隊, 只有達到了指定的延時時間, 才會執行任務.
threadfactory threadfactory
它是threadfactory類型的變量, 用來創建新線程.
默認使用 executors.defaultthreadfactory()
來創建線程. 使用默認的 threadfactory
來創建線程時, 會使新創建的線程具有相同的 norm_priority
優先級并且是非守護線程, 同時也設置了線程的名稱.
rejectedexecutionhandler handler
表示當拒絕處理任務時的策略, 有以下四種取值:
threadpoolexecutor.abortpolicy:丟棄任務并拋出rejectedexecutionexception異常(默認).
threadpoolexecutor.discardpolicy:直接丟棄任務, 但是不拋出異常.
threadpoolexecutor.discardoldestpolicy:丟棄隊列最前面的任務, 然后重新嘗試執行任務(重復此過程)
threadpoolexecutor.callerrunspolicy:用調用者所在的線程來執行任務.
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。