一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語言編程技術及教程分享平臺!
分類導航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務器之家 - 腳本之家 - Python - Python實現進程同步和通信的方法

Python實現進程同步和通信的方法

2020-12-30 00:44Kalankalan Python

本篇文章主要介紹了Python實現進程同步和通信的方法,詳細的介紹了Process、Queue、Pipe、Lock等組件,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

Python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到并發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

引例:

如之前創建多進程的例子

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# -*- coding:utf-8 -*-
from multiprocessing import Process,Pool
import os,time
 
def run_proc(name):    ##定義一個函數用于進程調用
  for i in range(5): 
    time.sleep(0.2#休眠0.2秒
    print 'Run child process %s (%s)' % (name, os.getpid())
#執行一次該函數共需1秒的時間
 
if __name__ =='__main__': #執行主進程
  print 'Run the main process (%s).' % (os.getpid())
  mainStart = time.time() #記錄主進程開始的時間
  p = Pool(8)      #開辟進程池
  for i in range(16):                 #開辟14個進程
    p.apply_async(run_proc,args=('Process'+str(i),))#每個進程都調用run_proc函數,
                            #args表示給該函數傳遞的參數。
 
  print 'Waiting for all subprocesses done ...'
  p.close() #關閉進程池
  p.join() #等待開辟的所有進程執行完后,主進程才繼續往下執行
  print 'All subprocesses done'
  mainEnd = time.time() #記錄主進程結束時間
  print 'All process ran %0.2f seconds.' % (mainEnd-mainStart) #主進程執行時間

運行結果:

?
1
2
3
4
5
6
7
Run the main process (36652).
Waiting for all subprocesses done …
Run child process Process0 (36708)Run child process Process1 (36748)
 
Run child process Process3 (36736)
Run child process Process2 (36716)
Run child process Process4 (36768)

如第3行的輸出,偶爾會出現這樣不如意的輸入格式,為什么呢?

原因是多個進程爭用打印輸出資源的結果。前一個進程為來得急輸出換行符,該資源就切換給了另一個進程使用,致使兩個進程輸出在同一行上,而前一個進程的換行符在下一次獲得資源時才打印輸出。

Lock

為了避免這種情況,需在進程進入臨界區(使進程進入臨界資源的那段代碼,稱為臨界區)時加鎖。
可以向如下這樣添加鎖后看看執行效果:

?
1
2
3
4
5
6
7
8
9
10
# -*- coding:utf-8 -*-
 
lock = Lock()  #申明一個全局的lock對象
def run_proc(name):
  global lock   #引用全局鎖
  for i in range(5):
    time.sleep(0.2)
    lock.acquire() #申請鎖
    print 'Run child process %s (%s)' % (name, os.getpid())
    lock.release()  #釋放鎖

Semaphore

Semaphore為信號量機制。當共享的資源擁有多個時,可用Semaphore來實現進程同步。其用法和Lock差不多,s = Semaphore(N),每執行一次s.acquire(),該資源的可用個數將減少1,當資源個數已為0時,就進入阻塞;每執行一次s.release(),占用的資源被釋放,該資源的可用個數增加1。

多進程的通信(信息交互)

不同進程之間進行數據交互,可能不少剛開始接觸多進程的同學會想到共享全局變量的方式,這樣通過向全局變量寫入和讀取信息便能實現信息交互。但是很遺憾,并不能這樣實現。

下面通過例子,加深對那篇文章的理解:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# -*- coding:utf-8 -*-
from multiprocessing import Process, Pool
import os
import time
L1 = [1, 2, 3]
def add(a, b):
  global L1
  L1 += range(a, b)
  print L1
if __name__ == '__main__':
  p1 = Process(target=add, args=(20, 30))
  p2 = Process(target=add, args=(30, 40))
  p1.start()
  p2.start()
  p1.join()
  p2.join()
  print L1

輸出結果:

[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[1, 2, 3, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3]

該程序的原本目的是想將兩個子進程生成的列表加到全局變量L1中,但用該方法并不能達到想要的效果。既然不能通過全局變量來實現不同進程間的信息交互,那有什么辦法呢。

mutiprocessing為我們可以通過Queue和Pipe來實現進程間的通信。

Queue

按上面的例子通過Queue來實現:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# -*- coding:utf-8 -*-
from multiprocessing import Process, Queue, Lock
L = [1, 2, 3]
def add(q, lock, a, b):
  lock.acquire() # 加鎖避免寫入時出現不可預知的錯誤
  L1 = range(a, b)
  lock.release()
  q.put(L1)
  print L1
if __name__ == '__main__':
  q = Queue()
  lock = Lock()
  p1 = Process(target=add, args=(q, lock, 20, 30))
  p2 = Process(target=add, args=(q, lock, 30, 40))
  p1.start()
  p2.start()
  p1.join()
  p2.join()
  L += q.get() + q.get()
  print L

 執行結果:

[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]

下面介紹Queue的常用方法:

  1. 定義時可用q = Queue(maxsize = 10)來指定隊列的長度,默認時或maxsize值小于1時隊列為無限長度。
  2. q.put(item)方法向隊列放入元素,其還有一個可選參數block,默認為True,此時若隊列已滿則會阻塞等待,直到有空閑位置。而當black值為 False,在該情況下就會拋出Full異 常
  3. Queue是不可迭代的對象,不能通過for循環取值,取值時每次調用q.get()方法。同樣也有可選參數block,默認為True,若此時隊列為空則會阻塞等待。而black值為False時,在該情況下就會拋出Empty異常
  4. Queue.qsize() 返回隊列的大小
  5. Queue.empty() 如果隊列為空,返回True,反之False
  6. Queue.full() 如果隊列滿了,返回True,反之False
  7. Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間Queue.get_nowait() 相當Queue.get(False) 非阻塞 Queue.put(item) 寫入隊列,timeout等待時間
  8. Queue.put_nowait(item) 相當Queue.put(item, False)

Pipe

Pipe管道,可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)創建單向管道 (默認為雙向)。雙向Pipe允許兩端的進即可以發送又可以接受;單向的Pipe只允許前面的端口用于接收,后面的端口用于發送。

下面給出例子:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe
def proc1(pipe):
  s = 'Hello,This is proc1'
  pipe.send(s)
def proc2(pipe):
  while True:
    print "proc2 recieve:", pipe.recv()
if __name__ == "__main__":
  pipe = Pipe()
  p1 = Process(target=proc1, args=(pipe[0],))
  p2 = Process(target=proc2, args=(pipe[1],))
  p1.start()
  p2.start()
  p1.join()
  p2.join(2#限制執行時間最多為2秒
  print '\nend all processes.'

執行結果如下:

proc2 recieve: Hello,This is proc1
proc2 recieve:
end all processes.

當第二行輸出后,因為管道中沒有數據傳來,Proc2處于阻塞狀態,2秒后被強制結束。

以下是單向管道的例子,注意pipe[0],pipe[1]的分配。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe
def proc1(pipe):
  s = 'Hello,This is proc1'
  pipe.send(s)
def proc2(pipe):
  while True:
    print "proc2 recieve:", pipe.recv()
if __name__ == "__main__":
  pipe = Pipe(duplex=False)
  p1 = Process(target=proc1, args=(pipe[1],)) #pipe[1]為發送端
  p2 = Process(target=proc2, args=(pipe[0],)) #pipe[0]為接收端
  p1.start()
  p2.start()
  p1.join()
  p2.join(2) # 限制執行時間最多為2秒
  print '\nend all processes.'

執行結果同上。

強大的Manage

Queue和Pipe實現的數據共享方式只支持兩種結構 Value 和 Array。Python中提供了強大的Manage專門用來做數據共享,其支持的類型非常多,包括: Value,Array,list, dict,Queue, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event等

其用法如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Process, Manager
def func(dt, lt):
  for i in range(10):
    key = 'arg' + str(i)
    dt[key] = i * i
 
  lt += range(11, 16)
 
if __name__ == "__main__":
  manager = Manager()
  dt = manager.dict()
  lt = manager.list()
 
  p = Process(target=func, args=(dt, lt))
  p.start()
  p.join()
  print dt, '\n', lt

執行結果:

{‘arg8': 64, ‘arg9': 81, ‘arg0': 0, ‘arg1': 1, ‘arg2': 4, ‘arg3': 9, ‘arg4': 16, ‘arg5': 25, ‘arg6': 36, ‘arg7': 49}
[11, 12, 13, 14, 15]

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://blog.csdn.net/u014556057/article/details/66974452

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 欧美乱子伦xxxx12在线 | 好大好硬视频 | 亚洲成年www | 亚洲精品成人A8198A片漫画 | xxx95日本老师xxx学生 | 欧美三级不卡视频 | 好涨好大我快受不了了视频网 | 我与白丝同桌的故事h文 | 久久这里只有精品国产精品99 | 日本中文字幕不卡在线一区二区 | 国产成人精品1024在线 | 国产成人啪精品视频站午夜 | 国产精品女主播自在线拍 | 亚洲国产在线视频中文字 | 性xxxxⅹhd成人 | 国产悠悠视频在线播放 | 欧美日韩高清不卡一区二区三区 | 日本小网站 | 性猛交娇小69hd | 99成人免费视频 | 美女被无套进入 | 88av免费观看| 亚洲热在线视频 | 美式禁忌在线 | 免费α片| 暖暖高清日本在线 | 亚洲乱码一二三四区国产 | 蜜桃影像传媒破解版 | 国产一级一级一级成人毛片 | 亚洲精品青青草原avav久久qv | 国产盗摄美女嘘嘘视频 | 我要色色网 | 日本肉体xxxx69xxxx| 亚洲高清一区二区三区四区 | 欧美va在线 | 亚洲天堂精品视频 | 国产79av| 国产成人免费观看在线视频 | 精品成人一区二区 | 免费99精品国产自在现线 | 四虎影院新地址 |