最近項(xiàng)目中遇到了一個(gè)新的需求,就是實(shí)現(xiàn)一個(gè)可以動(dòng)態(tài)添加定時(shí)任務(wù)的功能。說到這里,有人可能會(huì)說簡(jiǎn)單啊,使用quartz就好了,簡(jiǎn)單粗暴。然而quartz框架太重了,小項(xiàng)目根本不好操作啊。當(dāng)然,也有人會(huì)說,jdk提供了timer的接口啊,完全夠用啊。但是我們項(xiàng)目的需求完全是多線程的模型啊,而timer是單線程的,so,樓主最后還是選擇了jdk的線程池。
線程池是什么
Java通過Executors提供四種線程池,分別為:
newCachedThreadPool :創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool : 創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。
newScheduledThreadPool : 創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。
newSingleThreadExecutor : 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
樓主項(xiàng)目中用到的是newScheduledThreadPool, 就這些吧,再多的樓主就班門弄斧了,Google一下,一大堆。
線程池service的獲取
樓主通過單例模式來獲取線程池的service,代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
/** * 線程池創(chuàng)建. * @author wuhf * @date 2018/01/16 */ public class ThreadPoolUtils { private static ScheduledExecutorService executorService; private ThreadPoolUtils() { //手動(dòng)創(chuàng)建線程池. executorService = new ScheduledThreadPoolExecutor( 10 , new BasicThreadFactory.Builder().namingPattern( "syncdata-schedule-pool-%d" ).daemon( true ).build()); } private static class PluginConfigHolder { private final static ThreadPoolUtils INSTANCE = new ThreadPoolUtils(); } public static ThreadPoolUtils getInstance() { return PluginConfigHolder.INSTANCE; } public ScheduledExecutorService getThreadPool(){ return executorService; } } |
中斷某一個(gè)正在運(yùn)行的線程代碼實(shí)現(xiàn)
廢話就不多說了,代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
/** * 中斷線程池的某個(gè)任務(wù). */ public class InterruptThread implements Runnable { private int num; public InterruptThread ( int num){ this .num = num; } public static void main(String[] args) throws InterruptedException { Thread interruptThread = new Thread( new InterruptThread( 1 )); ScheduledFuture<?> t = ThreadPoolUtils.getInstance().getThreadPool().scheduleAtFixedRate(interruptThread, 0 , 2 , TimeUnit.SECONDS); InterruptThread interruptThread1 = new InterruptThread( 2 ); ThreadPoolUtils.getInstance().getThreadPool().scheduleAtFixedRate(interruptThread1, 0 , 2 , TimeUnit.SECONDS); InterruptThread interruptThread2 = new InterruptThread( 3 ); ThreadPoolUtils.getInstance().getThreadPool().scheduleAtFixedRate(interruptThread2, 0 , 2 , TimeUnit.SECONDS); Thread.sleep( 5000 ); //終止正在運(yùn)行的線程interruptThread t.cancel( true ); while ( true ){ } } @Override public void run() { System.out.println( "this is a thread" + num); } } |
踩坑記錄
樓主在使用如下代碼時(shí),突然想到當(dāng)這個(gè)定時(shí)任務(wù)需要被停止時(shí)該如何停止線程運(yùn)行
1
2
|
ThreadPoolUtils.getInstance().getThreadPool().scheduleAtFixedRate(interruptThread, 0 , 2 , TimeUnit.SECONDS); |
既然我有這樣的需求,那就Google一下吧,找了大半圈,愣是沒找到相關(guān)資料,都是一些關(guān)于Java線程池的深入分析。或者是全局變量啥的,并沒有找到令樓主滿意的解決方案。
既然沒有線程的那就扒一下scheduleAtFixedRate的底層源碼看看是什么東西吧,果不其然我在源碼中看到了scheduleAtFixedRate方法的具體實(shí)現(xiàn),發(fā)現(xiàn)他的返回值是ScheduledFuture。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null ) throw new NullPointerException(); if (period <= 0 ) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null , triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } |
接著往下我們?cè)倏纯碨cheduledFuture里面有什么東西吧,沒有讓樓主失望,看到了這個(gè)
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public boolean cancel( boolean mayInterruptIfRunning) { boolean cancelled = super .cancel(mayInterruptIfRunning); if (cancelled && removeOnCancel && heapIndex >= 0 ) remove( this ); return cancelled; } //從線程的運(yùn)行隊(duì)列中移除當(dāng)前線程 public boolean remove(Runnable task) { boolean removed = workQueue.remove(task); tryTerminate(); // In case SHUTDOWN and now empty return removed; } |
再往上查super.cancel(mayInterruptIfRunning)是什么東西,我們看到了這個(gè),
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
//通過調(diào)用線程的interrupt方法終止線程運(yùn)行 public boolean cancel( boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt( this , stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false ; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null ) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt( this , stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true ; } |
到這里所有的問題都迎刃而解。
總結(jié)一下吧
項(xiàng)目中總是會(huì)遇到比較難搞的解決方案,當(dāng)Google不太好找時(shí),翻一下jdk的源碼或許也是一個(gè)不錯(cuò)的方法。
原文鏈接:http://www.hchstudio.cn/article/2018/cc2a/