一、任務和執行策略之間的隱性耦合
Executor可以將任務的提交和任務的執行策略解耦
只有任務是同類型的且執行時間差別不大,才能發揮最大性能,否則,如將一些耗時長的任務和耗時短的任務放在一個線程池,除非線程池很大,否則會造成死鎖等問題
1.線程饑餓死鎖
類似于:將兩個任務提交給一個單線程池,且兩個任務之間相互依賴,一個任務等待另一個任務,則會發生死鎖;表現為池不夠
定義:某個任務必須等待池中其他任務的運行結果,有可能發生饑餓死鎖
2.線程池大小
注意:線程池的大小還受其他的限制,如其他資源池:數據庫連接池
如果每個任務都是一個連接,那么線程池的大小就受制于數據庫連接池的大小
3.配置ThreadPoolExecutor線程池
實例:
1.通過Executors的工廠方法返回默認的一些實現
2.通過實例化ThreadPoolExecutor(.....)自定義實現
線程池的隊列
1.無界隊列:任務到達,線程池飽滿,則任務在隊列中等待,如果任務無限達到,則隊列會無限擴張
如:單例和固定大小的線程池用的就是此種
2.有界隊列:如果新任務到達,隊列滿則使用飽和策略
3.同步移交:如果線程池很大,將任務放入隊列后在移交就會產生延時,如果任務生產者很快也會導致任務排隊
SynchronousQueue直接將任務移交給工作線程
機制:將一個任務放入,必須有一個線程等待接受,如果沒有,則新增線程,如果線程飽和,則拒絕任務
如:CacheThreadPool就是使用的這種策略
飽和策略:
setRejectedExecutionHandler來修改飽和策略
1.終止Abort(默認):拋出異常由調用者處理
2.拋棄Discard
3.拋棄DiscardOldest:拋棄最舊的任務,注意:如果是優先級隊列將拋棄優先級最高的任務
4.CallerRuns:回退任務,有調用者線程自行處理
4.線程工廠ThreadFactoy
每當創建線程時:其實是調用了線程工廠來完成
自定義線程工廠:implements ThreadFactory
可以定制該線程工廠的行為:如UncaughtExceptionHandler等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread" ; private static volatile boolean debugLifecycle = false ; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this (r, DEFAULT_NAME); } public MyAppThread(Runnable runnable, String name) { super (runnable, name + "-" + created.incrementAndGet()); //設置該線程工廠創建的線程的 未捕獲異常的行為 setUncaughtExceptionHandler( new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } public void run() { // Copy debug flag to ensure consistent value throughout. boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try { alive.incrementAndGet(); super .run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName()); } } public static int getThreadsCreated() { return created.get(); } public static int getThreadsAlive() { return alive.get(); } public static boolean getDebug() { return debugLifecycle; } public static void setDebug( boolean b) { debugLifecycle = b; } } |
5.擴展ThreadPoolExecutor
可以被自定義子類覆蓋的方法:
1.afterExecute:結束后,如果拋出RuntimeException則方法不會執行
2.beforeExecute:開始前,如果拋出RuntimeException則任務不會執行
3.terminated:在線程池關閉時,可以用來釋放資源等
二、遞歸算法的并行化
1.循環
在循環中,每次循環操作都是獨立的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
//串行化 void processSequentially(List<Element> elements) { for (Element e : elements) process(e); } //并行化 void processInParallel(Executor exec, List<Element> elements) { for ( final Element e : elements) exec.execute( new Runnable() { public void run() { process(e); } }); } |
2.迭代
如果每個迭代操作是彼此獨立的,則可以串行執行
如:深度優先搜索算法;注意:遞歸還是串行的,但是,每個節點的計算是并行的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
//串行 計算compute 和串行迭代 public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) { for (Node<T> n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } } //并行 計算compute 和串行迭代 public <T> void parallelRecursive( final Executor exec, List<Node<T>> nodes, final Collection<T> results) { for ( final Node<T> n : nodes) { exec.execute(() -> results.add(n.compute())); parallelRecursive(exec, n.getChildren(), results); } } //調用并行方法的操作 public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue; } |
實例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
public class ConcurrentPuzzleSolver <P, M> { private final Puzzle<P, M> puzzle; private final ExecutorService exec; private final ConcurrentMap<P, Boolean> seen; protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>(); public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) { this .puzzle = puzzle; this .exec = initThreadPool(); this .seen = new ConcurrentHashMap<P, Boolean>(); if (exec instanceof ThreadPoolExecutor) { ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec; tpe.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardPolicy()); } } private ExecutorService initThreadPool() { return Executors.newCachedThreadPool(); } public List<M> solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null , null )); // 等待ValueLatch中閉鎖解開,則表示已經找到答案 PuzzleNode<P, M> solnPuzzleNode = solution.getValue(); return (solnPuzzleNode == null ) ? null : solnPuzzleNode.asMoveList(); } finally { exec.shutdown(); //最終主線程關閉線程池 } } protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) { return new SolverTask(p, m, n); } protected class SolverTask extends PuzzleNode<P, M> implements Runnable { SolverTask(P pos, M move, PuzzleNode<P, M> prev) { super (pos, move, prev); } public void run() { //如果有一個線程找到了答案,則return,通過ValueLatch中isSet CountDownlatch閉鎖實現; //為類避免死鎖,將已經掃描的節點放入set集合中,避免繼續掃描產生死循環 if (solution.isSet() || seen.putIfAbsent(pos, true ) != null ){ return ; // already solved or seen this position } if (puzzle.isGoal(pos)) { solution.setValue( this ); } else { for (M m : puzzle.legalMoves(pos)) exec.execute(newTask(puzzle.move(pos, m), m, this )); } } } } |
以上這篇java并發編程_線程池的使用方法(詳解)就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持服務器之家。