前言
前幾篇文章著重介紹了后端服務數據庫和多線程并行處理優化,并示例了改造前后的偽代碼邏輯。當然了,優化是無止境的,前人栽樹后人乘涼。作為我們開發者來說,既然站在了巨人的肩膀上,就要寫出更加優化的程序。
springboot開發案例之jdbctemplate批量操作
springboot開發案例之countdownlatch多任務并行處理
改造
理論上講,線程越多程序可能更快,但是在實際使用中我們需要考慮到線程本身的創建以及銷毀的資源消耗,以及保護操作系統本身的目的。我們通常需要將線程限制在一定的范圍之類,線程池就起到了這樣的作用。
程序邏輯
多任務并行+線程池處理.png
一張圖能解決的問題,就應該盡可能的少bb,當然底層原理性的東西還是需要大家去記憶并理解的。
java 線程池
java通過executors提供四種線程池,分別為:
- newcachedthreadpool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
- newfixedthreadpool 創建一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。
- newscheduledthreadpool 創建一個定長線程池,支持定時及周期性任務執行。
- newsinglethreadexecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(fifo, lifo, 優先級)執行。
優點
- 重用存在的線程,減少對象創建、消亡的開銷,性能佳。
- 可有效控制最大并發線程數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
- 提供定時執行、定期執行、單線程、并發數控制等功能。
代碼實現
方式一(countdownlatch)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
/** * 多任務并行+線程池統計 * 創建時間 2018年4月17日 */ public class statsdemo { final static simpledateformat sdf = new simpledateformat( "yyyy-mm-dd hh:mm:ss" ); final static string starttime = sdf.format( new date()); /** * io密集型任務 = 一般為2*cpu核心數(常出現于線程中:數據庫數據交互、文件上傳下載、網絡數據傳輸等等) * cpu密集型任務 = 一般為cpu核心數+1(常出現于線程中:復雜算法) * 混合型任務 = 視機器配置和復雜度自測而定 */ private static int corepoolsize = runtime.getruntime().availableprocessors(); /** * public threadpoolexecutor(int corepoolsize,int maximumpoolsize,long keepalivetime, * timeunit unit,blockingqueue<runnable> workqueue) * corepoolsize用于指定核心線程數量 * maximumpoolsize指定最大線程數 * keepalivetime和timeunit指定線程空閑后的最大存活時間 * workqueue則是線程池的緩沖隊列,還未執行的線程會在隊列中等待 * 監控隊列長度,確保隊列有界 * 不當的線程池大小會使得處理速度變慢,穩定性下降,并且導致內存泄露。如果配置的線程過少,則隊列會持續變大,消耗過多內存。 * 而過多的線程又會 由于頻繁的上下文切換導致整個系統的速度變緩——殊途而同歸。隊列的長度至關重要,它必須得是有界的,這樣如果線程池不堪重負了它可以暫時拒絕掉新的請求。 * executorservice 默認的實現是一個無界的 linkedblockingqueue。 */ private static threadpoolexecutor executor = new threadpoolexecutor(corepoolsize, corepoolsize+ 1 , 10l, timeunit.seconds, new linkedblockingqueue<runnable>( 1000 )); public static void main(string[] args) throws interruptedexception { countdownlatch latch = new countdownlatch( 5 ); //使用execute方法 executor.execute( new stats( "任務a" , 1000 , latch)); executor.execute( new stats( "任務b" , 1000 , latch)); executor.execute( new stats( "任務c" , 1000 , latch)); executor.execute( new stats( "任務d" , 1000 , latch)); executor.execute( new stats( "任務e" , 1000 , latch)); latch.await(); // 等待所有人任務結束 system.out.println( "所有的統計任務執行完成:" + sdf.format( new date())); } static class stats implements runnable { string statsname; int runtime; countdownlatch latch; public stats(string statsname, int runtime, countdownlatch latch) { this .statsname = statsname; this .runtime = runtime; this .latch = latch; } public void run() { try { system.out.println(statsname+ " do stats begin at " + starttime); //模擬任務執行時間 thread.sleep(runtime); system.out.println(statsname + " do stats complete at " + sdf.format( new date())); latch.countdown(); //單次任務結束,計數器減一 } catch (interruptedexception e) { e.printstacktrace(); } } } } |
方式二(future)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
/** * 多任務并行+線程池統計 * 創建時間 2018年4月17日 */ public class statsdemo { final static simpledateformat sdf = new simpledateformat( "yyyy-mm-dd hh:mm:ss" ); final static string starttime = sdf.format( new date()); /** * io密集型任務 = 一般為2*cpu核心數(常出現于線程中:數據庫數據交互、文件上傳下載、網絡數據傳輸等等) * cpu密集型任務 = 一般為cpu核心數+1(常出現于線程中:復雜算法) * 混合型任務 = 視機器配置和復雜度自測而定 */ private static int corepoolsize = runtime.getruntime().availableprocessors(); /** * public threadpoolexecutor(int corepoolsize,int maximumpoolsize,long keepalivetime, * timeunit unit,blockingqueue<runnable> workqueue) * corepoolsize用于指定核心線程數量 * maximumpoolsize指定最大線程數 * keepalivetime和timeunit指定線程空閑后的最大存活時間 * workqueue則是線程池的緩沖隊列,還未執行的線程會在隊列中等待 * 監控隊列長度,確保隊列有界 * 不當的線程池大小會使得處理速度變慢,穩定性下降,并且導致內存泄露。如果配置的線程過少,則隊列會持續變大,消耗過多內存。 * 而過多的線程又會 由于頻繁的上下文切換導致整個系統的速度變緩——殊途而同歸。隊列的長度至關重要,它必須得是有界的,這樣如果線程池不堪重負了它可以暫時拒絕掉新的請求。 * executorservice 默認的實現是一個無界的 linkedblockingqueue。 */ private static threadpoolexecutor executor = new threadpoolexecutor(corepoolsize, corepoolsize+ 1 , 10l, timeunit.seconds, new linkedblockingqueue<runnable>( 1000 )); public static void main(string[] args) throws interruptedexception { list<future<string>> resultlist = new arraylist<future<string>>(); //使用submit提交異步任務,并且獲取返回值為future resultlist.add(executor.submit( new stats( "任務a" , 1000 ))); resultlist.add(executor.submit( new stats( "任務b" , 1000 ))); resultlist.add(executor.submit( new stats( "任務c" , 1000 ))); resultlist.add(executor.submit( new stats( "任務d" , 1000 ))); resultlist.add(executor.submit( new stats( "任務e" , 1000 ))); //遍歷任務的結果 for (future<string> fs : resultlist) { try { system.out.println(fs.get()); //打印各個線任務執行的結果,調用future.get() 阻塞主線程,獲取異步任務的返回結果 } catch (interruptedexception e) { e.printstacktrace(); } catch (executionexception e) { e.printstacktrace(); } finally { //啟動一次順序關閉,執行以前提交的任務,但不接受新任務。如果已經關閉,則調用沒有其他作用。 executor.shutdown(); } } system.out.println( "所有的統計任務執行完成:" + sdf.format( new date())); } static class stats implements callable<string> { string statsname; int runtime; public stats(string statsname, int runtime) { this .statsname = statsname; this .runtime = runtime; } public string call() { try { system.out.println(statsname+ " do stats begin at " + starttime); //模擬任務執行時間 thread.sleep(runtime); system.out.println(statsname + " do stats complete at " + sdf.format( new date())); } catch (interruptedexception e) { e.printstacktrace(); } return call(); } } } |
執行時間
以上代碼,均是偽代碼,下面是2000+個學生的真實測試記錄。
2018-04-17 17:42:29.284 info 測試記錄81e51ab031eb4ada92743ddf66528d82-單線程順序執行,花費時間:3797
2018-04-17 17:42:31.452 info 測試記錄81e51ab031eb4ada92743ddf66528d82-多線程并行任務,花費時間:2167
2018-04-17 17:42:33.170 info 測試記錄81e51ab031eb4ada92743ddf66528d82-多線程并行任務+線程池,花費時間:1717
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://blog.52itstyle.com/archives/2705/