進(jìn)程與線程的歷史
我們都知道計(jì)算機(jī)是由硬件和軟件組成的。硬件中的CPU是計(jì)算機(jī)的核心,它承擔(dān)計(jì)算機(jī)的所有任務(wù)。 操作系統(tǒng)是運(yùn)行在硬件之上的軟件,是計(jì)算機(jī)的管理者,它負(fù)責(zé)資源的管理和分配、任務(wù)的調(diào)度。 程序是運(yùn)行在系統(tǒng)上的具有某種功能的軟件,比如說(shuō)瀏覽器,音樂(lè)播放器等。 每次執(zhí)行程序的時(shí)候,都會(huì)完成一定的功能,比如說(shuō)瀏覽器幫我們打開(kāi)網(wǎng)頁(yè),為了保證其獨(dú)立性,就需要一個(gè)專門的管理和控制執(zhí)行程序的數(shù)據(jù)結(jié)構(gòu)——進(jìn)程控制塊。 進(jìn)程就是一個(gè)程序在一個(gè)數(shù)據(jù)集上的一次動(dòng)態(tài)執(zhí)行過(guò)程。 進(jìn)程一般由程序、數(shù)據(jù)集、進(jìn)程控制塊三部分組成。我們編寫的程序用來(lái)描述進(jìn)程要完成哪些功能以及如何完成;數(shù)據(jù)集則是程序在執(zhí)行過(guò)程中所需要使用的資源;進(jìn)程控制塊用來(lái)記錄進(jìn)程的外部特征,描述進(jìn)程的執(zhí)行變化過(guò)程,系統(tǒng)可以利用它來(lái)控制和管理進(jìn)程,它是系統(tǒng)感知進(jìn)程存在的唯一標(biāo)志。
在早期的操作系統(tǒng)里,計(jì)算機(jī)只有一個(gè)核心,進(jìn)程執(zhí)行程序的最小單位,任務(wù)調(diào)度采用時(shí)間片輪轉(zhuǎn)的搶占式方式進(jìn)行進(jìn)程調(diào)度。每個(gè)進(jìn)程都有各自的一塊獨(dú)立的內(nèi)存,保證進(jìn)程彼此間的內(nèi)存地址空間的隔離。 隨著計(jì)算機(jī)技術(shù)的發(fā)展,進(jìn)程出現(xiàn)了很多弊端,一是進(jìn)程的創(chuàng)建、撤銷和切換的開(kāi)銷比較大,二是由于對(duì)稱多處理機(jī)(對(duì)稱多處理機(jī)(SymmetricalMulti-Processing)又叫SMP,是指在一個(gè)計(jì)算機(jī)上匯集了一組處理器(多CPU),各CPU之間共享內(nèi)存子系統(tǒng)以及總線結(jié)構(gòu))的出現(xiàn),可以滿足多個(gè)運(yùn)行單位,而多進(jìn)程并行開(kāi)銷過(guò)大。 這個(gè)時(shí)候就引入了線程的概念。 線程也叫輕量級(jí)進(jìn)程,它是一個(gè)基本的CPU執(zhí)行單元,也是程序執(zhí)行過(guò)程中的最小單元,由線程ID、程序計(jì)數(shù)器、寄存器集合 和堆棧共同組成。線程的引入減小了程序并發(fā)執(zhí)行時(shí)的開(kāi)銷,提高了操作系統(tǒng)的并發(fā)性能。 線程沒(méi)有自己的系統(tǒng)資源,只擁有在運(yùn)行時(shí)必不可少的資源。但線程可以與同屬與同一進(jìn)程的其他線程共享進(jìn)程所擁有的其他資源。
進(jìn)程與線程之間的關(guān)系
線程是屬于進(jìn)程的,線程運(yùn)行在進(jìn)程空間內(nèi),同一進(jìn)程所產(chǎn)生的線程共享同一內(nèi)存空間,當(dāng)進(jìn)程退出時(shí)該進(jìn)程所產(chǎn)生的線程都會(huì)被強(qiáng)制退出并清除。線程可與屬于同一進(jìn)程的其它線程共享進(jìn)程所擁有的全部資源,但是其本身基本上不擁有系統(tǒng)資源,只擁有一點(diǎn)在運(yùn)行中必不可少的信息(如程序計(jì)數(shù)器、一組寄存器和棧)。
python 線程
Threading用于提供線程相關(guān)的操作,線程是應(yīng)用程序中工作的最小單元。
1、threading模塊
threading 模塊建立在 _thread 模塊之上。thread 模塊以低級(jí)、原始的方式來(lái)處理和控制線程,而 threading 模塊通過(guò)對(duì) thread 進(jìn)行二次封裝,提供了更方便的 api 來(lái)處理線程。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import threading import time def worker(num): """ thread worker function :return: """ time.sleep( 1 ) print ( "The num is %d" % num) return for i in range ( 20 ): t = threading.Thread(target = worker,args = (i,),name = “t. % d” % i) t.start() |
上述代碼創(chuàng)建了20個(gè)“前臺(tái)”線程,然后控制器就交給了CPU,CPU根據(jù)指定算法進(jìn)行調(diào)度,分片執(zhí)行指令。
Thread方法說(shuō)明
t.start() : 激活線程,
t.getName() : 獲取線程的名稱
t.setName() : 設(shè)置線程的名稱
t.name : 獲取或設(shè)置線程的名稱
t.is_alive() : 判斷線程是否為激活狀態(tài)
t.isAlive() :判斷線程是否為激活狀態(tài)
t.setDaemon() 設(shè)置為后臺(tái)線程或前臺(tái)線程(默認(rèn):False);通過(guò)一個(gè)布爾值設(shè)置線程是否為守護(hù)線程,必須在執(zhí)行start()方法之后才可以使用。如果是后臺(tái)線程,主線程執(zhí)行過(guò)程中,后臺(tái)線程也在進(jìn)行,主線程執(zhí)行完畢后,后臺(tái)線程不論成功與否,均停止;如果是前臺(tái)線程,主線程執(zhí)行過(guò)程中,前臺(tái)線程也在進(jìn)行,主線程執(zhí)行完畢后,等待前臺(tái)線程也執(zhí)行完成后,程序停止
t.isDaemon() : 判斷是否為守護(hù)線程
t.ident :獲取線程的標(biāo)識(shí)符。線程標(biāo)識(shí)符是一個(gè)非零整數(shù),只有在調(diào)用了start()方法之后該屬性才有效,否則它只返回None。
t.join() :逐個(gè)執(zhí)行每個(gè)線程,執(zhí)行完畢后繼續(xù)往下執(zhí)行,該方法使得多線程變得無(wú)意義
t.run() :線程被cpu調(diào)度后自動(dòng)執(zhí)行線程對(duì)象的run方法
2、線程鎖threading.RLock和threading.Lock
由于線程之間是進(jìn)行隨機(jī)調(diào)度,并且每個(gè)線程可能只執(zhí)行n條執(zhí)行之后,CPU接著執(zhí)行其他線程。為了保證數(shù)據(jù)的準(zhǔn)確性,引入了鎖的概念。所以,可能出現(xiàn)如下問(wèn)題:
例:假設(shè)列表A的所有元素就為0,當(dāng)一個(gè)線程從前向后打印列表的所有元素,另外一個(gè)線程則從后向前修改列表的元素為1,那么輸出的時(shí)候,列表的元素就會(huì)一部分為0,一部分為1,這就導(dǎo)致了數(shù)據(jù)的不一致。鎖的出現(xiàn)解決了這個(gè)問(wèn)題。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import threading import time globals_num = 0 lock = threading.RLock() def Func(): lock.acquire() # 獲得鎖 global globals_num globals_num + = 1 time.sleep( 1 ) print (globals_num) lock.release() # 釋放鎖 for i in range ( 10 ): t = threading.Thread(target = Func) t.start() |
3、threading.RLock和threading.Lock 的區(qū)別
RLock允許在同一線程中被多次acquire。而Lock卻不允許這種情況。 如果使用RLock,那么acquire和release必須成對(duì)出現(xiàn),即調(diào)用了n次acquire,必須調(diào)用n次的release才能真正釋放所占用的瑣。
1
2
3
4
5
6
7
8
9
10
11
12
|
import threading lock = threading.Lock() #Lock對(duì)象 lock.acquire() lock.acquire() #產(chǎn)生了死瑣。 lock.release() lock.release() import threading rLock = threading.RLock() #RLock對(duì)象 rLock.acquire() rLock.acquire() #在同一線程內(nèi),程序不會(huì)堵塞。 rLock.release() rLock.release() |
4、threading.Event
python線程的事件用于主線程控制其他線程的執(zhí)行,事件主要提供了三個(gè)方法 set、wait、clear。
事件處理的機(jī)制:全局定義了一個(gè)“Flag”,如果“Flag”值為 False,那么當(dāng)程序執(zhí)行 event.wait 方法時(shí)就會(huì)阻塞,如果“Flag”值為True,那么event.wait 方法時(shí)便不再阻塞。
•clear:將“Flag”設(shè)置為False
•set:將“Flag”設(shè)置為True
•Event.isSet() :判斷標(biāo)識(shí)位是否為Ture。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import threading def do(event): print ( 'start' ) event.wait() print ( 'execute' ) event_obj = threading.Event() for i in range ( 10 ): t = threading.Thread(target = do, args = (event_obj,)) t.start() event_obj.clear() inp = input ( 'input:' ) if inp = = 'true' : event_obj. set ( |
當(dāng)線程執(zhí)行的時(shí)候,如果flag為False,則線程會(huì)阻塞,當(dāng)flag為True的時(shí)候,線程不會(huì)阻塞。它提供了本地和遠(yuǎn)程的并發(fā)性。
5、threading.Condition
一個(gè)condition變量總是與某些類型的鎖相聯(lián)系,這個(gè)可以使用默認(rèn)的情況或創(chuàng)建一個(gè),當(dāng)幾個(gè)condition變量必須共享和同一個(gè)鎖的時(shí)候,是很有用的。鎖是conditon對(duì)象的一部分:沒(méi)有必要分別跟蹤。
condition變量服從上下文管理協(xié)議:with語(yǔ)句塊封閉之前可以獲取與鎖的聯(lián)系。 acquire() 和 release() 會(huì)調(diào)用與鎖相關(guān)聯(lián)的相應(yīng)的方法。
其他和鎖關(guān)聯(lián)的方法必須被調(diào)用,wait()方法會(huì)釋放鎖,當(dāng)另外一個(gè)線程使用 notify() or notify_all()喚醒它之前會(huì)一直阻塞。一旦被喚醒,wait()會(huì)重新獲得鎖并返回,
Condition類實(shí)現(xiàn)了一個(gè)conditon變量。 這個(gè)conditiaon變量允許一個(gè)或多個(gè)線程等待,直到他們被另一個(gè)線程通知。 如果lock參數(shù),被給定一個(gè)非空的值,,那么他必須是一個(gè)lock或者Rlock對(duì)象,它用來(lái)做底層鎖。否則,會(huì)創(chuàng)建一個(gè)新的Rlock對(duì)象,用來(lái)做底層鎖.
•wait(timeout=None) : 等待通知,或者等到設(shè)定的超時(shí)時(shí)間。當(dāng)調(diào)用這wait()方法時(shí),如果調(diào)用它的線程沒(méi)有得到鎖,那么會(huì)拋出一個(gè)RuntimeError 異常。 wati()釋放鎖以后,在被調(diào)用相同條件的另一個(gè)進(jìn)程用notify() or notify_all() 叫醒之前 會(huì)一直阻塞。wait() 還可以指定一個(gè)超時(shí)時(shí)間。
如果有等待的線程,notify()方法會(huì)喚醒一個(gè)在等待conditon變量的線程。notify_all() 則會(huì)喚醒所有在等待conditon變量的線程。
注意: notify()和notify_all()不會(huì)釋放鎖,也就是說(shuō),線程被喚醒后不會(huì)立刻返回他們的wait() 調(diào)用。除非線程調(diào)用notify()和notify_all()之后放棄了鎖的所有權(quán)。
在典型的設(shè)計(jì)風(fēng)格里,利用condition變量用鎖去通許訪問(wèn)一些共享狀態(tài),線程在獲取到它想得到的狀態(tài)前,會(huì)反復(fù)調(diào)用wait()。修改狀態(tài)的線程在他們狀態(tài)改變時(shí)調(diào)用 notify() or notify_all(),用這種方式,線程會(huì)盡可能的獲取到想要的一個(gè)等待者狀態(tài)。 例子: 生產(chǎn)者-消費(fèi)者模型,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import threading import time def consumer(cond): with cond: print ( "consumer before wait" ) cond.wait() print ( "consumer after wait" ) def producer(cond): with cond: print ( "producer before notifyAll" ) cond.notifyAll() print ( "producer after notifyAll" ) condition = threading.Condition() c1 = threading.Thread(name = "c1" , target = consumer, args = (condition,)) c2 = threading.Thread(name = "c2" , target = consumer, args = (condition,)) p = threading.Thread(name = "p" , target = producer, args = (condition,)) c1.start() time.sleep( 2 ) c2.start() time.sleep( 2 ) p.start() |
6、queue模塊
Queue 就是對(duì)隊(duì)列,它是線程安全的
舉例來(lái)說(shuō),我們?nèi)湲?dāng)勞吃飯。飯店里面有廚師職位,前臺(tái)負(fù)責(zé)把廚房做好的飯賣給顧客,顧客則去前臺(tái)領(lǐng)取做好的飯。這里的前臺(tái)就相當(dāng)于我們的隊(duì)列。形成管道樣,廚師做好飯通過(guò)前臺(tái)傳送給顧客,所謂單向隊(duì)列
這個(gè)模型也叫生產(chǎn)者-消費(fèi)者模型。
import queue
q = queue.Queue(maxsize=0) # 構(gòu)造一個(gè)先進(jìn)顯出隊(duì)列,maxsize指定隊(duì)列長(zhǎng)度,為0 時(shí),表示隊(duì)列長(zhǎng)度無(wú)限制。
q.join() # 等到隊(duì)列為kong的時(shí)候,在執(zhí)行別的操作
q.qsize() # 返回隊(duì)列的大小 (不可靠)
q.empty() # 當(dāng)隊(duì)列為空的時(shí)候,返回True 否則返回False (不可靠)
q.full() # 當(dāng)隊(duì)列滿的時(shí)候,返回True,否則返回False (不可靠)
q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,可以參數(shù)block默認(rèn)為True,表示當(dāng)隊(duì)列滿時(shí),會(huì)等待隊(duì)列給出可用位置,
為False時(shí)為非阻塞,此時(shí)如果隊(duì)列已滿,會(huì)引發(fā)queue.Full 異常。 可選參數(shù)timeout,表示 會(huì)阻塞設(shè)置的時(shí)間,過(guò)后,
如果隊(duì)列無(wú)法給出放入item的位置,則引發(fā) queue.Full 異常
q.get(block=True, timeout=None) # 移除并返回隊(duì)列頭部的一個(gè)值,可選參數(shù)block默認(rèn)為True,表示獲取值的時(shí)候,如果隊(duì)列為空,則阻塞,為False時(shí),不阻塞,
若此時(shí)隊(duì)列為空,則引發(fā) queue.Empty異常。 可選參數(shù)timeout,表示會(huì)阻塞設(shè)
置的時(shí)候,過(guò)后,如果隊(duì)列為空,則引發(fā)Empty異常。
q.put_nowait(item) # 等效于 put(item,block=False)
q.get_nowait() # 等效于 get(item,block=False)
代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#!/usr/bin/env python import Queue import threading message = Queue.Queue( 10 ) def producer(i): while True : message.put(i) def consumer(i): while True : msg = message.get() for i in range ( 12 ): t = threading.Thread(target = producer, args = (i,)) t.start() for i in range ( 10 ): t = threading.Thread(target = consumer, args = (i,)) t.start() |
那就自己做個(gè)線程池吧:
方法一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# 簡(jiǎn)單往隊(duì)列中傳輸線程數(shù) import threading import time import queue class Threadingpool(): def __init__( self ,max_num = 10 ): self .queue = queue.Queue(max_num) for i in range (max_num): self .queue.put(threading.Thread) def getthreading( self ): return self .queue.get() def addthreading( self ): self .queue.put(threading.Thread) def func(p,i): time.sleep( 1 ) print (i) p.addthreading() if __name__ = = "__main__" : p = Threadingpool() for i in range ( 20 ): thread = p.getthreading() t = thread(target = func, args = (p,i)) t.start() |
方法二
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
#往隊(duì)列中無(wú)限添加任務(wù) import queue import threading import contextlib import time StopEvent = object () class ThreadPool( object ): def __init__( self , max_num): self .q = queue.Queue() self .max_num = max_num self .terminal = False self .generate_list = [] self .free_list = [] def run( self , func, args, callback = None ): """ 線程池執(zhí)行一個(gè)任務(wù) :param func: 任務(wù)函數(shù) :param args: 任務(wù)函數(shù)所需參數(shù) :param callback: 任務(wù)執(zhí)行失敗或成功后執(zhí)行的回調(diào)函數(shù),回調(diào)函數(shù)有兩個(gè)參數(shù)1、任務(wù)函數(shù)執(zhí)行狀態(tài);2、任務(wù)函數(shù)返回值(默認(rèn)為None,即:不執(zhí)行回調(diào)函數(shù)) :return: 如果線程池已經(jīng)終止,則返回True否則None """ if len ( self .free_list) = = 0 and len ( self .generate_list) < self .max_num: self .generate_thread() w = (func, args, callback,) self .q.put(w) def generate_thread( self ): """ 創(chuàng)建一個(gè)線程 """ t = threading.Thread(target = self .call) t.start() def call( self ): """ 循環(huán)去獲取任務(wù)函數(shù)并執(zhí)行任務(wù)函數(shù) """ current_thread = threading.currentThread self .generate_list.append(current_thread) event = self .q.get() # 獲取線程 while event ! = StopEvent: # 判斷獲取的線程數(shù)不等于全局變量 func, arguments, callback = event # 拆分元祖,獲得執(zhí)行函數(shù),參數(shù),回調(diào)函數(shù) try : result = func( * arguments) # 執(zhí)行函數(shù) status = True except Exception as e: # 函數(shù)執(zhí)行失敗 status = False result = e if callback is not None : try : callback(status, result) except Exception as e: pass # self.free_list.append(current_thread) # event = self.q.get() # self.free_list.remove(current_thread) with self .work_state(): event = self .q.get() else : self .generate_list.remove(current_thread) def close( self ): """ 關(guān)閉線程,給傳輸全局非元祖的變量來(lái)進(jìn)行關(guān)閉 :return: """ for i in range ( len ( self .generate_list)): self .q.put(StopEvent) def terminate( self ): """ 突然關(guān)閉線程 :return: """ self .terminal = True while self .generate_list: self .q.put(StopEvent) self .q.empty() @contextlib .contextmanager def work_state( self ): self .free_list.append(threading.currentThread) try : yield finally : self .free_list.remove(threading.currentThread) def work(i): print (i) return i + 1 # 返回給回調(diào)函數(shù) def callback(ret): print (ret) pool = ThreadPool( 10 ) for item in range ( 50 ): pool.run(func = work, args = (item,),callback = callback) pool.terminate() # pool.close() |
python 進(jìn)程
multiprocessing是python的多進(jìn)程管理包,和threading.Thread類似。
1、multiprocessing模塊
直接從側(cè)面用subprocesses替換線程使用GIL的方式,由于這一點(diǎn),multiprocessing模塊可以讓程序員在給定的機(jī)器上充分的利用CPU。在multiprocessing中,通過(guò)創(chuàng)建Process對(duì)象生成進(jìn)程,然后調(diào)用它的start()方法,
1
2
3
4
5
6
7
|
from multiprocessing import Process def func(name): print ( 'hello' , name) if __name__ = = "__main__" : p = Process(target = func,args = ( 'zhangyanlin' ,)) p.start() p.join() # 等待進(jìn)程執(zhí)行完畢 |
在使用并發(fā)設(shè)計(jì)的時(shí)候最好盡可能的避免共享數(shù)據(jù),尤其是在使用多進(jìn)程的時(shí)候。 如果你真有需要 要共享數(shù)據(jù), multiprocessing提供了兩種方式。
(1)multiprocessing,Array,Value
數(shù)據(jù)可以用Value或Array存儲(chǔ)在一個(gè)共享內(nèi)存地圖里,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
from multiprocessing import Array,Value,Process def func(a,b): a.value = 3.333333333333333 for i in range ( len (b)): b[i] = - b[i] if __name__ = = "__main__" : num = Value( 'd' , 0.0 ) arr = Array( 'i' , range ( 11 )) c = Process(target = func,args = (num,arr)) d = Process(target = func,args = (num,arr)) c.start() d.start() c.join() d.join() print (num.value) for i in arr: print (i)<br>輸出:<br> 3.1415927 <br> [ 0 , - 1 , - 2 , - 3 , - 4 , - 5 , - 6 , - 7 , - 8 , - 9 ] |
創(chuàng)建num和arr時(shí),“d”和“i”參數(shù)由Array模塊使用的typecodes創(chuàng)建:“d”表示一個(gè)雙精度的浮點(diǎn)數(shù),“i”表示一個(gè)有符號(hào)的整數(shù),這些共享對(duì)象將被線程安全的處理。
Array(‘i', range(10))中的‘i'參數(shù):
‘c': ctypes.c_char ‘u': ctypes.c_wchar ‘b': ctypes.c_byte ‘B': ctypes.c_ubyte
‘h': ctypes.c_short ‘H': ctypes.c_ushort ‘i': ctypes.c_int ‘I': ctypes.c_uint
‘l': ctypes.c_long, ‘L': ctypes.c_ulong ‘f': ctypes.c_float ‘d': ctypes.c_double
(2)multiprocessing,Manager
由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array類型的支持。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
from multiprocessing import Process,Manager def f(d,l): d[ "name" ] = "zhangyanlin" d[ "age" ] = 18 d[ "Job" ] = "pythoner" l.reverse() if __name__ = = "__main__" : with Manager() as man: d = man. dict () l = man. list ( range ( 10 )) p = Process(target = f,args = (d,l)) p.start() p.join() print (d) print (l)<br><br>輸出: { 0.25 : None , 1 : '1' , '2' : 2 } [ 9 , 8 , 7 , 6 , 5 , 4 , 3 , 2 , 1 , 0 ] |
Server process manager比 shared memory 更靈活,因?yàn)樗梢灾С秩我獾膶?duì)象類型。另外,一個(gè)單獨(dú)的manager可以通過(guò)進(jìn)程在網(wǎng)絡(luò)上不同的計(jì)算機(jī)之間共享,不過(guò)他比shared memory要慢。
2、進(jìn)程池(Using a pool of workers)
Pool類描述了一個(gè)工作進(jìn)程池,他有幾種不同的方法讓任務(wù)卸載工作進(jìn)程。
進(jìn)程池內(nèi)部維護(hù)一個(gè)進(jìn)程序列,當(dāng)使用時(shí),則去進(jìn)程池中獲取一個(gè)進(jìn)程,如果進(jìn)程池序列中沒(méi)有可供使用的進(jìn)進(jìn)程,那么程序就會(huì)等待,直到進(jìn)程池中有可用進(jìn)程為止。
我們可以用Pool類創(chuàng)建一個(gè)進(jìn)程池, 展開(kāi)提交的任務(wù)給進(jìn)程池。 例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
#apply from multiprocessing import Pool import time def f1(i): time.sleep( 0.5 ) print (i) return i + 100 if __name__ = = "__main__" : pool = Pool( 5 ) for i in range ( 1 , 31 ): pool. apply (func = f1,args = (i,)) #apply_async def f1(i): time.sleep( 0.5 ) print (i) return i + 100 def f2(arg): print (arg) if __name__ = = "__main__" : pool = Pool( 5 ) for i in range ( 1 , 31 ): pool.apply_async(func = f1,args = (i,),callback = f2) pool.close() pool.join() |
一個(gè)進(jìn)程池對(duì)象可以控制工作進(jìn)程池的哪些工作可以被提交,它支持超時(shí)和回調(diào)的異步結(jié)果,有一個(gè)類似map的實(shí)現(xiàn)。
•processes :使用的工作進(jìn)程的數(shù)量,如果processes是None那么使用 os.cpu_count()返回的數(shù)量。
•initializer: 如果initializer是None,那么每一個(gè)工作進(jìn)程在開(kāi)始的時(shí)候會(huì)調(diào)用initializer(*initargs)。
•maxtasksperchild:工作進(jìn)程退出之前可以完成的任務(wù)數(shù),完成后用一個(gè)心的工作進(jìn)程來(lái)替代原進(jìn)程,來(lái)讓閑置的資源被釋放。maxtasksperchild默認(rèn)是None,意味著只要Pool存在工作進(jìn)程就會(huì)一直存活。
•context: 用在制定工作進(jìn)程啟動(dòng)時(shí)的上下文,一般使用 multiprocessing.Pool() 或者一個(gè)context對(duì)象的Pool()方法來(lái)創(chuàng)建一個(gè)池,兩種方法都適當(dāng)?shù)脑O(shè)置了context
注意:Pool對(duì)象的方法只可以被創(chuàng)建pool的進(jìn)程所調(diào)用。
New in version 3.2: maxtasksperchild
New in version 3.4: context
進(jìn)程池的方法
apply(func[, args[, kwds]]) :使用arg和kwds參數(shù)調(diào)用func函數(shù),結(jié)果返回前會(huì)一直阻塞,由于這個(gè)原因,apply_async()更適合并發(fā)執(zhí)行,另外,func函數(shù)僅被pool中的一個(gè)進(jìn)程運(yùn)行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個(gè)變體,會(huì)返回一個(gè)結(jié)果對(duì)象。如果callback被指定,那么callback可以接收一個(gè)參數(shù)然后被調(diào)用,當(dāng)結(jié)果準(zhǔn)備好回調(diào)時(shí)會(huì)調(diào)用callback,調(diào)用失敗時(shí),則用error_callback替換callback。 Callbacks應(yīng)被立即完成,否則處理結(jié)果的線程會(huì)被阻塞。
close() : 阻止更多的任務(wù)提交到pool,待任務(wù)完成后,工作進(jìn)程會(huì)退出。
terminate() : 不管任務(wù)是否完成,立即停止工作進(jìn)程。在對(duì)pool對(duì)象進(jìn)程垃圾回收的時(shí)候,會(huì)立即調(diào)用terminate()。
join() : wait工作線程的退出,在調(diào)用join()前,必須調(diào)用close() or terminate()。這樣是因?yàn)楸唤K止的進(jìn)程需要被父進(jìn)程調(diào)用wait(join等價(jià)與wait),否則進(jìn)程會(huì)成為僵尸進(jìn)程。
1
2
3
4
5
6
|
map (func, iterable[, chunksize])¶ map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶ imap(func, iterable[, chunksize])¶ imap_unordered(func, iterable[, chunksize]) starmap(func, iterable[, chunksize])¶ starmap_async(func, iterable[, chunksize[, callback[, error_back]]]) |
線程和進(jìn)程的操作是由程序觸發(fā)系統(tǒng)接口,最后的執(zhí)行者是系統(tǒng);協(xié)程的操作則是程序員。
協(xié)程存在的意義:對(duì)于多線程應(yīng)用,CPU通過(guò)切片的方式來(lái)切換線程間的執(zhí)行,線程切換時(shí)需要耗時(shí)(保存狀態(tài),下次繼續(xù))。協(xié)程,則只使用一個(gè)線程,在一個(gè)線程中規(guī)定某個(gè)代碼塊執(zhí)行順序。
協(xié)程的適用場(chǎng)景:當(dāng)程序中存在大量不需要CPU的操作時(shí)(IO),適用于協(xié)程;
event loop是協(xié)程執(zhí)行的控制點(diǎn), 如果你希望執(zhí)行協(xié)程, 就需要用到它們。
event loop提供了如下的特性:
•注冊(cè)、執(zhí)行、取消延時(shí)調(diào)用(異步函數(shù))
•創(chuàng)建用于通信的client和server協(xié)議(工具)
•創(chuàng)建和別的程序通信的子進(jìn)程和協(xié)議(工具)
•把函數(shù)調(diào)用送入線程池中
協(xié)程示例:
1
2
3
4
5
6
7
8
9
10
|
import asyncio async def cor1(): print ( "COR1 start" ) await cor2() print ( "COR1 end" ) async def cor2(): print ( "COR2" ) loop = asyncio.get_event_loop() loop.run_until_complete(cor1()) loop.close() |
最后三行是重點(diǎn)。
•asyncio.get_event_loop() : asyncio啟動(dòng)默認(rèn)的event loop
•run_until_complete() : 這個(gè)函數(shù)是阻塞執(zhí)行的,知道所有的異步函數(shù)執(zhí)行完成,
•close() : 關(guān)閉event loop。
1、greenlet
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import greenlet def fun1(): print ( "12" ) gr2.switch() print ( "56" ) gr2.switch() def fun2(): print ( "34" ) gr1.switch() print ( "78" ) gr1 = greenlet.greenlet(fun1) gr2 = greenlet.greenlet(fun2) gr1.switch() |
2、gevent
gevent屬于第三方模塊需要下載安裝包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
pip3 install - - upgrade pip3 pip3 install gevent import gevent def fun1(): print ( "www.baidu.com" ) # 第一步 gevent.sleep( 0 ) print ( "end the baidu.com" ) # 第三步 def fun2(): print ( "www.zhihu.com" ) # 第二步 gevent.sleep( 0 ) print ( "end th zhihu.com" ) # 第四步 gevent.joinall([ gevent.spawn(fun1), gevent.spawn(fun2), ]) |
遇到IO操作自動(dòng)切換:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import gevent import requests def func(url): print ( "get: %s" % url) gevent.sleep( 0 ) date = requests.get(url) ret = date.text print (url, len (ret)) gevent.joinall([ gevent.spawn(func, 'https://www.python.org/' ), gevent.spawn(func, 'https://www.yahoo.com/' ), gevent.spawn(func, 'https://github.com/' ), ]) |
以上所述是小編給大家介紹的深入淺析python中的多進(jìn)程、多線程、協(xié)程的相關(guān)知識(shí),希望對(duì)大家有所幫助,如果大家有任何疑問(wèn)請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)服務(wù)器之家網(wǎng)站的支持!