進程和線程
什么是進程?
進程就是正在運行的程序, 一個任務就是一個進程, 進程的主要工作是管理資源, 而不是實現功能
什么是線程?
線程的主要工作是去實現功能, 比如執行計算.
線程和進程的關系就像員工與老板的關系,
老板(進程) 提供資源 和 工作空間,
員工(線程) 負責去完成相應的任務
特點
一個進程至少由一個線程, 這一個必須存在的線程被稱為主線程, 同時一個進程也可以有多個線程, 即多線程
當我們我們遇到一些需要重復執行的代碼時, 就可以使用多線程分擔一些任務, 進而加快運行速度
線程的實現
線程模塊
python通過兩個標準庫_thread和threading, 提供對線程的支持 , threading對_thread進行了封裝。
threading模塊中提供了thread , lock , rlock , condition等組件。
因此在實際的使用中我們一般都是使用threading來實現多線程
線程包括子線程和主線程:
主線程 : 當一個程序啟動時 , 就有一個線程開始運行 , 該線程通常叫做程序的主線程
子線程 : 因為程序是開始時就執行的 , 如果你需要再創建線程 , 那么創建的線程就是這個主線程的子線程
主線程的重要性體現在兩方面 :
- 是產生其他子線程的線程
- 通常它必須最后完成執行, 比如執行各種關閉操作
thread類
常用參數說明
參數 | 說明 |
---|---|
target | 表示調用的對象, 即子線程要執行的任務, 可以是某個內置方法, 或是你自己寫的函數 |
name | 子線程的名稱 |
args | 傳入target函數中的位置參數, 是一個元組, 參數后必須加逗號 |
常用實例方法
方法 | 作用 |
---|---|
thread.run(self) | 線程啟動時運行的方法, 由該方法調用 target參數所指定的函數 |
thread.start(self) | 啟動進程, start方法就是區幫你調用run方法 |
thread.terminate(self) | 強制終止線程 |
thread.join(self, timeout=none) | 阻塞調用, 主線程進行等待 |
thread.setdaemon(self, daemonic) | 將子線程設置為守護線程, 隨主線程結束而結束 |
thread.getname(self, name) | 獲取線程名 |
thread.setname(self, name) | 設置線程名 |
創建線程
在python中創建線程有兩種方式, 實例thread類和繼承重寫thread類
實例thread類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import threading import time def run(name, s): # 線程要執行的任務 time.sleep(s) # 停兩秒 print ( 'i am %s' % name) # 實例化線程類, 并傳入函數及其參數, t1 = threading.thread(target = run, name = 'one' , args = ( 'one' , 5 )) t2 = threading.thread(target = run, name = 'two' , args = ( 'two' , 2 )) # 開始執行, 這兩個線程會同步執行 t1.start() t2.start() print (t1.getname()) # 獲取線程名 print (t2.getname()) # result: one two i am two # 運行2s后 i am one # 運行5s后 |
繼承thread類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
class mythread(threading.thread): # 繼承threading中的thread類 # 線程所需的參數 def __init__( self , name, second): super ().__init__() self .name = name self .second = second # 重寫run方法,表示線程所執行的任務,必須有 def run( self ): time.sleep( self .second) print ( 'i am %s' % self .name) # 創建線程實例 t1 = mythread( 'one' , 5 ) t2 = mythread( 'two' , 2 ) # 啟動線程,實際上是調用了類中的run方法 t1.start() t2.start() t1.join() print (t1.getname()) print (t2.getname()) # result: i am two # 運行后2s i am one # 運行后5s one two |
常用方法
join()
阻塞調用程序 , 直到調用join () 方法的線程執行結束, 才會繼續往下執行
1
2
3
4
5
6
7
8
9
10
11
12
|
# 開始執行, 這兩個線程會同步執行 t1.start() t2.start() t1.join() # 等待t1線程執行完畢,再繼續執行剩余的代碼 print (t1.getname()) print (t2.getname()) # result: i am two i am one one two |
setdemon()
使用給線程設置守護模式: 子線程跟隨主線程的結束而結束, 不管這個子線程任務是否完成. 而非守護模式的子線程只有在執行完成后, 主線程才會執行完成
setdaemon() 與 join() 基本上是相對的 , join會等子線程執行完畢 ; 而setdaemon則不會等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
def run(name, s): # 線程要執行的函數 time.sleep(s) # 停兩秒 print ( 'i am %s' % name) # 實例化線程類, 并傳入函數及其參數 t1 = threading.thread(target = run, name = 'one' , args = ( 'one' , 5 )) t2 = threading.thread(target = run, name = 'two' , args = ( 'two' , 2 )) # 給t1設置守護模式, 使其隨著主線程的結束而結束 t1.setdaemon(true) # 開始執行, 這兩個線程會同步執行 t1.start() t2.start() # 主線程會等待未設置守護模式的線程t2執行完成 # result: i am two # 運行后2s |
線程間的通信
互斥鎖
在同一個進程的多線程中 , 其中的變量對于所有線程來說都是共享的 , 因此 , 如果多個線程之間同時修改一個變量 , 那就亂套了 , 共享的數據就會有很大的風險 , 所以我們需要互斥鎖 , 來鎖住數據 , 防止篡改。
來看一個錯誤的示范:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
a = 0 def incr(n): global a for i in range (n): a + = 1 # 這兩個方法同時聲明了變量a,并對其進行修改 def decr(n): global a for i in range (n): a - = 1 t_incr = threading.thread(target = incr, args = ( 1000000 ,)) t_decr = threading.thread(target = decr, args = ( 1000000 ,)) t_incr.start() t_decr.start() t_incr.join() t_decr.join() print (a) # 期望結果應該是0, 但是因為這里沒有設置互斥鎖, 所以兩個方法是同時對同一個變量進行修改, 得到的的結果值是隨機的 |
下面我們改一下上面的代碼 , 兩個方法加上互斥鎖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
a = 0 lock = threading.lock() # 實例化互斥鎖對象, 方便之后的調用 def incr(n): global a for i in range (n): lock.acquire() # 上鎖的方法 a + = 1 lock.release() # 解鎖的方法 # 要注意的是上鎖的位置是, 出現修改操作的代碼 def decr(n): global a for i in range (n): with lock: # 也可以直接使用with, 自動解鎖 a - = 1 t_incr = threading.thread(target = incr, args = ( 1000000 ,)) t_decr = threading.thread(target = decr, args = ( 1000000 ,)) t_incr.start() t_decr.start() t_incr.join() t_decr.join() print (a) # result: 0 |
在容易出現搶奪資源的地方進行上鎖 , 實現同一時間內 , 只有一個線程可以對對象進行操作
隊列queue
常用方法
關鍵字 | 解釋 |
---|---|
put(item) | 入隊 , 將item放入隊列中 , 在隊列為滿時插入值會發生阻塞(1) |
get() | 出隊 , 從隊列中移除并返回一個數據 , 在隊列為空時獲取值會發生阻塞 |
task_done() | 任務結束 , 意味著之前入隊的一個任務已經完成。由隊列的消費者線程調用 |
join() | 等待完成 , 阻塞調用線程,直到隊列中的所有任務被處理掉。 |
empty() | 如果隊列為空,返回true,反之返回false |
full() | 如果隊列為滿,返回true,反之返回false |
qsize() | 隊列長度 , 返回當前隊列的數據量 |
(1): 阻塞: 程序停在阻塞的位置 , 無法繼續執行
導入和實例化
1
2
|
import queue q = queue.queue( 4 ) # 實例化隊列對象, 并設置最大數據量 |
put() 和 get()
1
2
3
4
5
|
q.put( 'a' ) q.put( 'b' ) print (q.get()) # : a print (q.get()) # : b q.task_done() # get后必須要加task_done,確認get操作是否完成 |
1
2
3
4
|
q.put( 1 ) # 當前隊列已滿,再次put就會阻塞 print (q.full()) # 由于已經阻塞, 所以這段不會被執行 # put會在隊列慢了點時候,在插入值會發生阻塞 # get會在隊列里沒有值的時候,會發生阻塞 |
empty()
1
2
3
|
print (q.empty()) # 判斷隊列是否為空: true q.put( 'test' ) print (q.empty()) # : false |
qsize()
1
|
print (q.qsize()) # 當前隊列里有多少人: 1 |
full()
1
2
3
4
|
q.put( 1 ) q.put( 1 ) q.put( 1 ) print (q.full()) # : true |
join()
1
2
3
|
print ( 'testetsetset' ) q.join() # join會在隊列非空時發生阻塞 print ( 'done' ) # 由于已經阻塞, 所以這段不會被執行 |
池的概念
線程池中實現準備好了一些可以重復使用的線程 , 等待接受任務并執行
主線程提交任務給 線程池 , 線程池中的每個線程會一次一個的接收任務并執行 , 直到主線程執行結束
主線程: 相當于生產者,只管向線程池提交任務。
并不關心線程池是如何執行任務的。
因此,并不關心是哪一個線程執行的這個任務。
線程池: 相當于消費者,負責接收任務,
并將任務分配到一個空閑的線程中去執行。
自定義線程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
import queue import threading import time class threadpool: # 自定義線程池 def __init__( self , n): # 主線程做 self .queue_obj = queue.queue() for i in range (n): threading.thread(target = self .worker, daemon = true).start() # 給子線程worker設置為守護模式 def worker( self ): # 子線程做,由于debug調試的只是主線程的代碼,所以在調試時看不到子線程執行的代碼 """線程對象,寫while true 是為了能夠一直執行任務。""" while true: # 讓線程執行完一個任務之后不會死掉,主線程結束時,守護模式會讓worker里的死循環停止 func = self .queue_obj.get() # get已經入隊的任務, 這里會接收到主線程分配的func # 由于設置了守護模式,當隊列為空時,不會一直阻塞在get這里 # 有了守護模式,worker會在主線程執行完畢后死掉 func() # 將隊列里的任務拿出來調用 """ 這里func與task_done的順序非常重要,如果func放在task_done后面的話會出現只執行兩次就結束。 """ self .queue_obj.task_done() # task_done 會刷新計數器 # 線程池里有一個類似計數器的機制,用來記錄put的次數(+1),每一次task_done都會回撥一次記錄的次數(-1) # 當回撥完計數器為0之后,就會執行join def apply_async( self , func): # 主線程做 """向隊列中傳入需要執行的函數對象""" self .queue_obj.put(func) # 將接收到的func入隊 def join( self ): # 主線程做 """等待隊列中的內容被取完""" self .queue_obj.join() # 隊列里不為空就阻塞,為空就不阻塞 |
簡單使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
def task1(): # 子線程做 time.sleep( 2 ) print ( 'task1 over' ) def task2(): # 子線程做 time.sleep( 3 ) print ( 'task2 over' ) p = threadpool( 2 ) # 如果在start開啟線程之后沒有傳入任務對象,worker里的get會直接阻塞 p.apply_async(task1) p.apply_async(task2) print ( 'start' ) p.join() print ( 'done' ) # result: start task1 over task2 over done |
如果get發生阻塞意味著隊列為空,意味著join不阻塞,意味著print('done')會執行,
意味著主線程沒有任務在做,意味著主線程結束,意味著不等待設置了守護的線程執行任務,
意味著子線程會隨著主線程的死亡而死亡,這就是為什么會設置守護模式。
如果沒有設置守護模式意味著get發生阻塞,意味著子線程任務執行不完,意味著主線程一直要等子線程完成,
意味著程序一直都結束不了,意味著程序有問題
python內置線程池
原理
- 創建線程池
- 將任務扔進去
- 關閉線程池
- 等待線程任務執行完畢
'''手動實現線程池:
主要是配合隊列來進行實現,我們定義好一個隊列對象,然后將我們的任務對象put到我們的隊列對象中,
然后使用多線程,讓我們的線程去get隊列種的對象,然后各自去執行自己get到的任務,
這樣的話其實也就實現了線程池
'''
使用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from multiprocessing.pool import threadpool import time pool = threadpool( 2 ) # 直接使用內置線程池, 設置最大線程數 def task1(): time.sleep( 2 ) print ( 'task1 over' ) def task2( * args, * * kwargs): time.sleep( 3 ) print ( 'task2 over' , args, kwargs) pool.apply_async(task1) pool.apply_async(task2, args = ( 1 , 2 ), kwds = { 'a' : 1 , 'b' : 2 }) print ( 'task submitted' ) pool.close() # 要點: close必須要在join之前, 不允許再提交任務了 pool.join() print ( 'mission complete' ) # result: task submitted task1 over task2 over ( 1 , 2 ) { 'a' : 1 , 'b' : 2 } mission complete |
其他操作
操作一: close - 關閉提交通道,不允許再提交任務
操作二: terminate - 中止進程池,中止所有任務
以上所述是小編給大家介紹的python線程與線程池詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!
原文鏈接:https://blog.csdn.net/makesomethings/article/details/89819854