一、簡介
多線程編程技術可以實現代碼并行性,優化處理能力,同時功能的更小劃分可以使代碼的可重用性更好。Python中threading和Queue模塊可以用來實現多線程編程。
二、詳解
1、線程和進程
進程(有時被稱為重量級進程)是程序的一次執行。每個進程都有自己的地址空間、內存、數據棧以及其它記錄其運行軌跡的輔助數據。操作系統管理在其上運行的所有進程,并為這些進程公平地分配時間。進程也可以通過fork和spawn操作來完成其它的任務,不過各個進程有自己的內存空間、數據棧等,所以只能使用進程間通訊(IPC),而不能直接共享信息。
線程(有時被稱為輕量級進程)跟進程有些相似,不同的是所有的線程運行在同一個進程中,共享相同的運行環境。它們可以想像成是在主進程或“主線程”中并行運行的“迷你進程”。線程有開始、順序執行和結束三部分,它有一個自己的指令指針,記錄自己運行到什么地方。線程的運行可能被搶占(中斷)或暫時的被掛起(也叫睡眠)讓其它的線程運行,這叫做讓步。一個進程中的各個線程之間共享同一片數據空間,所以線程之間可以比進程之間更方便地共享數據以及相互通訊。線程一般都是并發執行的,正是由于這種并行和數據共享的機制使得多個任務的合作變為可能。實際上,在單CPU的系統中,真正的并發是不可能的,每個線程會被安排成每次只運行一小會,然后就把CPU讓出來,讓其它的線程去運行。在進程的整個運行過程中,每個線程都只做自己的事,在需要的時候跟其它的線程共享運行的結果。多個線程共同訪問同一片數據不是完全沒有危險的,由于數據訪問的順序不一樣,有可能導致數據結果的不一致的問題,這叫做競態條件。而大多數線程庫都帶有一系列的同步原語,來控制線程的執行和數據的訪問。
2、使用線程
(1)全局解釋器鎖(GIL)
Python代碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個線程在執行。雖然 Python 解釋器中可以“運行”多個線程,但在任意時刻只有一個線程在解釋器中運行。
對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。在多線程環境中,Python 虛擬機按以下方式執行:a、設置 GIL;b、切換到一個線程去運行;c、運行指定數量的字節碼指令或者線程主動讓出控制(可以調用 time.sleep(0));d、把線程設置為睡眠狀態;e、解鎖 GIL;d、再次重復以上所有步驟。
在調用外部代碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束為止(由于在這期間沒有Python的字節碼被運行,所以不會做線程切換)編寫擴展的程序員可以主動解鎖GIL。
(2)退出線程
當一個線程結束計算,它就退出了。線程可以調用thread.exit()之類的退出函數,也可以使用Python退出進程的標準方法,如sys.exit()或拋出一個SystemExit異常等。不過,不可以直接“殺掉”("kill")一個線程。
不建議使用thread模塊,很明顯的一個原因是,當主線程退出的時候,所有其它線程沒有被清除就退出了。另一個模塊threading就能確保所有“重要的”子線程都退出后,進程才會結束。
(3)Python的線程模塊
Python提供了幾個用于多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊允許程序員創建和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊允許用戶創建一個可以用于多個線程之間共享數據的隊列數據結構。
避免使用thread模塊,因為更高級別的threading模塊更為先進,對線程的支持更為完善,而且使用thread模塊里的屬性有可能會與threading出現沖突;其次低級別的thread模塊的同步原語很少(實際上只有一個),而threading模塊則有很多;再者,thread模塊中當主線程結束時,所有的線程都會被強制結束掉,沒有警告也不會有正常的清除工作,至少threading模塊能確保重要的子線程退出后進程才退出。
3、thread模塊
thread模塊除了產生線程外,thread模塊也提供了基本的同步數據結構鎖對象(lock object也叫原語鎖、簡單鎖、互斥鎖、互斥量、二值信號量)。同步原語與線程的管理是密不可分的。
常用的線程函數以及LockType類型的鎖對象的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
#!/usr/bin/env python import thread from time import sleep, ctime def loop0(): print '+++start loop 0 at:' , ctime() sleep( 4 ) print '+++loop 0 done at:' , ctime() def loop1(): print '***start loop 1 at:' , ctime() sleep( 2 ) print '***loop 1 done at:' , ctime() def main(): print '------starting at:' , ctime() thread.start_new_thread(loop0, ()) thread.start_new_thread(loop1, ()) sleep( 6 ) print '------all DONE at:' , ctime() if __name__ = = '__main__' : main() |
thread 模塊提供的簡單的多線程的機制,兩個循環并發地被執行,總的運行時間為最慢的那個線程的運行時間(主線程6s),而不是所有的線程的運行時間之和。start_new_thread()要求要有前兩個參數,就算想要運行的函數不要參數,也要傳一個空的元組。
sleep(6)是讓主線程停下來,主線程一旦運行結束,就關閉運行著其他兩個線程。但這可能造成主線程過早或過晚退出,那就要使用線程鎖,可以在兩個子線程都退出后,主線程立即退出。
在CODE上查看代碼片派生到我的代碼片
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
#!/usr/bin/env python import thread from time import sleep, ctime loops = [ 4 , 2 ] def loop(nloop, nsec, lock): print '+++start loop:' , nloop, 'at:' , ctime() sleep(nsec) print '+++loop:' , nloop, 'done at:' , ctime() lock.release() def main(): print '---starting threads...' locks = [] nloops = range ( len (loops)) for i in nloops: lock = thread.allocate_lock() lock.acquire() locks.append(lock) for i in nloops: thread.start_new_thread(loop, (i, loops[i], locks[i])) for i in nloops: while locks[i].locked(): pass print '---all DONE at:' , ctime() if __name__ = = '__main__' : main() |
4、threading模塊
更高級別的threading模塊,它不僅提供了Thread類,還提供了各種非常好用的同步機制。threading 模塊里所有的對象:
thread模塊不支持守護線程,當主線程退出時,所有的子線程不論它們是否還在工作,都會被強行退出。而threading模塊支持守護線程,守護線程一般是一個等待客戶請求的服務器,如果沒有客戶提出請求它就在那等著,如果設定一個線程為守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。如果主線程退出不用等待那些子線程完成,那就設定這些線程的daemon屬性,即在線程thread.start()開始前,調用setDaemon()函數設定線程的daemon標志(thread.setDaemon(True))就表示這個線程“不重要”。如果想要等待子線程完成再退出,那就什么都不用做或者顯式地調用thread.setDaemon(False)以保證其daemon標志為False,可以調用thread.isDaemon()函數來判斷其daemon標志的值。新的子線程會繼承其父線程的daemon標志,整個Python會在所有的非守護線程退出后才會結束,即進程中沒有非守護線程存在的時候才結束。
(1)threading的Thread類
它有很多thread模塊里沒有的函數,Thread對象的函數:
創建一個Thread的實例,傳給它一個函數
在CODE上查看代碼片派生到我的代碼片
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
#!/usr/bin/env python import threading from time import sleep, ctime loops = [ 4 , 2 ] def loop(nloop, nsec): print '+++start loop:' , nloop, 'at:' , ctime() sleep(nsec) print '+++loop:' , nloop, 'done at:' , ctime() def main(): print '---starting at:' , ctime() threads = [] nloops = range ( len (loops)) for i in nloops: t = threading.Thread(target = loop, args = (i, loops[i])) threads.append(t) for i in nloops: # start threads threads[i].start() for i in nloops: # wait for all threads[i].join() # threads to finish print '---all DONE at:' , ctime() if __name__ = = '__main__' : main() |
實例化一個Thread(調用 Thread())與調用thread.start_new_thread()之間最大的區別就是,新的線程不會立即開始。在創建線程對象,但不想馬上開始運行線程的時候,這是一個很有用的同步特性。所有的線程都創建了之后,再一起調用 start()函數啟動,而不是創建一個啟動一個。而且也不用再管理一堆鎖(分配鎖、獲得鎖、釋放鎖、檢查鎖的狀態等),只要簡單地對每個線程調用join()主線程等待子線程的結束即可。join()還可以設置timeout的參數,即主線程等到超時為止。
join()的另一個比較重要的方面是它可以完全不用調用,一旦線程啟動后,就會一直運行,直到線程的函數結束,退出為止。如果主線程除了等線程結束外,還有其它的事情要做,那就不用調用 join(),只有在等待線程結束的時候才調用join()。
創建一個Thread的實例,傳給它一個可調用的類對象
[html] view plaincopy在CODE上查看代碼片派生到我的代碼片
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
#!/usr/bin/env python import threading from time import sleep, ctime loops = [ 4 , 2 ] class ThreadFunc( object ): def __init__( self , func, args, name = ''): self .name = name self .func = func self .args = args def __call__( self ): apply ( self .func, self .args) def loop(nloop, nsec): print 'start loop' , nloop, 'at:' , ctime() sleep(nsec) print 'loop' , nloop, 'done at:' , ctime() def main(): print 'starting at:' , ctime() threads = [] nloops = range ( len (loops)) for i in nloops: # create all threads t = threading.Thread(target = ThreadFunc(loop, (i, loops[i]), loop.__name__)) threads.append(t) for i in nloops: # start all threads threads[i].start() for i in nloops: # wait for completion threads[i].join() print 'all DONE at:' , ctime() if __name__ = = '__main__' : main() |
與傳一個函數很相似的另一個方法是在創建線程的時候,傳一個可調用的類的實例供線程啟動的時候執行,這是多線程編程的一個更為面向對象的方法。相對于一個或幾個函數來說,類對象里可以使用類的強大的功能。創建新線程的時候,Thread對象會調用ThreadFunc對象,這時會用到一個特殊函數__call__()。由于已經有了要用的參數,所以就不用再傳到Thread()的構造函數中。由于有一個參數的元組,這時要使用apply()函數或使用self.res = self.func(*self.args)。
從Thread派生出一個子類,創建一個這個子類的實例
在CODE上查看代碼片派生到我的代碼片
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
#!/usr/bin/env python import threading from time import sleep, ctime loops = [ 4 , 2 ] class MyThread(threading.Thread): def __init__( self , func, args, name = ''): threading.Thread.__init__( self ) self .name = name self .func = func self .args = args def getResult( self ): return self .res def run( self ): print 'starting' , self .name, 'at:' , ctime() self .res = apply ( self .func, self .args) print self .name, 'finished at:' , ctime() def loop(nloop, nsec): print 'start loop' , nloop, 'at:' , ctime() sleep(nsec) print 'loop' , nloop, 'done at:' , ctime() def main(): print 'starting at:' , ctime() threads = [] nloops = range ( len (loops)) for i in nloops: t = MyThread(loop, (i, loops[i]), loop.__name__) threads.append(t) for i in nloops: threads[i].start() for i in nloops: threads[i].join() print 'all DONE at:' , ctime() if __name__ = = '__main__' : main() |
子類化Thread類,MyThread子類的構造函數一定要先調用基類的構造函數,特殊函數__call__()在子類中,名字要改為run()。在 MyThread類中,加入一些用于調試的輸出信息,把代碼保存到myThread模塊中,并導入這個類。除使用apply()函數來運行這些函數之外,還可以把結果保存到實現的self.res屬性中,并創建一個新的函數getResult()來得到結果。
(2)threading模塊中的其它函數
5、Queue模塊
常用的 Queue 模塊的屬性:
Queue模塊可以用來進行線程間通訊,讓各個線程之間共享數據。Queue解決生產者-消費者的問題,現在創建一個隊列,讓生產者線程把新生產的貨物放進去供消費者線程使用。生產者生產貨物所要花費的時間無法預先確定,消費者消耗生產者生產的貨物的時間也是不確定的。
在CODE上查看代碼片派生到我的代碼片
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
#!/usr/bin/env python from random import randint from time import sleep from Queue import Queue from myThread import MyThread def writeQ(queue): print '+++producing object for Q...' , queue.put( 'xxx' , 1 ) print "+++size now:" , queue.qsize() def readQ(queue): val = queue.get( 1 ) print '---consumed object from Q... size now' , \ queue.qsize() def writer(queue, loops): for i in range (loops): writeQ(queue) sleep(randint( 1 , 3 )) def reader(queue, loops): for i in range (loops): readQ(queue) sleep(randint( 2 , 5 )) funcs = [writer, reader] nfuncs = range ( len (funcs)) def main(): nloops = randint( 2 , 5 ) q = Queue( 32 ) threads = [] for i in nfuncs: t = MyThread(funcs[i], (q, nloops), \ funcs[i].__name__) threads.append(t) for i in nfuncs: threads[i].start() for i in nfuncs: threads[i].join() print '***all DONE' if __name__ = = '__main__' : main() |
這個實現中使用了Queue對象和隨機地生產(和消耗)貨物的方式。生產者和消費者相互獨立并且并發地運行,它們不一定是輪流執行的(隨機數模擬)。writeQ()和readQ()函數分別用來把對象放入隊列和消耗隊列中的一個對象,在這里使用字符串'xxx'來表示隊列中的對象。writer()函數就是一次往隊列中放入一個對象,等待一會然后再做同樣的事,一共做指定的次數,這個次數是由腳本運行時隨機生成的。reader()函數做的事比較類似,只是它是用來消耗對象的。
6、線程相關模塊
多線程相關的標準庫模塊:
三、總結
(1)一個要完成多項任務的程序,可以考慮每個任務使用一個線程,這樣的程序在設計上相對于單線程做所有事的程序來說,更為清晰明了。
(2)單線程的程序在程序性能上的限制,尤其在有相互獨立、運行時間不確定、多個任務的程序里,而把多個任務分隔成多個線程同時運行會比順序運行速度更快。由于Python解釋器是單線程的,所以不是所有的程序都能從多線程中得到好處。
(3)若有不足,請留言,在此先感謝!