線程池中ThreadGroup的坑
在Java中每一個(gè)線程都?xì)w屬于某個(gè)線程組管理的一員,例如在主函數(shù)main()主工作流程中產(chǎn)生一個(gè)線程,則產(chǎn)生的線程屬于main這個(gè)線程組管理的一員。簡(jiǎn)單地說(shuō),線程組(ThreadGroup)就是由線程組成的管理線程的類(lèi),這個(gè)類(lèi)是java.lang.ThreadGroup類(lèi)。
定義一個(gè)線程組,通過(guò)以下代碼可以實(shí)現(xiàn)。
1
2
|
ThreadGroup group= new ThreadGroup(“groupName”); Thread thread= new Thread(group,”the first thread of group”); |
ThreadGroup類(lèi)中的某些方法,可以對(duì)線程組中的線程產(chǎn)生作用。例如,setMaxPriority()方法可以設(shè)定線程組中的所有線程擁有最大的優(yōu)先權(quán)。
所有線程都隸屬于一個(gè)線程組。那可以是一個(gè)默認(rèn)線程組(不指定group),亦可是一個(gè)創(chuàng)建線程時(shí)明確指定的組。在創(chuàng)建之初,線程被限制到一個(gè)組里,而且不能改變到一個(gè)不同的組。每個(gè)應(yīng)用都至少有一個(gè)線程從屬于系統(tǒng)線程組。若創(chuàng)建多個(gè)線程而不指定一個(gè)組,它們就會(huì)自動(dòng)歸屬于系統(tǒng)線程組。
線程組也必須從屬于其他線程組。必須在構(gòu)建器里指定新線程組從屬于哪個(gè)線程組。若在創(chuàng)建一個(gè)線程組的時(shí)候沒(méi)有指定它的歸屬,則同樣會(huì)自動(dòng)成為系統(tǒng)線程組的一名屬下。因此,一個(gè)應(yīng)用程序中的所有線程組最終都會(huì)將系統(tǒng)線程組作為自己的“父”。
那么假如我們需要在線程池中實(shí)現(xiàn)一個(gè)帶自定義ThreadGroup的線程分組,該怎么實(shí)現(xiàn)呢?
我們?cè)诮o線程池(ThreadPoolExecutor)提交任務(wù)的時(shí)候可以通過(guò)execute(Runnable command)來(lái)將一個(gè)線程任務(wù)加入到該線程池,那么我們是否可以通過(guò)new一個(gè)指定了ThreadGroup的Thread實(shí)例來(lái)加入線程池來(lái)達(dá)到前面說(shuō)到的目的呢?
ThreadGroup是否可行
通過(guò)new Thread(threadGroup,runnable)實(shí)現(xiàn)線程池中任務(wù)分組
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public static void main(String[] args) { ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool(); final ThreadGroup group = new ThreadGroup( "Main_Test_Group" ); for ( int i = 0 ; i < 5 ; i++) { Thread thread = new Thread(group, new Runnable() { @Override public void run() { int sleep = ( int )(Math.random() * 10 ); try { Thread.sleep( 1000 * 3 ); System.out.println(Thread.currentThread().getName()+ "執(zhí)行完畢" ); System.out.println( "當(dāng)前線程組中的運(yùn)行線程數(shù)" +group.activeCount()); } catch (InterruptedException e) { e.printStackTrace(); } } }, group.getName()+ " #" +i+ "" ); pool.execute(thread); } } |
運(yùn)行結(jié)果
pool-1-thread-3執(zhí)行完畢
pool-1-thread-1執(zhí)行完畢
當(dāng)前線程組中的運(yùn)行線程數(shù)0
pool-1-thread-2執(zhí)行完畢
當(dāng)前線程組中的運(yùn)行線程數(shù)0
當(dāng)前線程組中的運(yùn)行線程數(shù)0
pool-1-thread-4執(zhí)行完畢
pool-1-thread-5執(zhí)行完畢
當(dāng)前線程組中的運(yùn)行線程數(shù)0
當(dāng)前線程組中的運(yùn)行線程數(shù)0
運(yùn)行結(jié)果中可以看到group中的線程并沒(méi)有因?yàn)榫€程池啟動(dòng)了這個(gè)線程任務(wù)而運(yùn)行起來(lái).因此通過(guò)線程組來(lái)對(duì)線程池中的線層任務(wù)分組不可行.
從java.util.concurrent.ThreadPoolExecutor源碼中可以看到如下構(gòu)造函數(shù):
1
2
3
4
5
6
7
8
|
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } |
如果我們?cè)趯?shí)例化ThreadPoolExecutor時(shí)不指定ThreadFactory,那么將以默認(rèn)的ThreadFactory來(lái)創(chuàng)建Thread.
Executors內(nèi)部類(lèi)DefaultThreadFactory
下面的源碼即是默認(rèn)的Thread工廠
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger( 1 ); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger( 1 ); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null ) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0 ); if (t.isDaemon()) t.setDaemon( false ); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } |
從唯一的構(gòu)造函數(shù)可以看到DefaultThreadFactory以SecurityManager 實(shí)例中的ThreadGroup來(lái)指定線程的group,如果SecurityManager 獲取到的ThreadGroup為null才默認(rèn)以當(dāng)前線程的group來(lái)指定.public Thread newThread(Runnable r) 則以group來(lái)new 一個(gè)Thead.這樣我們可以在實(shí)例化ThreadPoolExecutor對(duì)象的時(shí)候在其構(gòu)造函數(shù)內(nèi)傳入自定義的ThreadFactory實(shí)例即可達(dá)到目的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public class MyTheadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger( 1 ); private final AtomicInteger threadNumber = new AtomicInteger( 1 ); private final String namePrefix; private ThreadGroup defaultGroup; public MyTheadFactory() { SecurityManager s = System.getSecurityManager(); defaultGroup = (s != null ) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ; } public MyTheadFactory(ThreadGroup group) { this .defaultGroup = group; namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ; } public Thread newThread(Runnable r) { Thread t = new Thread(defaultGroup, null , namePrefix + threadNumber.getAndIncrement(), 0 ); if (t.isDaemon()) t.setDaemon( false ); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } |
ThreadGroup的使用及手寫(xiě)線程池
監(jiān)聽(tīng)線程異常關(guān)閉
以下代碼在window下不方便測(cè)試,需在linux 上 測(cè)試
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// 以下線程如果強(qiáng)制關(guān)閉的話(huà),是無(wú)法打印`線程被殺掉了` // 模擬關(guān)閉 kill PID public static void main(String[] args) { Runtime.getRuntime().addShutdownHook( new Thread( () -> { System.out.println( "線程被殺掉了" ); })); while ( true ){ System.out.println( "i am working ..." ); try { Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } |
如何拿到Thread線程中異常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public static void main(String[] args) { Thread thread = new Thread(() -> { try { Thread.sleep( 1000 ); int i = 10 / 0 ; } catch (InterruptedException e) { e.printStackTrace(); } }); thread.setUncaughtExceptionHandler((t,e)->{ System.out.println( "線程的名字" + t.getName()); System.out.println(e); }); // 通過(guò)注入接口的方式 thread.start(); } |
ThreadGroup
注意: threadGroup 設(shè)置為isDaemon 后,會(huì)隨最后一個(gè)線程結(jié)束而銷(xiāo)毀,如果沒(méi)有設(shè)置isDaemon ,則需要手動(dòng)調(diào)用 destory()
線程池使用
自己搭建的簡(jiǎn)單線程池實(shí)現(xiàn)
其中ThreadGroup 的應(yīng)用沒(méi)有寫(xiě),但是我們可以觀察線程關(guān)閉后,檢查T(mén)hreadGroup 中是否還有活躍的線程等,具體參考ThreadGroup API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
|
import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.stream.IntStream; /** * @Author: shengjm * @Date: 2020/2/10 9:52 * @Description: */ public class SimpleThreadPool extends Thread{ /** * 線程數(shù)量 */ private int size; private final int queueSize; /** * 默認(rèn)線程隊(duì)列數(shù)量 */ private final static int DEFAULR_TASK_QUEUE_SIZE = 2000 ; private static volatile int seq = 0 ; private final static String THREAD_PREFIX = "SIMPLE_THREAD_POLL_" ; private final static ThreadGroup GROUP = new ThreadGroup( "Pool_Group" ); private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>(); private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>(); private final DiscardPolicy discardPolicy; private volatile boolean destory = false ; private int min; private int max; private int active; /** * 定義異常策略的實(shí)現(xiàn) */ private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> { throw new DiscardException( "線程池已經(jīng)被撐爆了,后繼多余的人將丟失" ); }; /** * */ public SimpleThreadPool(){ this ( 4 , 8 , 12 ,DEFAULR_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY); } /** * */ public SimpleThreadPool( int min , int active , int max , int queueSize,DiscardPolicy discardPolicy) { this .min = min; this .active = active; this .max = max; this .queueSize = queueSize; this .discardPolicy = discardPolicy; init(); } /** * 初始化 */ private void init() { for ( int i = 0 ; i < min; i++){ createWorkTask(); } this .size = min; this .start(); } private void createWorkTask(){ WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++)); task.start(); THREAD_QUEUE.add(task); } /** * 線程池自動(dòng)擴(kuò)充 */ @Override public void run() { while (!destory){ System.out.println( this .min + " --- " + this .active+ " --- " + this .max + " --- " + this .size + " --- " + TASK_QUEUE.size()); try { Thread.sleep( 1000 ); if (TASK_QUEUE.size() > active && size < active){ for ( int i = size; i < active;i++){ createWorkTask(); } size = active; } else if (TASK_QUEUE.size() > max && size < max){ for ( int i = size; i < max;i++){ createWorkTask(); } size = max; } synchronized (THREAD_QUEUE){ if (TASK_QUEUE.isEmpty() && size > active){ int release = size - active; for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator();it.hasNext();){ if (release <= 0 ){ break ; } WorkerTask task = it.next(); task.close(); task.interrupt(); it.remove(); release--; } size = active; } } } catch (InterruptedException e) { break ; } } } public void submit(Runnable runnable){ synchronized (TASK_QUEUE){ if (destory){ throw new DiscardException( "線程池已經(jīng)被摧毀了..." ); } if (TASK_QUEUE.size() > queueSize){ discardPolicy.discard(); } TASK_QUEUE.addLast(runnable); TASK_QUEUE.notifyAll(); } } /** * 關(guān)閉 */ public void shutdown(){ while (!TASK_QUEUE.isEmpty()){ try { Thread.sleep( 10 ); } catch (InterruptedException e) { e.printStackTrace(); } } synchronized (THREAD_QUEUE) { int initVal = THREAD_QUEUE.size(); while (initVal > 0 ) { for (WorkerTask workerTask : THREAD_QUEUE) { if (workerTask.getTaskState() == TaskState.BLOCKED) { workerTask.interrupt(); workerTask.close(); initVal--; } else { try { Thread.sleep( 10 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } this .destory = true ; } } public int getSize() { return size; } public int getMin() { return min; } public int getMax() { return max; } public int getActive() { return active; } /** * 線程狀態(tài) */ private enum TaskState{ FREE , RUNNING , BLOCKED , DEAD } /** * 自定義異常類(lèi) */ public static class DiscardException extends RuntimeException{ public DiscardException(String message){ super (message); } } /** * 定義異常策略 */ @FunctionalInterface public interface DiscardPolicy{ void discard() throws DiscardException; } private static class WorkerTask extends Thread{ private volatile TaskState taskState = TaskState.FREE; public TaskState getTaskState(){ return this .taskState; } public WorkerTask(ThreadGroup group , String name){ super (group , name); } @Override public void run(){ OUTER: while ( this .taskState != TaskState.DEAD){ Runnable runnable; synchronized (TASK_QUEUE){ while (TASK_QUEUE.isEmpty()){ try { taskState = TaskState.BLOCKED; TASK_QUEUE.wait(); } catch (InterruptedException e) { break OUTER; } } runnable = TASK_QUEUE.removeFirst(); } if (runnable != null ){ taskState = TaskState.RUNNING; runnable.run(); taskState = TaskState.FREE; } } } public void close(){ this .taskState = TaskState.DEAD; } } /** * 測(cè)試 * @param args */ public static void main(String[] args) { SimpleThreadPool simpleThreadPool = new SimpleThreadPool(); // SimpleThreadPool simpleThreadPool = new SimpleThreadPool(6,15,SimpleThreadPool.DEFAULT_DISCARD_POLICY); IntStream.rangeClosed( 0 , 40 ).forEach(i -> { simpleThreadPool.submit(() -> { try { Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println( "the runnable " + i + "be servered by " + Thread.currentThread()); }); }); // try { // Thread.sleep(15000); // } catch (InterruptedException e) { // e.printStackTrace(); // } simpleThreadPool.shutdown(); } } |
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持服務(wù)器之家。
原文鏈接:https://blog.csdn.net/tyBaoErGe/article/details/50196379