Python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到并發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
引例:
如之前創建多進程的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# -*- coding:utf-8 -*- from multiprocessing import Process,Pool import os,time def run_proc(name): ##定義一個函數用于進程調用 for i in range ( 5 ): time.sleep( 0.2 ) #休眠0.2秒 print 'Run child process %s (%s)' % (name, os.getpid()) #執行一次該函數共需1秒的時間 if __name__ = = '__main__' : #執行主進程 print 'Run the main process (%s).' % (os.getpid()) mainStart = time.time() #記錄主進程開始的時間 p = Pool( 8 ) #開辟進程池 for i in range ( 16 ): #開辟14個進程 p.apply_async(run_proc,args = ( 'Process' + str (i),)) #每個進程都調用run_proc函數, #args表示給該函數傳遞的參數。 print 'Waiting for all subprocesses done ...' p.close() #關閉進程池 p.join() #等待開辟的所有進程執行完后,主進程才繼續往下執行 print 'All subprocesses done' mainEnd = time.time() #記錄主進程結束時間 print 'All process ran %0.2f seconds.' % (mainEnd - mainStart) #主進程執行時間 |
運行結果:
1
2
3
4
5
6
7
|
Run the main process (36652). Waiting for all subprocesses done … Run child process Process0 (36708)Run child process Process1 (36748) Run child process Process3 (36736) Run child process Process2 (36716) Run child process Process4 (36768) |
如第3行的輸出,偶爾會出現這樣不如意的輸入格式,為什么呢?
原因是多個進程爭用打印輸出資源的結果。前一個進程為來得急輸出換行符,該資源就切換給了另一個進程使用,致使兩個進程輸出在同一行上,而前一個進程的換行符在下一次獲得資源時才打印輸出。
Lock
為了避免這種情況,需在進程進入臨界區(使進程進入臨界資源的那段代碼,稱為臨界區)時加鎖。
可以向如下這樣添加鎖后看看執行效果:
1
2
3
4
5
6
7
8
9
10
|
# -*- coding:utf-8 -*- lock = Lock() #申明一個全局的lock對象 def run_proc(name): global lock #引用全局鎖 for i in range ( 5 ): time.sleep( 0.2 ) lock.acquire() #申請鎖 print 'Run child process %s (%s)' % (name, os.getpid()) lock.release() #釋放鎖 |
Semaphore
Semaphore為信號量機制。當共享的資源擁有多個時,可用Semaphore來實現進程同步。其用法和Lock差不多,s = Semaphore(N),每執行一次s.acquire(),該資源的可用個數將減少1,當資源個數已為0時,就進入阻塞;每執行一次s.release(),占用的資源被釋放,該資源的可用個數增加1。
多進程的通信(信息交互)
不同進程之間進行數據交互,可能不少剛開始接觸多進程的同學會想到共享全局變量的方式,這樣通過向全局變量寫入和讀取信息便能實現信息交互。但是很遺憾,并不能這樣實現。
下面通過例子,加深對那篇文章的理解:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# -*- coding:utf-8 -*- from multiprocessing import Process, Pool import os import time L1 = [ 1 , 2 , 3 ] def add(a, b): global L1 L1 + = range (a, b) print L1 if __name__ = = '__main__' : p1 = Process(target = add, args = ( 20 , 30 )) p2 = Process(target = add, args = ( 30 , 40 )) p1.start() p2.start() p1.join() p2.join() print L1 |
輸出結果:
[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[1, 2, 3, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3]
該程序的原本目的是想將兩個子進程生成的列表加到全局變量L1中,但用該方法并不能達到想要的效果。既然不能通過全局變量來實現不同進程間的信息交互,那有什么辦法呢。
mutiprocessing為我們可以通過Queue和Pipe來實現進程間的通信。
Queue
按上面的例子通過Queue來實現:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue, Lock L = [ 1 , 2 , 3 ] def add(q, lock, a, b): lock.acquire() # 加鎖避免寫入時出現不可預知的錯誤 L1 = range (a, b) lock.release() q.put(L1) print L1 if __name__ = = '__main__' : q = Queue() lock = Lock() p1 = Process(target = add, args = (q, lock, 20 , 30 )) p2 = Process(target = add, args = (q, lock, 30 , 40 )) p1.start() p2.start() p1.join() p2.join() L + = q.get() + q.get() print L |
執行結果:
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
下面介紹Queue的常用方法:
- 定義時可用q = Queue(maxsize = 10)來指定隊列的長度,默認時或maxsize值小于1時隊列為無限長度。
- q.put(item)方法向隊列放入元素,其還有一個可選參數block,默認為True,此時若隊列已滿則會阻塞等待,直到有空閑位置。而當black值為 False,在該情況下就會拋出Full異 常
- Queue是不可迭代的對象,不能通過for循環取值,取值時每次調用q.get()方法。同樣也有可選參數block,默認為True,若此時隊列為空則會阻塞等待。而black值為False時,在該情況下就會拋出Empty異常
- Queue.qsize() 返回隊列的大小
- Queue.empty() 如果隊列為空,返回True,反之False
- Queue.full() 如果隊列滿了,返回True,反之False
- Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間Queue.get_nowait() 相當Queue.get(False) 非阻塞 Queue.put(item) 寫入隊列,timeout等待時間
- Queue.put_nowait(item) 相當Queue.put(item, False)
Pipe
Pipe管道,可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)創建單向管道 (默認為雙向)。雙向Pipe允許兩端的進即可以發送又可以接受;單向的Pipe只允許前面的端口用于接收,后面的端口用于發送。
下面給出例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe def proc1(pipe): s = 'Hello,This is proc1' pipe.send(s) def proc2(pipe): while True : print "proc2 recieve:" , pipe.recv() if __name__ = = "__main__" : pipe = Pipe() p1 = Process(target = proc1, args = (pipe[ 0 ],)) p2 = Process(target = proc2, args = (pipe[ 1 ],)) p1.start() p2.start() p1.join() p2.join( 2 ) #限制執行時間最多為2秒 print '\nend all processes.' |
執行結果如下:
proc2 recieve: Hello,This is proc1
proc2 recieve:
end all processes.
當第二行輸出后,因為管道中沒有數據傳來,Proc2處于阻塞狀態,2秒后被強制結束。
以下是單向管道的例子,注意pipe[0],pipe[1]的分配。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe def proc1(pipe): s = 'Hello,This is proc1' pipe.send(s) def proc2(pipe): while True : print "proc2 recieve:" , pipe.recv() if __name__ = = "__main__" : pipe = Pipe(duplex = False ) p1 = Process(target = proc1, args = (pipe[ 1 ],)) #pipe[1]為發送端 p2 = Process(target = proc2, args = (pipe[ 0 ],)) #pipe[0]為接收端 p1.start() p2.start() p1.join() p2.join( 2 ) # 限制執行時間最多為2秒 print '\nend all processes.' |
執行結果同上。
強大的Manage
Queue和Pipe實現的數據共享方式只支持兩種結構 Value 和 Array。Python中提供了強大的Manage專門用來做數據共享,其支持的類型非常多,包括: Value,Array,list, dict,Queue, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event等
其用法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
from multiprocessing import Process, Manager def func(dt, lt): for i in range ( 10 ): key = 'arg' + str (i) dt[key] = i * i lt + = range ( 11 , 16 ) if __name__ = = "__main__" : manager = Manager() dt = manager. dict () lt = manager. list () p = Process(target = func, args = (dt, lt)) p.start() p.join() print dt, '\n' , lt |
執行結果:
{‘arg8': 64, ‘arg9': 81, ‘arg0': 0, ‘arg1': 1, ‘arg2': 4, ‘arg3': 9, ‘arg4': 16, ‘arg5': 25, ‘arg6': 36, ‘arg7': 49}
[11, 12, 13, 14, 15]
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:http://blog.csdn.net/u014556057/article/details/66974452