Java線(xiàn)程池的幾種實(shí)現(xiàn)方法和區(qū)別介紹
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestThreadPool { // -newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時(shí)建新的線(xiàn)程 // -其獨(dú)特之處:任意時(shí)間點(diǎn),最多只能有固定數(shù)目的活動(dòng)線(xiàn)程存在,此時(shí)如果有新的線(xiàn)程要建立,只能放在另外的隊(duì)列中等待,直到當(dāng)前的線(xiàn)程中某個(gè)線(xiàn)程終止直接被移出池子 // -和cacheThreadPool不同,F(xiàn)ixedThreadPool沒(méi)有IDLE機(jī)制(可能也有,但既然文檔沒(méi)提,肯定非常長(zhǎng),類(lèi)似依賴(lài)上層的TCP或UDP // IDLE機(jī)制之類(lèi)的),所以FixedThreadPool多數(shù)針對(duì)一些很穩(wěn)定很固定的正規(guī)并發(fā)線(xiàn)程,多用于服務(wù)器 // -從方法的源代碼看,cache池和fixed 池調(diào)用的是同一個(gè)底層池,只不過(guò)參數(shù)不同: // fixed池線(xiàn)程數(shù)固定,并且是0秒IDLE(無(wú)IDLE) // cache池線(xiàn)程數(shù)支持0-Integer.MAX_VALUE(顯然完全沒(méi)考慮主機(jī)的資源承受能力),60秒IDLE private static ExecutorService fixedService = Executors.newFixedThreadPool(6); // -緩存型池子,先查看池中有沒(méi)有以前建立的線(xiàn)程,如果有,就reuse.如果沒(méi)有,就建一個(gè)新的線(xiàn)程加入池中 // -緩存型池子通常用于執(zhí)行一些生存期很短的異步型任務(wù) // 因此在一些面向連接的daemon型SERVER中用得不多。 // -能reuse的線(xiàn)程,必須是timeout IDLE內(nèi)的池中線(xiàn)程,缺省timeout是60s,超過(guò)這個(gè)IDLE時(shí)長(zhǎng),線(xiàn)程實(shí)例將被終止及移出池。 // 注意,放入CachedThreadPool的線(xiàn)程不必?fù)?dān)心其結(jié)束,超過(guò)TIMEOUT不活動(dòng),其會(huì)自動(dòng)被終止。 private static ExecutorService cacheService = Executors.newCachedThreadPool(); // -單例線(xiàn)程,任意時(shí)間池中只能有一個(gè)線(xiàn)程 // -用的是和cache池和fixed池相同的底層池,但線(xiàn)程數(shù)目是1-1,0秒IDLE(無(wú)IDLE) private static ExecutorService singleService = Executors.newSingleThreadExecutor(); // -調(diào)度型線(xiàn)程池 // -這個(gè)池子里的線(xiàn)程可以按schedule依次delay執(zhí)行,或周期執(zhí)行 private static ExecutorService scheduledService = Executors.newScheduledThreadPool(10); public static void main(String[] args) { DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); List< Integer > customerList = new ArrayList< Integer >(); System.out.println(format.format(new Date())); testFixedThreadPool(fixedService, customerList); System.out.println("--------------------------"); testFixedThreadPool(fixedService, customerList); fixedService.shutdown(); System.out.println(fixedService.isShutdown()); System.out.println("----------------------------------------------------"); testCacheThreadPool(cacheService, customerList); System.out.println("----------------------------------------------------"); testCacheThreadPool(cacheService, customerList); cacheService.shutdownNow(); System.out.println("----------------------------------------------------"); testSingleServiceThreadPool(singleService, customerList); testSingleServiceThreadPool(singleService, customerList); singleService.shutdown(); System.out.println("----------------------------------------------------"); testScheduledServiceThreadPool(scheduledService, customerList); testScheduledServiceThreadPool(scheduledService, customerList); scheduledService.shutdown(); } public static void testScheduledServiceThreadPool(ExecutorService service, List< Integer > customerList) { List< Callable <Integer>> listCallable = new ArrayList< Callable <Integer>>(); for (int i = 0; i < 10 ; i++) { Callable<Integer> callable = new Callable< Integer >() { @Override public Integer call() throws Exception { return new Random().nextInt(10); } }; listCallable.add(callable); } try { List< Future <Integer>> listFuture = service.invokeAll(listCallable); for (Future< Integer > future : listFuture) { Integer id = future.get(); customerList.add(id); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static void testSingleServiceThreadPool(ExecutorService service, List< Integer > customerList) { List< Callable <List<Integer>>> listCallable = new ArrayList< Callable <List<Integer>>>(); for (int i = 0; i < 10 ; i++) { Callable<List<Integer>> callable = new Callable< List <Integer>>() { @Override public List< Integer > call() throws Exception { List< Integer > list = getList(new Random().nextInt(10)); boolean isStop = false; while (list.size() > 0 && !isStop) { System.out.println(Thread.currentThread().getId() + " -- sleep:1000"); isStop = true; } return list; } }; listCallable.add(callable); } try { List< Future <List<Integer>>> listFuture = service.invokeAll(listCallable); for (Future< List <Integer>> future : listFuture) { List< Integer > list = future.get(); customerList.addAll(list); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static void testCacheThreadPool(ExecutorService service, List< Integer > customerList) { List< Callable <List<Integer>>> listCallable = new ArrayList< Callable <List<Integer>>>(); for (int i = 0; i < 10 ; i++) { Callable<List<Integer>> callable = new Callable< List <Integer>>() { @Override public List< Integer > call() throws Exception { List< Integer > list = getList(new Random().nextInt(10)); boolean isStop = false; while (list.size() > 0 && !isStop) { System.out.println(Thread.currentThread().getId() + " -- sleep:1000"); isStop = true; } return list; } }; listCallable.add(callable); } try { List< Future <List<Integer>>> listFuture = service.invokeAll(listCallable); for (Future< List <Integer>> future : listFuture) { List< Integer > list = future.get(); customerList.addAll(list); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static void testFixedThreadPool(ExecutorService service, List< Integer > customerList) { List< Callable <List<Integer>>> listCallable = new ArrayList< Callable <List<Integer>>>(); for (int i = 0; i < 10 ; i++) { Callable<List<Integer>> callable = new Callable< List <Integer>>() { @Override public List< Integer > call() throws Exception { List< Integer > list = getList(new Random().nextInt(10)); boolean isStop = false; while (list.size() > 0 && !isStop) { System.out.println(Thread.currentThread().getId() + " -- sleep:1000"); isStop = true; } return list; } }; listCallable.add(callable); } try { List< Future <List<Integer>>> listFuture = service.invokeAll(listCallable); for (Future< List <Integer>> future : listFuture) { List< Integer > list = future.get(); customerList.addAll(list); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static List< Integer > getList(int x) { List< Integer > list = new ArrayList< Integer >(); list.add(x); list.add(x * x); return list; } } |
使用:LinkedBlockingQueue實(shí)現(xiàn)線(xiàn)程池講解
1
2
3
4
5
6
7
8
9
10
11
|
//例如:corePoolSize=3,maximumPoolSize=6,LinkedBlockingQueue(10) //RejectedExecutionHandler默認(rèn)處理方式是:ThreadPoolExecutor.AbortPolicy //ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10)); //1.如果線(xiàn)程池中(也就是調(diào)用executorService.execute)運(yùn)行的線(xiàn)程未達(dá)到LinkedBlockingQueue.init(10)的話(huà),當(dāng)前執(zhí)行的線(xiàn)程數(shù)是:corePoolSize(3) //2.如果超過(guò)了LinkedBlockingQueue.init(10)并且超過(guò)的數(shù)>=init(10)+corePoolSize(3)的話(huà),并且小于init(10)+maximumPoolSize. 當(dāng)前啟動(dòng)的線(xiàn)程數(shù)是:(當(dāng)前線(xiàn)程數(shù)-init(10)) //3.如果調(diào)用的線(xiàn)程數(shù)超過(guò)了init(10)+maximumPoolSize 則根據(jù)RejectedExecutionHandler的規(guī)則處理。 |
關(guān)于:RejectedExecutionHandler幾種默認(rèn)實(shí)現(xiàn)講解
1
2
3
4
5
6
7
8
|
//默認(rèn)使用:ThreadPoolExecutor.AbortPolicy,處理程序遭到拒絕將拋出運(yùn)行時(shí)RejectedExecutionException。 RejectedExecutionHandler policy= new ThreadPoolExecutor.AbortPolicy(); // //在 ThreadPoolExecutor.CallerRunsPolicy 中,線(xiàn)程調(diào)用運(yùn)行該任務(wù)的execute本身。此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。 // policy=new ThreadPoolExecutor.CallerRunsPolicy(); // //在 ThreadPoolExecutor.DiscardPolicy 中,不能執(zhí)行的任務(wù)將被刪除。 // policy=new ThreadPoolExecutor.DiscardPolicy(); // //在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執(zhí)行程序尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過(guò)程)。 // policy=new ThreadPoolExecutor.DiscardOldestPolicy(); |
以上這篇Java線(xiàn)程池的幾種實(shí)現(xiàn)方法和區(qū)別介紹就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持服務(wù)器之家。