java中Executor,ExecutorService,ThreadPoolExecutor詳解
1.Excutor
源碼非常簡(jiǎn)單,只有一個(gè)execute(Runnable command)回調(diào)接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the <tt>Executor</tt> implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution. * @throws NullPointerException if command is null */ void execute(Runnable command); } |
執(zhí)行已提交的 Runnable 任務(wù)對(duì)象。此接口提供一種將任務(wù)提交與每個(gè)任務(wù)將如何運(yùn)行的機(jī)制(包括線程使用的細(xì)節(jié)、調(diào)度等)分離開(kāi)來(lái)的方法。通常使用 Executor 而不是顯式地創(chuàng)建線程。例如,可能會(huì)使用以下方法,而不是為一組任務(wù)中的每個(gè)任務(wù)調(diào)用 new Thread(new(RunnableTask())).start():
1
2
3
4
|
Executor executor = anExecutor; executor.execute( new RunnableTask1()); executor.execute( new RunnableTask2()); ... |
不過(guò),Executor 接口并沒(méi)有嚴(yán)格地要求執(zhí)行是異步的。在最簡(jiǎn)單的情況下,執(zhí)行程序可以在調(diào)用方的線程中立即運(yùn)行已提交的任務(wù):
1
2
3
4
5
|
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } } |
更常見(jiàn)的是,任務(wù)是在某個(gè)不是調(diào)用方線程的線程中執(zhí)行的。以下執(zhí)行程序?qū)槊總€(gè)任務(wù)生成一個(gè)新線程。
1
2
3
4
5
|
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } } |
許多 Executor 實(shí)現(xiàn)都對(duì)調(diào)度任務(wù)的方式和時(shí)間強(qiáng)加了某種限制。以下執(zhí)行程序使任務(wù)提交與第二個(gè)執(zhí)行程序保持連續(xù),這說(shuō)明了一個(gè)復(fù)合執(zhí)行程序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
class SerialExecutor implements Executor { final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>(); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this .executor = executor; } public synchronized void execute( final Runnable r) { tasks.offer( new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null ) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null ) { executor.execute(active); } } } |
2.ExcutorService接口
ExecutorService提供了管理終止的方法,以及可為跟蹤一個(gè)或多個(gè)異步任務(wù)執(zhí)行狀況而生成 Future 的方法。
可以關(guān)閉 ExecutorService,這將導(dǎo)致其拒絕新任務(wù)。提供兩個(gè)方法來(lái)關(guān)閉 ExecutorService。
shutdown()方法在終止前允許執(zhí)行以前提交的任務(wù),而 shutdownNow() 方法阻止等待任務(wù)的啟動(dòng)并試圖停止當(dāng)前正在執(zhí)行的任務(wù)。在終止后,執(zhí)行程序沒(méi)有任務(wù)在執(zhí)行,也沒(méi)有任務(wù)在等待執(zhí)行,并且無(wú)法提交新任務(wù)。應(yīng)該關(guān)閉未使用的 ExecutorService以允許回收其資源。
通過(guò)創(chuàng)建并返回一個(gè)可用于取消執(zhí)行和/或等待完成的 Future,方法submit擴(kuò)展了基本方法 Executor.execute(java.lang.Runnable)。
方法 invokeAny 和 invokeAll 是批量執(zhí)行的最常用形式,它們執(zhí)行任務(wù) collection,然后等待至少一個(gè),
或全部任務(wù)完成(可使用 ExecutorCompletionService類來(lái)編寫(xiě)這些方法的自定義變體)。
Executors類為創(chuàng)建ExecutorService提供了便捷的工廠方法。
注意1:它只有一個(gè)直接實(shí)現(xiàn)類ThreadPoolExecutor和間接實(shí)現(xiàn)類ScheduledThreadPoolExecutor。
關(guān)于ThreadPoolExecutor的更多內(nèi)容請(qǐng)參考《ThreadPoolExecutor》
關(guān)于ScheduledThreadPoolExecutor的更多內(nèi)容請(qǐng)參考《ScheduledThreadPoolExecutor》
用法示例
下面給出了一個(gè)網(wǎng)絡(luò)服務(wù)的簡(jiǎn)單結(jié)構(gòu),這里線程池中的線程作為傳入的請(qǐng)求。它使用了預(yù)先配置的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
Executors.newFixedThreadPool( int ) 工廠方法: class NetworkService implements Runnable { private final ServerSocket serverSocket; private final ExecutorService pool; public NetworkService( int port, int poolSize) throws IOException { serverSocket = new ServerSocket(port); pool = Executors.newFixedThreadPool(poolSize); } public void run() { // run the service try { for (;;) { pool.execute( new Handler(serverSocket.accept())); } } catch (IOException ex) { pool.shutdown(); } } } class Handler implements Runnable { private final Socket socket; Handler(Socket socket) { this .socket = socket; } public void run() { // read and service request on socket } } |
下列方法分兩個(gè)階段關(guān)閉 ExecutorService。第一階段調(diào)用 shutdown 拒絕傳入任務(wù),然后等60秒后,任務(wù)還沒(méi)執(zhí)行完成,就調(diào)用 shutdownNow(如有必要)取消所有遺留的任務(wù):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination( 60 , TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination( 60 , TimeUnit.SECONDS)) System.err.println( "Pool did not terminate" ); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } } |
內(nèi)存一致性效果:線程中向 ExecutorService 提交 Runnable 或 Callable 任務(wù)之前的操作 happen-before 由該任務(wù)所提取的所有操作,
后者依次 happen-before 通過(guò) Future.get() 獲取的結(jié)果。
主要函數(shù):
void shutdown()
啟動(dòng)一個(gè)關(guān)閉命令,不再接受新任務(wù),當(dāng)所有已提交任務(wù)執(zhí)行完后,就關(guān)閉。如果已經(jīng)關(guān)閉,則調(diào)用沒(méi)有其他作用。
拋出:
SecurityException - 如果安全管理器存在并且關(guān)閉,此 ExecutorService 可能操作某些不允許調(diào)用者修改的線程(因?yàn)樗鼪](méi)有保持 RuntimePermission("modifyThread")),或者安全管理器的 checkAccess 方法拒絕訪問(wèn)。
List<Runnable> shutdownNow()
試圖停止所有正在執(zhí)行的活動(dòng)任務(wù),暫停處理正在等待的任務(wù),并返回等待執(zhí)行的任務(wù)列表。
無(wú)法保證能夠停止正在處理的活動(dòng)執(zhí)行任務(wù),但是會(huì)盡力嘗試。例如,通過(guò) Thread.interrupt() 來(lái)取消典型的實(shí)現(xiàn),所以任何任務(wù)無(wú)法響應(yīng)中斷都可能永遠(yuǎn)無(wú)法終止。
返回:
從未開(kāi)始執(zhí)行的任務(wù)的列表
拋出:
SecurityException - 如果安全管理器存在并且關(guān)閉,
此 ExecutorService 可能操作某些不允許調(diào)用者修改的線程(因?yàn)樗鼪](méi)有保持 RuntimePermission("modifyThread")),
或者安全管理器的 checkAccess 方法拒絕訪問(wèn)。
注意1: 它會(huì)返回等待執(zhí)行的任務(wù)列表。
注意2: 無(wú)法保證能夠停止正在處理的活動(dòng)執(zhí)行任務(wù),但是會(huì)盡力嘗試。例如,通過(guò) Thread.interrupt() 來(lái)取消,
所以任何任務(wù)無(wú)法響應(yīng)中斷都可能永遠(yuǎn)無(wú)法終止。
boolean isShutdown()
如果此執(zhí)行程序已關(guān)閉,則返回 true。
返回:
如果此執(zhí)行程序已關(guān)閉,則返回 true
boolean isTerminated()
如果關(guān)閉后所有任務(wù)都已完成,則返回 true。注意,除非首先調(diào)用 shutdown 或 shutdownNow,否則 isTerminated 永不為 true。
返回:
如果關(guān)閉后所有任務(wù)都已完成,則返回 true
boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException
等待(阻塞)直到關(guān)閉或最長(zhǎng)等待時(shí)間或發(fā)生中斷
參數(shù):
timeout - 最長(zhǎng)等待時(shí)間
unit - timeout 參數(shù)的時(shí)間單位
返回:
如果此執(zhí)行程序終止,則返回 true;如果終止前超時(shí)期滿,則返回 false
拋出:
InterruptedException - 如果等待時(shí)發(fā)生中斷
注意1:如果此執(zhí)行程序終止(關(guān)閉),則返回 true;如果終止前超時(shí)期滿,則返回 false
<T> Future<T> submit(Callable<T> task)
提交一個(gè)返回值的任務(wù)用于執(zhí)行,返回一個(gè)表示任務(wù)的未決結(jié)果的 Future。該 Future 的 get 方法在成功完成時(shí)將會(huì)返回該任務(wù)的結(jié)果。
如果想立即阻塞任務(wù)的等待,則可以使用 result = exec.submit(aCallable).get(); 形式的構(gòu)造。
注:Executors 類包括了一組方法,可以轉(zhuǎn)換某些其他常見(jiàn)的類似于閉包的對(duì)象,
例如,將 PrivilegedAction 轉(zhuǎn)換為 Callable 形式,這樣就可以提交它們了。
參數(shù):
task - 要提交的任務(wù)
返回:
表示任務(wù)等待完成的 Future
拋出:
RejectedExecutionException - 如果任務(wù)無(wú)法安排執(zhí)行
NullPointerException - 如果該任務(wù)為 null
注意:關(guān)于submit的使用和Callable可以參閱《使用Callable返回結(jié)果》
<T> Future<T> submit(Runnable task,T result)
提交一個(gè) Runnable 任務(wù)用于執(zhí)行,并返回一個(gè)表示該任務(wù)的 Future。該 Future 的 get 方法在成功完成時(shí)將會(huì)返回給定的結(jié)果。
參數(shù):
task - 要提交的任務(wù)
result - 返回的結(jié)果
返回:
表示任務(wù)等待完成的 Future
拋出:
RejectedExecutionException - 如果任務(wù)無(wú)法安排執(zhí)行
NullPointerException - 如果該任務(wù)為 null
注意:關(guān)于submit的使用可以參閱《Callable》
Future<?> submit(Runnable task)
提交一個(gè) Runnable 任務(wù)用于執(zhí)行,并返回一個(gè)表示該任務(wù)的 Future。該 Future 的 get 方法在成功 完成時(shí)將會(huì)返回 null。
參數(shù):
task - 要提交的任務(wù)
返回:
表示任務(wù)等待完成的 Future
拋出:
RejectedExecutionException - 如果任務(wù)無(wú)法安排執(zhí)行
NullPointerException - 如果該任務(wù)為 null
注意:關(guān)于submit的使用可以參閱《使用Callable返回結(jié)果》
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
執(zhí)行給定的任務(wù),當(dāng)所有任務(wù)完成時(shí),返回保持任務(wù)狀態(tài)和結(jié)果的 Future 列表。返回列表的所有元素的 Future.isDone() 為 true。
注意,可以正常地或通過(guò)拋出異常來(lái)終止已完成 任務(wù)。如果正在進(jìn)行此操作時(shí)修改了給定的 collection,則此方法的結(jié)果是不確定的。
參數(shù):
tasks - 任務(wù) collection
返回:
表示任務(wù)的 Future 列表,列表順序與給定任務(wù)列表的迭代器所生成的順序相同,每個(gè)任務(wù)都已完成。
拋出:
InterruptedException - 如果等待時(shí)發(fā)生中斷,在這種情況下取消尚未完成的任務(wù)。
NullPointerException - 如果任務(wù)或其任意元素為 null
RejectedExecutionException - 如果所有任務(wù)都無(wú)法安排執(zhí)行
注意1:該方法會(huì)一直阻塞直到所有任務(wù)完成。
1
2
3
4
|
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException |
執(zhí)行給定的任務(wù),當(dāng)所有任務(wù)完成或超時(shí)期滿時(shí)(無(wú)論哪個(gè)首先發(fā)生),返回保持任務(wù)狀態(tài)和結(jié)果的 Future 列表。返回列表的所有元素的 Future.isDone() 為 true。一旦返回后,即取消尚未完成的任務(wù)。注意,可以正常地或通過(guò)拋出異常來(lái)終止已完成 任務(wù)。如果此操作正在進(jìn)行時(shí)修改了給定的 collection,則此方法的結(jié)果是不確定的。
參數(shù):
tasks - 任務(wù) collection
timeout - 最長(zhǎng)等待時(shí)間
unit - timeout 參數(shù)的時(shí)間單位
返回:
表示任務(wù)的 Future 列表,列表順序與給定任務(wù)列表的迭代器所生成的順序相同。
如果操作未超時(shí),則已完成所有任務(wù)。如果確實(shí)超時(shí)了,則某些任務(wù)尚未完成。
拋出:
InterruptedException - 如果等待時(shí)發(fā)生中斷,在這種情況下取消尚未完成的任務(wù)
NullPointerException - 如果任務(wù)或其任意元素或 unit 為 null
RejectedExecutionException - 如果所有任務(wù)都無(wú)法安排執(zhí)行
注意1:該方法會(huì)一直阻塞直到所有任務(wù)完成或超時(shí)。
注意2:如果確實(shí)超時(shí)了,則某些任務(wù)尚未完成。【那么這些尚未完成的任務(wù)應(yīng)該被系統(tǒng)取消】。
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException,
ExecutionException
執(zhí)行給定的任務(wù),如果某個(gè)任務(wù)已成功完成(也就是未拋出異常),則返回其結(jié)果。一旦正常或異常返回后,則取消尚未完成的任務(wù)。
如果此操作正在進(jìn)行時(shí)修改了給定的 collection,則此方法的結(jié)果是不確定的。
參數(shù):
tasks - 任務(wù) collection
返回:
某個(gè)任務(wù)返回的結(jié)果
拋出:
InterruptedException - 如果等待時(shí)發(fā)生中斷
NullPointerException - 如果任務(wù)或其任意元素為 null
IllegalArgumentException - 如果任務(wù)為空
ExecutionException - 如果沒(méi)有任務(wù)成功完成
RejectedExecutionException - 如果任務(wù)無(wú)法安排執(zhí)行
注意1:該方法會(huì)一直阻塞直到有一個(gè)任務(wù)完成。
注意2:一旦正常或異常返回后,則取消尚未完成的任務(wù)
1
2
3
4
5
6
|
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException |
執(zhí)行給定的任務(wù),如果在給定的超時(shí)期滿前某個(gè)任務(wù)已成功完成(也就是未拋出異常),則返回其結(jié)果。一旦正常或異常返回后,則取消尚未完成的任務(wù)。如果此操作正在進(jìn)行時(shí)修改了給定的 collection,則此方法的結(jié)果是不確定的。
參數(shù):
tasks - 任務(wù) collection
timeout - 最長(zhǎng)等待時(shí)間
unit - timeout 參數(shù)的時(shí)間單位
返回:
某個(gè)任務(wù)返回的結(jié)果
拋出:
InterruptedException - 如果等待時(shí)發(fā)生中斷
NullPointerException - 如果任務(wù)或其任意元素或 unit 為 null
TimeoutException - 如果在所有任務(wù)成功完成之前給定的超時(shí)期滿
ExecutionException - 如果沒(méi)有任務(wù)成功完成
RejectedExecutionException - 如果任務(wù)無(wú)法安排執(zhí)行
注意1:該方法會(huì)一直阻塞直到有一個(gè)任務(wù)完成。
注意2:一旦正常或異常返回后,則取消尚未完成的任務(wù)
3.ThreadPoolExecutor
ThreadPoolExecutor是ExecutorService的一個(gè)實(shí)現(xiàn)類,它使用可能的幾個(gè)池線程之一執(zhí)行每個(gè)提交的任務(wù),通常使用 Executors 工廠方法配置。
線程池可以解決兩個(gè)不同問(wèn)題:由于減少了每個(gè)任務(wù)調(diào)用的開(kāi)銷,它們通常可以在執(zhí)行大量異步任務(wù)時(shí)提供增強(qiáng)的性能,并且還可以提供綁定和管理資源(包括執(zhí)行任務(wù)集時(shí)使用的線程)的方法。
每個(gè) ThreadPoolExecutor 還維護(hù)著一些基本的統(tǒng)計(jì)數(shù)據(jù),如完成的任務(wù)數(shù)。為了便于跨大量上下文使用,此類提供了很多可調(diào)整的參數(shù)和擴(kuò)展鉤子 (hook)。
但是,強(qiáng)烈建議程序員使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無(wú)界線程池,可以進(jìn)行自動(dòng)線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)和 Executors.newSingleThreadExecutor()(單個(gè)后臺(tái)線程),
它們均為大多數(shù)使用場(chǎng)景預(yù)定義了設(shè)置。否則,在手動(dòng)配置和調(diào)整此類時(shí),使用以下指導(dǎo):
核心和最大池大小
ThreadPoolExecutor將根據(jù)corePoolSize(參見(jiàn) getCorePoolSize())和 maximumPoolSize(參見(jiàn) getMaximumPoolSize())
設(shè)置的邊界自動(dòng)調(diào)整池大小。當(dāng)新任務(wù)在方法 execute(java.lang.Runnable) 中提交時(shí),如果運(yùn)行的線程少于 corePoolSize, 則創(chuàng)建新線程來(lái)處理請(qǐng)求,即使有線程是空閑的。
如果運(yùn)行的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當(dāng)隊(duì)列滿時(shí)才創(chuàng)建新線程。
如果設(shè)置的 corePoolSize 和 maximumPoolSize 相同,則創(chuàng)建了固定大小的線程池。
如果將 maximumPoolSize 設(shè)置為基本的無(wú)界值(如 Integer.MAX_VALUE),則允許池適應(yīng)任意數(shù)量的并發(fā)任務(wù)。
在大多數(shù)情況下,核心和最大池大小僅基于構(gòu)造來(lái)設(shè)置,不過(guò)也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進(jìn)行動(dòng)態(tài)更改。
注意1:在新任務(wù)被提交時(shí),如果運(yùn)行的core線程少于corePoolSize,才創(chuàng)建新core線程。并不是一開(kāi)始就創(chuàng)建corePoolSize個(gè)core線程。
注意2:"如果運(yùn)行的線程多于corePoolSize 而少于 maximumPoolSize,則僅當(dāng)隊(duì)列滿時(shí)才創(chuàng)建新線程"
按需構(gòu)造
核心線程最初只是在新任務(wù)到達(dá)時(shí)才被ThreadPoolExecutor創(chuàng)建和啟動(dòng)的,
但是也可以手動(dòng)調(diào)用方法 prestartCoreThread() 或 prestartAllCoreThreads()來(lái)的提前啟動(dòng)核心線程。
如果構(gòu)造帶有非空隊(duì)列的池,這時(shí)則可能希望預(yù)先啟動(dòng)線程。
注意1:核心線程即core線程,只有當(dāng)前線程數(shù)小于等于corePoolSize時(shí),這時(shí)的線程才叫核心線程。
創(chuàng)建新線程
使用ThreadFactory創(chuàng)建新線程。如果沒(méi)有另外說(shuō)明,則使用 Executors.defaultThreadFactory() 創(chuàng)建線程,他們?cè)谕粋€(gè)ThreadGroup中
并且這些線程具有相同的 NORM_PRIORITY 優(yōu)先級(jí)和非守護(hù)進(jìn)程狀態(tài)。
通過(guò)提供不同的 ThreadFactory,可以改變線程的名稱、線程組、優(yōu)先級(jí)、守護(hù)進(jìn)程狀態(tài),等等。
如果從 newThread 返回 null 時(shí) ThreadFactory 未能創(chuàng)建線程,則執(zhí)行程序?qū)⒗^續(xù)運(yùn)行,但不能執(zhí)行任何任務(wù)。
注意1:可以指定創(chuàng)建線程的ThreadFactory,默認(rèn)的是使用Executors.defaultThreadFactory()來(lái)創(chuàng)建線程,所有的線程都在一個(gè)ThreadGroup中。
保持活動(dòng)時(shí)間
如果池中當(dāng)前有多于corePoolSize 的線程,則這些多出的線程在空閑時(shí)間超過(guò) keepAliveTime 時(shí)將會(huì)終止
(參見(jiàn) getKeepAliveTime(java.util.concurrent.TimeUnit))。這提供了當(dāng)池處于非活動(dòng)狀態(tài)時(shí)減少資源消耗的方法。
如果池后來(lái)變得更為活動(dòng),則可以創(chuàng)建新的線程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 動(dòng)態(tài)地更改此參數(shù)。
如果把值設(shè)為L(zhǎng)ong.MAX_VALUE TimeUnit.NANOSECONDS 的話,空閑線程不會(huì)被回收直到ThreadPoolExecutor為Terminate。
默認(rèn)情況下,保持活動(dòng)策略只在有多于corePoolSizeThreads 的線程時(shí)應(yīng)用。
但是只要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可將此超時(shí)策略應(yīng)用于核心線程。
注意1:setKeepAliveTime(long, java.util.concurrent.TimeUnit)用于設(shè)置空閑線程最長(zhǎng)的活動(dòng)時(shí)間,
即如果空閑時(shí)間超過(guò)設(shè)定值,就停掉該線程,對(duì)該線程進(jìn)行回收。
該策略默認(rèn)只對(duì)非內(nèi)核線程有用(即當(dāng)前線程數(shù)大于corePoolSize),
可以調(diào)用allowCoreThreadTimeOut(boolean)方法將此超時(shí)策略擴(kuò)大到核心線程
注意2:如果把值設(shè)為L(zhǎng)ong.MAX_VALUE TimeUnit.NANOSECONDS的話,空閑線程不會(huì)被回收直到ThreadPoolExecutor為Terminate。
排隊(duì)
所有 BlockingQueue 都可用于傳輸和保持提交的任務(wù)。可以使用此隊(duì)列與池大小進(jìn)行交互:
* 如果運(yùn)行的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進(jìn)行排隊(duì)。
* 如果運(yùn)行的線程等于或多于 corePoolSize,則 Executor 始終首選將請(qǐng)求加入隊(duì)列,而不添加新的線程。
* 如果無(wú)法將請(qǐng)求加入隊(duì)列,則創(chuàng)建新的線程,除非創(chuàng)建此線程超出 maximumPoolSize,在這種情況下,任務(wù)將被拒絕。
排隊(duì)有三種通用策略:
1. 直接提交。工作隊(duì)列的默認(rèn)選項(xiàng)是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們。
在此,如果不存在可用于立即運(yùn)行任務(wù)的線程,則試圖把任務(wù)加入隊(duì)列將失敗,因此會(huì)構(gòu)造一個(gè)新的線程。
此策略可以避免在處理可能具有內(nèi)部依賴性的請(qǐng)求集時(shí)出現(xiàn)鎖。
直接提交通常要求無(wú)界maximumPoolSizes以避免拒絕新提交的任務(wù)。
當(dāng)命令以超過(guò)隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許線程無(wú)界的增長(zhǎng)。
注意1:此策略允許線程無(wú)界的增長(zhǎng)。
2. 無(wú)界隊(duì)列。使用無(wú)界隊(duì)列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時(shí)新任務(wù)在隊(duì)列中等待。
這樣,創(chuàng)建的線程就不會(huì)超過(guò) corePoolSize。(因此,maximumPoolSize 的值也就無(wú)效了。)
當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時(shí),適合于使用無(wú)界隊(duì)列;例如,在 Web 頁(yè)服務(wù)器中。
這種排隊(duì)可用于處理瞬態(tài)突發(fā)請(qǐng)求,當(dāng)命令以超過(guò)隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許隊(duì)列無(wú)限的增長(zhǎng)。
注意1:此策略允許隊(duì)列無(wú)限的增長(zhǎng)。
3. 有界隊(duì)列。當(dāng)使用有限的 maximumPoolSizes 時(shí),有界隊(duì)列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。
隊(duì)列大小和最大池大小可能需要相互折衷:使用大型隊(duì)列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開(kāi)銷,
但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O 邊界),則系統(tǒng)可能為超過(guò)您許可的更多線程安排時(shí)間。
使用小型隊(duì)列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調(diào)度開(kāi)銷,這樣也會(huì)降低吞吐量。
被拒絕的任務(wù)
當(dāng) Executor 已經(jīng)關(guān)閉,或Executor將有限邊界用于最大線程和工作隊(duì)列容量,且已經(jīng)飽和時(shí),
在方法 execute(java.lang.Runnable) 中提交的新任務(wù)將被拒絕。
在以上兩種情況下,execute 方法都將調(diào)用其
RejectedExecutionHandler的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,
java.util.concurrent.ThreadPoolExecutor) 方法。
下面提供了四種預(yù)定義的處理程序策略:
1. 在默認(rèn)的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運(yùn)行時(shí)RejectedExecutionException。
2. 在 ThreadPoolExecutor.CallerRunsPolicy中,線程調(diào)用運(yùn)行該任務(wù)的 execute 本身。
此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。
3. 在ThreadPoolExecutor.DiscardPolicy中,不能執(zhí)行的任務(wù)將被刪除。
4. 在ThreadPoolExecutor.DiscardOldestPolicy 中,如果執(zhí)行程序尚未關(guān)閉,
則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過(guò)程)。
定義和使用其他種類的RejectedExecutionHandler類也是可能的,但這樣做需要非常小心,尤其是當(dāng)策略僅用于特定容量或排隊(duì)策略時(shí)
注意1:AbortPolicy,CallerRunsPolicy,DiscardPolicy和DiscardOldestPolicy都是rejectedExecution的一種實(shí)現(xiàn)。
當(dāng)然也可以自己定義個(gè)rejectedExecution實(shí)現(xiàn)。
鉤子 (hook) 方法
此類提供 protected 可重寫(xiě)的 beforeExecute(java.lang.Thread, java.lang.Runnable)
和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,這兩種方法分別在執(zhí)行每個(gè)任務(wù)之前和之后調(diào)用。
它們可用于操縱執(zhí)行環(huán)境;例如,重新初始化 ThreadLocal、搜集統(tǒng)計(jì)信息或添加日志條目。
此外,還可以重寫(xiě)方法 terminated() 來(lái)執(zhí)行 Executor 完全終止后需要完成的所有特殊處理。
如果鉤子 (hook) 或回調(diào)方法拋出異常,則ThreadPoolExecutor的所有線程將依次失敗并突然終止。
隊(duì)列維護(hù)
方法 getQueue() 允許出于監(jiān)控和調(diào)試目的而訪問(wèn)工作隊(duì)列。強(qiáng)烈反對(duì)出于其他任何目的而使用此方法。
remove(java.lang.Runnable) 和 purge() 這兩種方法可用于在取消大量已排隊(duì)任務(wù)時(shí)幫助進(jìn)行存儲(chǔ)回收。
注意1:如果任務(wù)取消,ThreadPoolExecutor應(yīng)該自己是可以進(jìn)行存儲(chǔ)回收的。
取消的任務(wù)不會(huì)再次執(zhí)行,但是它們可能在工作隊(duì)列中累積,直到worker線程主動(dòng)將其移除
外部使用remove(java.lang.Runnable)和purge()可以把它們立即從隊(duì)列中移除。
終止
如果ThreadPoolExecutor在程序中沒(méi)有任何引用且沒(méi)有任何活動(dòng)線程,它也不會(huì)自動(dòng) shutdown。
如果希望確保回收線程(即使用戶忘記調(diào)用 shutdown()),則必須安排未使用的線程最終終止:
設(shè)置適當(dāng)保持活動(dòng)時(shí)間,使用0核心線程的下邊界和/或設(shè)置 allowCoreThreadTimeOut(boolean)。
擴(kuò)展示例。此類的大多數(shù)擴(kuò)展可以重寫(xiě)一個(gè)或多個(gè)受保護(hù)的鉤子 (hook) 方法。例如,下面是一個(gè)添加了簡(jiǎn)單的暫停/恢復(fù)功能的子類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
class PausableThreadPoolExecutor extends ThreadPoolExecutor { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); public PausableThreadPoolExecutor(...) { super (...); protected void beforeExecute(Thread t, Runnable r) { super .beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch (InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause() { pauseLock.lock(); try { isPaused = true ; } finally { pauseLock.unlock(); } } public void resume() { pauseLock.lock(); try { isPaused = false ; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }} |
關(guān)于它的使用請(qǐng)參考《ExecutorService》
Nested Classes | |||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
|
ThreadPoolExecutor.AbortPolicy | A handler for rejected tasks that throws a RejectedExecutionException. | |||||||||
|
ThreadPoolExecutor.CallerRunsPolicy | A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded. | |||||||||
|
ThreadPoolExecutor.DiscardOldestPolicy | A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded. | |||||||||
|
ThreadPoolExecutor.DiscardPolicy | A handler for rejected tasks that silently discards the rejected task. |
主要構(gòu)造函數(shù):
1
2
3
4
5
|
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) |
用給定的初始參數(shù)和默認(rèn)的線程工廠及被拒絕的執(zhí)行處理程序創(chuàng)建新的 ThreadPoolExecutor。
使用 Executors 工廠方法之一比使用此通用構(gòu)造方法方便得多。
參數(shù):
corePoolSize - 池中所保存的線程數(shù),包括空閑線程。
maximumPoolSize - 池中允許的最大線程數(shù)。
keepAliveTime - 當(dāng)線程數(shù)大于核心時(shí),此為終止前多余的空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間。
unit - keepAliveTime 參數(shù)的時(shí)間單位。
workQueue - 執(zhí)行前用于保持任務(wù)的隊(duì)列。此隊(duì)列僅保持由 execute 方法提交的 Runnable 任務(wù)。
拋出:
IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,
或者 corePoolSize 大于 maximumPoolSize。
NullPointerException - 如果 workQueue 為 null
1
2
3
4
5
6
|
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) |
用給定的初始參數(shù)和默認(rèn)被拒絕的執(zhí)行處理程序創(chuàng)建新的 ThreadPoolExecutor。
參數(shù):
corePoolSize - 池中所保存的線程數(shù),包括空閑線程。
maximumPoolSize - 池中允許的最大線程數(shù)。
keepAliveTime - 當(dāng)線程數(shù)大于核心時(shí),此為終止前多余的空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間。
unit - keepAliveTime 參數(shù)的時(shí)間單位。
workQueue - 執(zhí)行前用于保持任務(wù)的隊(duì)列。此隊(duì)列僅保持由 execute 方法提交的 Runnable 任務(wù)。
threadFactory - 執(zhí)行程序創(chuàng)建新線程時(shí)使用的工廠。
拋出:
IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,或者 corePoolSize 大于 maximumPoolSize。
NullPointerException - 如果 workQueue 或 threadFactory 為 null。
1
2
3
4
5
6
|
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) |
用給定的初始參數(shù)和默認(rèn)的線程工廠創(chuàng)建新的 ThreadPoolExecutor。
參數(shù):
corePoolSize - 池中所保存的線程數(shù),包括空閑線程。
maximumPoolSize - 池中允許的最大線程數(shù)。
keepAliveTime - 當(dāng)線程數(shù)大于核心時(shí),此為終止前多余的空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間。
unit - keepAliveTime 參數(shù)的時(shí)間單位。
workQueue - 執(zhí)行前用于保持任務(wù)的隊(duì)列。此隊(duì)列僅由保持 execute 方法提交的 Runnable 任務(wù)。
handler - 由于超出線程范圍和隊(duì)列容量而使執(zhí)行被阻塞時(shí)所使用的處理程序。
拋出:
IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,
或者 corePoolSize 大于 maximumPoolSize。
主要成員函數(shù)
public void execute(Runnable command)
在將來(lái)某個(gè)時(shí)間執(zhí)行給定任務(wù)。可以在新線程中或者在現(xiàn)有池線程中執(zhí)行該任務(wù)。 如果無(wú)法將任務(wù)提交執(zhí)行,或者因?yàn)榇藞?zhí)行程序已關(guān)閉,或者因?yàn)橐堰_(dá)到其容量,則該任務(wù)由當(dāng)前 RejectedExecutionHandler 處理。
參數(shù):
command - 要執(zhí)行的任務(wù)。
拋出:
RejectedExecutionException - 如果無(wú)法接收要執(zhí)行的任務(wù),則由 RejectedExecutionHandler 決定是否拋出 RejectedExecutionException
NullPointerException - 如果命令為 null
public void shutdown()
按過(guò)去執(zhí)行已提交任務(wù)的順序發(fā)起一個(gè)有序的關(guān)閉,但是不接受新任務(wù)。如果已經(jīng)關(guān)閉,則調(diào)用沒(méi)有其他作用。
拋出:
SecurityException - 如果安全管理器存在并且關(guān)閉此 ExecutorService 可能操作某些不允許調(diào)用者修改的線程(因?yàn)樗鼪](méi)有 RuntimePermission("modifyThread")),或者安全管理器的 checkAccess 方法拒絕訪問(wèn)。
public List<Runnable> shutdownNow()
嘗試停止所有的活動(dòng)執(zhí)行任務(wù)、暫停等待任務(wù)的處理,并返回等待執(zhí)行的任務(wù)列表。在從此方法返回的任務(wù)隊(duì)列中排空(移除)這些任務(wù)。
并不保證能夠停止正在處理的活動(dòng)執(zhí)行任務(wù),但是會(huì)盡力嘗試。 此實(shí)現(xiàn)通過(guò) Thread.interrupt() 取消任務(wù),所以無(wú)法響應(yīng)中斷的任何任務(wù)可能永遠(yuǎn)無(wú)法終止。
返回:
從未開(kāi)始執(zhí)行的任務(wù)的列表。
拋出:
SecurityException - 如果安全管理器存在并且關(guān)閉此 ExecutorService
可能操作某些不允許調(diào)用者修改的線程(因?yàn)樗鼪](méi)有 RuntimePermission("modifyThread")),
或者安全管理器的 checkAccess 方法拒絕訪問(wèn)。
public int prestartAllCoreThreads()
啟動(dòng)所有核心線程,使其處于等待工作的空閑狀態(tài)。僅當(dāng)執(zhí)行新任務(wù)時(shí),此操作才重寫(xiě)默認(rèn)的啟動(dòng)核心線程策略。
返回:
已啟動(dòng)的線程數(shù)
public boolean allowsCoreThreadTimeOut()
如果此池允許核心線程超時(shí)和終止,如果在 keepAlive 時(shí)間內(nèi)沒(méi)有任務(wù)到達(dá),新任務(wù)到達(dá)時(shí)正在替換(如果需要),則返回 true。當(dāng)返回 true 時(shí),適用于非核心線程的相同的保持活動(dòng)策略也同樣適用于核心線程。當(dāng)返回 false(默認(rèn)值)時(shí),由于沒(méi)有傳入任務(wù),核心線程不會(huì)終止。
返回:
如果允許核心線程超時(shí),則返回 true;否則返回 false
public void allowCoreThreadTimeOut(boolean value)
如果在保持活動(dòng)時(shí)間內(nèi)沒(méi)有任務(wù)到達(dá),新任務(wù)到達(dá)時(shí)正在替換(如果需要),則設(shè)置控制核心線程是超時(shí)還是終止的策略。當(dāng)為 false(默認(rèn)值)時(shí),由于沒(méi)有傳入任務(wù),核心線程將永遠(yuǎn)不會(huì)中止。當(dāng)為 true 時(shí),適用于非核心線程的相同的保持活動(dòng)策略也同樣適用于核心線程。為了避免連續(xù)線程替換,保持活動(dòng)時(shí)間在設(shè)置為 true 時(shí)必須大于 0。通常應(yīng)該在主動(dòng)使用該池前調(diào)用此方法。
參數(shù):
value - 如果應(yīng)該超時(shí),則為 true;否則為 false
拋出:
IllegalArgumentException - 如果 value 為 true 并且當(dāng)前保持活動(dòng)時(shí)間不大于 0。
public boolean remove(Runnable task)
從執(zhí)行程序的內(nèi)部隊(duì)列中移除此任務(wù)(如果存在),從而如果尚未開(kāi)始,則讓其不再運(yùn)行。
此方法可用作取消方案的一部分。它可能無(wú)法移除在放置到內(nèi)部隊(duì)列之前已經(jīng)轉(zhuǎn)換為其他形式的任務(wù)。
例如,使用 submit 輸入的任務(wù)可能被轉(zhuǎn)換為維護(hù) Future 狀態(tài)的形式。但是,在此情況下,purge() 方法可用于移除那些已被取消的 Future。
參數(shù):
task - 要移除的任務(wù)
返回:
如果已經(jīng)移除任務(wù),則返回 true
public void purge()
嘗試從工作隊(duì)列移除所有已取消的 Future 任務(wù)。此方法可用作存儲(chǔ)回收操作,它對(duì)功能沒(méi)有任何影響。
取消的任務(wù)不會(huì)再次執(zhí)行,但是它們可能在工作隊(duì)列中累積,直到worker線程主動(dòng)將其移除。
調(diào)用此方法將試圖立即移除它們。但是,如果出現(xiàn)其他線程的干預(yù),那么此方法移除任務(wù)將失敗。
當(dāng)然它還實(shí)現(xiàn)了的ExecutorService的submit系列接口
|
Submits a Runnable task for execution and returns a Future representing that task.
如果執(zhí)行成功就返回T result
|
|
Submits a value-returning task for execution and returns a Future representing the pending results of the task.
|
|
Submits a Runnable task for execution and returns a Future representing that task.
|
感謝閱讀,希望能幫助到大家,謝謝大家對(duì)本站的支持!