1.簡介
使用線程池可以避免線程的頻繁創(chuàng)建以及銷毀。
java中提供的用于實(shí)現(xiàn)線程池的api:
executor、executorservice、abstractexecutorservice、threadpoolexecutor、forkjoinpool都位于java.util.concurrent包下。
*threadpoolexecutor、forkjoinpool為線程池的實(shí)現(xiàn)類。
2.executor
1
2
3
4
5
6
7
8
|
public interface executor { /** * 向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行 */ void execute(runnable command); } |
*該接口聲明了execute(runnable command)方法,負(fù)責(zé)向線程池中提交一個(gè)任務(wù)。
3.executorservice接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
public interface executorservice extends executor { /** * 關(guān)閉線程池(等待隊(duì)列中的任務(wù)被執(zhí)行完畢) */ void shutdown(); /** * 立刻關(guān)閉線程池(不執(zhí)行隊(duì)列中的任務(wù),并嘗試中斷當(dāng)前執(zhí)行的任務(wù)) */ list<runnable> shutdownnow(); /** * 判斷線程池是否處于shutdown狀態(tài). */ boolean isshutdown(); /** * 判斷線程池是否處于terminated狀態(tài). */ boolean isterminated(); /** * 若在指定時(shí)間內(nèi)線程池處于terminated狀態(tài)則立即返回true,否則超過時(shí)間后仍未為terminated狀態(tài)則返回false. */ boolean awaittermination( long timeout, timeunit unit) throws interruptedexception; /** * 向線程池提交一個(gè)任務(wù)并返回包含指定類型的future(根據(jù)callable的泛型) */ <t> future<t> submit(callable<t> task); /** * 向線程池提交一個(gè)任務(wù)并指定任務(wù)執(zhí)行結(jié)果的類型,返回包含指定類型的future. */ <t> future<t> submit(runnable task, t result); /** * 向線程池提交一個(gè)任務(wù)并返回未知類型的future. */ future<?> submit(runnable task); /** * 向線程池提交多個(gè)任務(wù)并返回指定類型的future列表. */ <t> list<future<t>> invokeall(collection<? extends callable<t>> tasks) throws interruptedexception; /** * 向線程池提交多個(gè)任務(wù)并返回指定類型的future列表,如果在指定時(shí)間內(nèi)沒有執(zhí)行完畢則直接返回. */ <t> list<future<t>> invokeall(collection<? extends callable<t>> tasks, long timeout, timeunit unit) throws interruptedexception; /** * 向線程池提交多個(gè)任務(wù),當(dāng)任意一個(gè)任務(wù)執(zhí)行完畢后返回指定類型的future. */ <t> t invokeany(collection<? extends callable<t>> tasks) throws interruptedexception, executionexception; /** * 向線程池提交多個(gè)任務(wù),在指定時(shí)間內(nèi),當(dāng)任意一個(gè)任務(wù)執(zhí)行完畢后返回指定類型的future,若超時(shí)則拋出異常. */ <t> t invokeany(collection<? extends callable<t>> tasks, long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception; } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public interface future<v> { /** * 中斷任務(wù)的執(zhí)行 */ boolean cancel( boolean mayinterruptifrunning); /** * 判斷任務(wù)是否中斷成功 */ boolean iscancelled(); /** * 判斷任務(wù)是否執(zhí)行完成 */ boolean isdone(); /** * 獲取任務(wù)的執(zhí)行結(jié)果直到任務(wù)執(zhí)行完畢(阻塞線程) */ v get() throws interruptedexception, executionexception; /** * 獲取任務(wù)的執(zhí)行結(jié)果,若在指定時(shí)間內(nèi)任務(wù)仍然沒有執(zhí)行完畢則拋出timeoutexception */ v get( long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception; } |
*execute()方法不能獲取任務(wù)的執(zhí)行結(jié)果,而submit()方法能夠根據(jù)返回的future實(shí)例獲取任務(wù)的執(zhí)行結(jié)果。
4.threadpoolexecutor
corepoolsize:線程池中核心線程的數(shù)量。
maximumpoolsize:線程池中最大線程數(shù)。
keepalivetime:線程的空閑時(shí)間。
unit:修飾線程空閑時(shí)間的單位。
workqueue:任務(wù)隊(duì)列。
threadfactory:線程工廠,用于創(chuàng)建線程。
handler:當(dāng)隊(duì)列已滿且當(dāng)前線程數(shù)已達(dá)到所允許的最大值時(shí)的處理策略。
*線程池中的線程包括核心線程以及普通線程,核心線程一旦創(chuàng)建后直到線程池被關(guān)閉前都就不會被銷毀,而普通線程會因?yàn)榈竭_(dá)空閑時(shí)間而被銷毀。
構(gòu)造方法:
1
2
3
4
5
6
7
|
public threadpoolexecutor( int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler) |
blockingqueue的類型
blockingqueue提供了arrayblockingqueue、linkedblockingqueue、synchronousqueue等實(shí)現(xiàn)類。
1.arrayblockingqueue:使用順序表的結(jié)構(gòu)進(jìn)行存儲,在使用時(shí)需要指定其長度,支持公平鎖/非公平鎖進(jìn)行操作。
2.linkedblockingqueue:使用鏈表的結(jié)構(gòu)進(jìn)行存儲,在使用時(shí)不需要指定其長度,隊(duì)列的最大長度為integer.max_value。
3.synchronousqueue:一個(gè)不存儲元素的隊(duì)列,每一個(gè)put操作必須等待take操作,否則不能添加元素,支持公平鎖和非公平鎖。
*這些實(shí)現(xiàn)類在進(jìn)行入隊(duì)和出隊(duì)操作時(shí)都會進(jìn)行加鎖,以保證在多線程并發(fā)訪問時(shí)數(shù)據(jù)的安全性。
隊(duì)列已滿且線程數(shù)已達(dá)到所允許的最大值時(shí)的處理策略
rejectedexecutionhandler提供了abortpolicy、discardpolicy、discardolderstpolicy、callerrunspolicy四個(gè)策略,這四個(gè)策略都是threadpoolexecutor的靜態(tài)內(nèi)部類。
1.abortpolicy:放棄任務(wù)并拋出rejectedexecutionexception異常。
2.discardpolicy:放棄任務(wù)但不拋出異常。
3.discardolderstpolicy: 放棄隊(duì)頭中的任務(wù),然后重新嘗試執(zhí)行新任務(wù)。
4.callerrunspolicy: 由調(diào)用線程來處理該任務(wù)。
線程池的狀態(tài)
1
2
3
4
5
|
private static final int running = - 1 ; private static final int shutdown = 0 ; private static final int stop = 1 ; private static final int tidying = 2 ; private static final int terminated = 3 ; |
1.runing:線程池處于運(yùn)行狀態(tài),此時(shí)可以接受新的任務(wù)請求,并且執(zhí)行隊(duì)列中的任務(wù)。
2.shutdown:線程池處于關(guān)閉狀態(tài),此時(shí)不接受新的任務(wù)請求,但會繼續(xù)執(zhí)行隊(duì)列中的任務(wù)。
3.stop:線程池處于禁用狀態(tài),此時(shí)不接受新的任務(wù)請求,并且不會執(zhí)行隊(duì)列中的任務(wù)。
4.tidying:線程池處于整理狀態(tài),此時(shí)沒有正在執(zhí)行的任務(wù)。
5.terminated :線程池處于終止?fàn)顟B(tài)。
線程池狀態(tài)的變化過程
1.當(dāng)線程池創(chuàng)建后處于running狀態(tài)。
2.1 若此時(shí)調(diào)用了shutdown()方法,那么線程池將處于shutdown狀態(tài),不接受新的任務(wù)請求,但會繼續(xù)執(zhí)行隊(duì)列中的任務(wù),當(dāng)隊(duì)列中的任務(wù)為空且沒有正在執(zhí)行的任務(wù)時(shí),線程池的狀態(tài)為tidying。
2.2 若此時(shí)調(diào)用了shutdownnow()方法,那么線程池將處于stop狀態(tài),不接受新的任務(wù)請求并且不執(zhí)行隊(duì)列中的任務(wù),此時(shí)線程池的狀態(tài)為tidying。
3.當(dāng)線程池的狀態(tài)為tidying時(shí),當(dāng)terminated()方法處理完畢后,線程池的狀態(tài)為trrminated。
任務(wù)的執(zhí)行流程
1.當(dāng)調(diào)用了execute()或者submit()方法向線程池提交一個(gè)任務(wù)后,首先判斷當(dāng)前線程池中的線程個(gè)數(shù)是否大于核心線程數(shù)。
2.如果當(dāng)前線程池的線程個(gè)數(shù)小于核心線程數(shù),則創(chuàng)建一個(gè)核心線程來處理任務(wù)。
3.如果當(dāng)前線程池的線程個(gè)數(shù)大于核心線程數(shù),則將任務(wù)放入到隊(duì)列中,如果放入隊(duì)列成功,那么該任務(wù)將等待被空閑的線程處理,如果放入隊(duì)列失敗(隊(duì)滿),則判斷當(dāng)前線程池中的線程個(gè)數(shù)是否達(dá)到所允許的最大值,若未達(dá)到則創(chuàng)建一個(gè)普通線程去處理任務(wù),否則根據(jù)預(yù)定義的處理策略去進(jìn)行處理。
5.executors工具類
java中提供了executors工具類,用于直接創(chuàng)建executor。
cachethreadpool
1
2
3
|
public static executorservice newcachedthreadpool() { return new threadpoolexecutor( 0 , integer.max_value,60l, timeunit.seconds, new synchronousqueue<runnable>()); } |
cachethreadpool創(chuàng)建的都是普通線程(其核心線程數(shù)為0)、線程池的最大線程數(shù)為integer.max_value、線程的空閑時(shí)間為60秒,此方式適合大量耗時(shí)短的任務(wù)、不適合大量耗時(shí)長的任務(wù)。
*由于創(chuàng)建的都是普通線程,且空閑時(shí)間為60秒,則仍有可能會頻繁的創(chuàng)建線程。
fixedthreadpool
1
2
3
|
public static executorservice newfixedthreadpool( int nthreads) { return new threadpoolexecutor(nthreads, nthreads,0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()); } |
fixedthreadpool創(chuàng)建的都是核心線程,其線程個(gè)數(shù)由入?yún)Q定,線程不會因?yàn)榭臻e時(shí)間而被銷毀,適合預(yù)知任務(wù)數(shù)量的業(yè)務(wù)。
singlethreadexecutor
1
2
3
|
public static executorservice newsinglethreadexecutor() { return new finalizabledelegatedexecutorservice( new threadpoolexecutor( 1 , 1 , new linkedblockingqueue<runnable>())); } |
singlethreadexecutor使用一個(gè)核心線程來處理任務(wù)。
scheduledthreadpool
1
2
3
|
public static scheduledexecutorservice newscheduledthreadpool( int corepoolsize) { return new scheduledthreadpoolexecutor(corepoolsize); } |
*scheduledthreadpool支持定時(shí)執(zhí)行任務(wù)以及固定間隔執(zhí)行任務(wù)。
singlethreadscheduledexecutor
1
2
3
|
public static scheduledexecutorservice newsinglethreadscheduledexecutor() { return new delegatedscheduledexecutorservice( new scheduledthreadpoolexecutor( 1 )); } |
*singlethreadscheduledexecutor支持一個(gè)線程的定時(shí)執(zhí)行任務(wù)以及固定間隔執(zhí)行任務(wù)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public interface scheduledexecutorservice extends executorservice { /** * 在指定的延遲時(shí)間到達(dá)后執(zhí)行任務(wù)一次 */ public scheduledfuture<?> schedule(runnable command, long delay, timeunit unit); /** * 在指定的延遲時(shí)間到達(dá)后執(zhí)行任務(wù)一次 */ public <v> scheduledfuture<v> schedule(callable<v> callable, long delay, timeunit unit); /** * 在指定的初始化延遲時(shí)間到達(dá)后執(zhí)行任務(wù)一次,往后每隔period時(shí)間執(zhí)行任務(wù)一次. */ public scheduledfuture<?> scheduleatfixedrate(runnable command, long initialdelay, long period,timeunit unit); /** * 在指定的初始化延遲時(shí)間到達(dá)后執(zhí)行任務(wù)一次,往后每次任務(wù)執(zhí)行完畢后相隔delay時(shí)間執(zhí)行任務(wù)一次. */ public scheduledfuture<?> schedulewithfixeddelay(runnable command, long initialdelay, long delay,timeunit unit); } |
workstealingpool
1
2
3
|
public static executorservice newworkstealingpool( int parallelism) { return new forkjoinpool(parallelism,forkjoinpool.defaultforkjoinworkerthreadfactory, null , true ); } |
workstealingpool創(chuàng)建一個(gè)并行級別的線程池,同一時(shí)刻最多只能有指定個(gè)數(shù)個(gè)線程正在執(zhí)行任務(wù),創(chuàng)建時(shí)直接指定同一時(shí)刻最多能允許的并行執(zhí)行的線程個(gè)數(shù)即可,如果不傳則使用cpu的核數(shù)。
newworkstealingpool方法內(nèi)部返回一個(gè)forkjoinpool實(shí)例,forkjoinpool是java7新提供的線程池,同樣繼承abstactexecutorservice。
*作用類似于semaphore。
以上所述是小編給大家介紹的java中的線程池詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時(shí)回復(fù)大家的。在此也非常感謝大家對服務(wù)器之家網(wǎng)站的支持!
原文鏈接:https://www.cnblogs.com/funyoung/p/10530986.html