下面通過實(shí)例代碼為大家介紹Java線程池的幾種實(shí)現(xiàn)方法和區(qū)別:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestThreadPool { // -newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時(shí)建新的線程 // -其獨(dú)特之處:任意時(shí)間點(diǎn),最多只能有固定數(shù)目的活動(dòng)線程存在,此時(shí)如果有新的線程要建立,只能放在另外的隊(duì)列中等待,直到當(dāng)前的線程中某個(gè)線程終止直接被移出池子 // -和cacheThreadPool不同,F(xiàn)ixedThreadPool沒有IDLE機(jī)制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的TCP或UDP // IDLE機(jī)制之類的),所以FixedThreadPool多數(shù)針對(duì)一些很穩(wěn)定很固定的正規(guī)并發(fā)線程,多用于服務(wù)器 // -從方法的源代碼看,cache池和fixed 池調(diào)用的是同一個(gè)底層池,只不過參數(shù)不同: // fixed池線程數(shù)固定,并且是0秒IDLE(無IDLE) // cache池線程數(shù)支持0-Integer.MAX_VALUE(顯然完全沒考慮主機(jī)的資源承受能力),60秒IDLE private static ExecutorService fixedService = Executors.newFixedThreadPool( 6 ); // -緩存型池子,先查看池中有沒有以前建立的線程,如果有,就reuse.如果沒有,就建一個(gè)新的線程加入池中 // -緩存型池子通常用于執(zhí)行一些生存期很短的異步型任務(wù) // 因此在一些面向連接的daemon型SERVER中用得不多。 // -能reuse的線程,必須是timeout IDLE內(nèi)的池中線程,缺省timeout是60s,超過這個(gè)IDLE時(shí)長,線程實(shí)例將被終止及移出池。 // 注意,放入CachedThreadPool的線程不必?fù)?dān)心其結(jié)束,超過TIMEOUT不活動(dòng),其會(huì)自動(dòng)被終止。 private static ExecutorService cacheService = Executors.newCachedThreadPool(); // -單例線程,任意時(shí)間池中只能有一個(gè)線程 // -用的是和cache池和fixed池相同的底層池,但線程數(shù)目是1-1,0秒IDLE(無IDLE) private static ExecutorService singleService = Executors.newSingleThreadExecutor(); // -調(diào)度型線程池 // -這個(gè)池子里的線程可以按schedule依次delay執(zhí)行,或周期執(zhí)行 private static ExecutorService scheduledService = Executors.newScheduledThreadPool( 10 ); public static void main(String[] args) { DateFormat format = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" ); List<Integer> customerList = new ArrayList<Integer>(); System.out.println(format.format( new Date())); testFixedThreadPool(fixedService, customerList); System.out.println( "--------------------------" ); testFixedThreadPool(fixedService, customerList); fixedService.shutdown(); System.out.println(fixedService.isShutdown()); System.out.println( "----------------------------------------------------" ); testCacheThreadPool(cacheService, customerList); System.out.println( "----------------------------------------------------" ); testCacheThreadPool(cacheService, customerList); cacheService.shutdownNow(); System.out.println( "----------------------------------------------------" ); testSingleServiceThreadPool(singleService, customerList); testSingleServiceThreadPool(singleService, customerList); singleService.shutdown(); System.out.println( "----------------------------------------------------" ); testScheduledServiceThreadPool(scheduledService, customerList); testScheduledServiceThreadPool(scheduledService, customerList); scheduledService.shutdown(); } public static void testScheduledServiceThreadPool(ExecutorService service, List<Integer> customerList) { List<Callable<Integer>> listCallable = new ArrayList<Callable<Integer>>(); for ( int i = 0 ; i < 10 ; i++) { Callable<Integer> callable = new Callable<Integer>() { @Override public Integer call() throws Exception { return new Random().nextInt( 10 ); } }; listCallable.add(callable); } try { List<Future<Integer>> listFuture = service.invokeAll(listCallable); for (Future<Integer> future : listFuture) { Integer id = future.get(); customerList.add(id); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static void testSingleServiceThreadPool(ExecutorService service, List<Integer> customerList) { List<Callable<List<Integer>>> listCallable = new ArrayList<Callable<List<Integer>>>(); for ( int i = 0 ; i < 10 ; i++) { Callable<List<Integer>> callable = new Callable<List<Integer>>() { @Override public List<Integer> call() throws Exception { List<Integer> list = getList( new Random().nextInt( 10 )); boolean isStop = false ; while (list.size() > 0 && !isStop) { System.out.println(Thread.currentThread().getId() + " -- sleep:1000" ); isStop = true ; } return list; } }; listCallable.add(callable); } try { List<Future<List<Integer>>> listFuture = service.invokeAll(listCallable); for (Future<List<Integer>> future : listFuture) { List<Integer> list = future.get(); customerList.addAll(list); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static void testCacheThreadPool(ExecutorService service, List<Integer> customerList) { List<Callable<List<Integer>>> listCallable = new ArrayList<Callable<List<Integer>>>(); for ( int i = 0 ; i < 10 ; i++) { Callable<List<Integer>> callable = new Callable<List<Integer>>() { @Override public List<Integer> call() throws Exception { List<Integer> list = getList( new Random().nextInt( 10 )); boolean isStop = false ; while (list.size() > 0 && !isStop) { System.out.println(Thread.currentThread().getId() + " -- sleep:1000" ); isStop = true ; } return list; } }; listCallable.add(callable); } try { List<Future<List<Integer>>> listFuture = service.invokeAll(listCallable); for (Future<List<Integer>> future : listFuture) { List<Integer> list = future.get(); customerList.addAll(list); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static void testFixedThreadPool(ExecutorService service, List<Integer> customerList) { List<Callable<List<Integer>>> listCallable = new ArrayList<Callable<List<Integer>>>(); for ( int i = 0 ; i < 10 ; i++) { Callable<List<Integer>> callable = new Callable<List<Integer>>() { @Override public List<Integer> call() throws Exception { List<Integer> list = getList( new Random().nextInt( 10 )); boolean isStop = false ; while (list.size() > 0 && !isStop) { System.out.println(Thread.currentThread().getId() + " -- sleep:1000" ); isStop = true ; } return list; } }; listCallable.add(callable); } try { List<Future<List<Integer>>> listFuture = service.invokeAll(listCallable); for (Future<List<Integer>> future : listFuture) { List<Integer> list = future.get(); customerList.addAll(list); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static List<Integer> getList( int x) { List<Integer> list = new ArrayList<Integer>(); list.add(x); list.add(x * x); return list; } } |
使用:LinkedBlockingQueue實(shí)現(xiàn)線程池講解
1
2
3
4
5
6
7
8
9
10
11
|
//例如:corePoolSize=3,maximumPoolSize=6,LinkedBlockingQueue(10) //RejectedExecutionHandler默認(rèn)處理方式是:ThreadPoolExecutor.AbortPolicy //ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10)); //1.如果線程池中(也就是調(diào)用executorService.execute)運(yùn)行的線程未達(dá)到LinkedBlockingQueue.init(10)的話,當(dāng)前執(zhí)行的線程數(shù)是:corePoolSize(3) //2.如果超過了LinkedBlockingQueue.init(10)并且超過的數(shù)>=init(10)+corePoolSize(3)的話,并且小于init(10)+maximumPoolSize. 當(dāng)前啟動(dòng)的線程數(shù)是:(當(dāng)前線程數(shù)-init(10)) //3.如果調(diào)用的線程數(shù)超過了init(10)+maximumPoolSize 則根據(jù)RejectedExecutionHandler的規(guī)則處理。 |
關(guān)于:RejectedExecutionHandler幾種默認(rèn)實(shí)現(xiàn)講解
1
2
3
4
5
6
7
8
|
//默認(rèn)使用:ThreadPoolExecutor.AbortPolicy,處理程序遭到拒絕將拋出運(yùn)行時(shí)RejectedExecutionException。 RejectedExecutionHandler policy= new ThreadPoolExecutor.AbortPolicy(); // //在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調(diào)用運(yùn)行該任務(wù)的execute本身。此策略提供簡單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。 // policy=new ThreadPoolExecutor.CallerRunsPolicy(); // //在 ThreadPoolExecutor.DiscardPolicy 中,不能執(zhí)行的任務(wù)將被刪除。 // policy=new ThreadPoolExecutor.DiscardPolicy(); // //在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執(zhí)行程序尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過程)。 // policy=new ThreadPoolExecutor.DiscardOldestPolicy(); |
希望本篇文章對(duì)您有所幫助
原文鏈接:http://www.2cto.com/kf/201605/511060.html