
A Look at the Implementation of Python's threadpool Thread Pool

2020-12-18 00:59, 菜鳥磊子, Python

This article explains how the Python threadpool thread pool is implemented, and may serve as a useful reference.

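First, the terms used in this article:

Worker thread (worker): when the thread pool is created, the specified number of worker threads are started. Each one waits to get tasks from the task queue.

Task (requests): a unit of work processed by a worker thread. There may be thousands of tasks but only a few worker threads. Tasks are created with makeRequests.

Task queue (request_queue): the queue that holds pending tasks, implemented with the Queue module. Worker threads get tasks from this queue and process them.

Task handler (callable): after a worker thread gets a task, it calls the task's handler (request.callable) to do the actual work and return a result.

Result queue (result_queue): when a task finishes, its result (or the exception it raised) is put into the result queue.

Exception callback (exc_callback): when a result taken from the result queue is marked as an exception, the exception callback is called to handle it.

Result callback (callback): called with each result taken from the result queue, for further processing.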

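The previous installment covered installing and using threadpool. This one walks through the main steps of how the pool works:

(1) Creating the thread pool
(2) Starting the worker threads
(3) Creating tasks
(4) Pushing tasks to the thread pool
(5) Processing tasks in the worker threads
(6) Handling finished tasks
(7) Shutting down worker threads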

Here is an outline of the ThreadPool class (method bodies omitted):

class ThreadPool:
  """A thread pool, distributing work requests and collecting results.
 
  See the module docstring for more information.
 
  """
  def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
    pass
  def createWorkers(self, num_workers, poll_timeout=5):
    pass
  def dismissWorkers(self, num_workers, do_join=False):
    pass
  def joinAllDismissedWorkers(self):
    pass
  def putRequest(self, request, block=True, timeout=None):
    pass
  def poll(self, block=False):
    pass
  def wait(self):
    pass

1. Creating the thread pool (ThreadPool(args))

task_pool=threadpool.ThreadPool(num_works)

  def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
    """Set up the thread pool and start num_workers worker threads.

    ``num_workers`` is the number of worker threads to start initially.

    If ``q_size > 0`` the size of the work *request queue* is limited and
    the thread pool blocks when the queue is full and it tries to put
    more work requests in it (see ``putRequest`` method), unless you also
    use a positive ``timeout`` value for ``putRequest``.

    If ``resq_size > 0`` the size of the *results queue* is limited and the
    worker threads will block when the queue is full and they try to put
    new results in it.

    .. warning::
      If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
      the possibility of a deadlock, when the results queue is not pulled
      regularly and too many jobs are put in the work requests queue.
      To prevent this, always set ``timeout > 0`` when calling
      ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

    """
    self._requests_queue = Queue.Queue(q_size)  # task queue: putRequest() puts the requests created by makeRequests() here
    self._results_queue = Queue.Queue(resq_size)  # result queue: workers put (request, result) tuples here
    self.workers = []  # worker threads started by self.createWorkers()
    self.dismissedWorkers = []  # workers whose dismiss event is set but that have not been joined yet
    self.workRequests = {}  # requests pushed to the pool and not yet handled by poll(): {requestID: request}
    self.createWorkers(num_workers, poll_timeout)

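The constructor parameters are:

num_workers: the number of worker threads in the pool;

q_size: the maximum length of the task queue (0, the default, means unlimited). If it is limited and the queue is full, putRequest() blocks until a worker frees a slot. The exceptions are a call to putRequest() with block=False, which raises Queue.Full immediately, and a call with a positive timeout, which raises Queue.Full when the timeout expires;

resq_size: the maximum length of the result queue (0 means unlimited). When it is full, worker threads block while putting results into it;

poll_timeout: how long, in seconds, a worker thread blocks waiting for a request on an empty task queue. If no request arrives in time, get() raises Queue.Empty and the worker loops back to check whether it has been dismissed;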
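The following sketch shows what q_size and poll_timeout mean at the level of the queue itself. It is written for Python 3, where the module is called `queue`; the threadpool code above imports Python 2's `Queue`. `demo_queue_limits` is a hypothetical helper and is not part of threadpool.

```python
import queue
import time


def demo_queue_limits(q_size=1, put_timeout=0.1, poll_timeout=0.1):
    """Hypothetical helper: report which exceptions a bounded put and a
    timed get raise, mirroring q_size and poll_timeout."""
    outcome = {}

    # q_size > 0: a bounded request queue. Once it is full, a blocking put
    # with a positive timeout raises queue.Full instead of waiting forever.
    requests_q = queue.Queue(q_size)
    for _ in range(q_size):
        requests_q.put("task")
    try:
        requests_q.put("one too many", True, put_timeout)
    except queue.Full:
        outcome["put"] = "queue.Full"

    # poll_timeout: a worker's get() waits at most poll_timeout seconds
    # on an empty queue, then raises queue.Empty.
    empty_q = queue.Queue()
    start = time.monotonic()
    try:
        empty_q.get(True, poll_timeout)
    except queue.Empty:
        outcome["get"] = "queue.Empty"
    outcome["waited"] = time.monotonic() - start
    return outcome


print(demo_queue_limits())
```

With q_size=0 the queue is unbounded, so the extra put succeeds and no "put" entry is recorded.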

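The member variables are:

self._requests_queue: the task queue. putRequest() puts the requests created with threadpool.makeRequests() here;
self._results_queue: the result queue. Workers put (request, result) tuples here;
self.workers: the list of worker threads created by self.createWorkers();
self.dismissedWorkers: worker threads whose dismiss event has been set but that have not yet been joined;
self.workRequests: a dictionary of the requests pushed to the pool, with the structure requestID: request. requestID uniquely identifies a task and is covered later.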

2. Starting the worker threads (self.createWorkers(args))

Function definition:

def createWorkers(self, num_workers, poll_timeout=5):
   """Add num_workers worker threads to the pool.
 
   ``poll_timeout`` sets the interval in seconds (int or float) for how
   often threads should check whether they are dismissed, while waiting for
   requests.
 
   """
   for i in range(num_workers):
     self.workers.append(WorkerThread(self._requests_queue,
       self._results_queue, poll_timeout=poll_timeout))

WorkerThread subclasses threading.Thread, Python's built-in thread class. Each WorkerThread object created is appended to the self.workers list. Here is the definition of the WorkerThread class:

class WorkerThread(threading.Thread):
  """Background thread connected to the requests/results queues.

  A worker thread sits in the background and picks up work requests from
  one queue and puts the results in another until it is dismissed.

  """

  def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
    """Set up thread in daemonic mode and start it immediately.

    ``requests_queue`` and ``results_queue`` are instances of
    ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
    new worker thread.

    """
    threading.Thread.__init__(self, **kwds)
    self.setDaemon(1)  # daemon thread: it does not keep the interpreter alive at exit
    self._requests_queue = requests_queue  # task queue
    self._results_queue = results_queue  # result queue
    self._poll_timeout = poll_timeout  # timeout for get() on the task queue in run(); on timeout the while loop continues
    self._dismissed = threading.Event()  # dismiss event: once set, run() breaks out of its loop and the worker exits
    self.start()

  def run(self):
    """Repeatedly process the job queue until told to exit."""
    while True:
      if self._dismissed.isSet():  # dismissed: exit the worker thread
        # we are dismissed, break out of loop
        break
      # get next work request. If we don't get a new request from the
      # queue after self._poll_timeout seconds, we jump to the start of
      # the while loop again, to give the thread a chance to exit.
      try:
        request = self._requests_queue.get(True, self._poll_timeout)
      except Queue.Empty:  # no task arrived within poll_timeout: loop again
        continue
      else:
        if self._dismissed.isSet():  # dismissed while waiting: return the task to the queue and exit
          # we are dismissed, put back request in queue and exit loop
          self._requests_queue.put(request)
          break
        try:  # not dismissed: run request.callable and put (request, result) in the result queue
          result = request.callable(*request.args, **request.kwds)
          self._results_queue.put((request, result))
        except:
          request.exception = True
          self._results_queue.put((request, sys.exc_info()))  # the handler raised: put (request, exc_info) instead

  def dismiss(self):
    """Sets a flag to tell the thread to exit when done with current job.
    """
    self._dismissed.set()

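The constructor sets these variables:

self._requests_queue: the task queue;
self._results_queue: the result queue;
self._poll_timeout: the timeout for get() on the task queue in run(). If it expires, the while loop continues;
self._dismissed: the dismiss event. Once it is set, run() breaks out of its loop and the worker thread exits;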

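Finally, self.start() starts the thread, which executes the run() method shown above.

Each iteration of the while loop in run() works as follows:

(1) If self._dismissed is set, exit the worker thread. Otherwise go to step 2.
(2) Try to get a task from the task queue self._requests_queue. If none arrives within poll_timeout (Queue.Empty), continue with the next iteration. Otherwise go to step 3.
(3) Check whether this worker's dismiss event has been set. If it has, the worker is being shut down, so it puts the task back into the task queue and exits. If it has not, the worker calls the task handler request.callable and puts (request, result) into the result queue. If the handler raises, the worker sets request.exception to True and puts (request, sys.exc_info()) into the result queue instead. Then go to step 4.
(4) Loop back to step 1.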
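The four steps above can be sketched in Python 3 as follows. `SketchWorker` is a hypothetical, simplified stand-in for WorkerThread and differs from it in three ways. Tasks are assumed to be plain `(func, args)` tuples instead of WorkRequest objects. Results are `(func, args, failed, value)` tuples. The sketch catches `Exception`, whereas the original uses a bare `except:`.

```python
import queue
import sys
import threading


class SketchWorker(threading.Thread):
    """Hypothetical Python 3 stand-in for WorkerThread's run loop."""

    def __init__(self, requests_q, results_q, poll_timeout=0.1):
        super().__init__(daemon=True)
        self._requests_q = requests_q
        self._results_q = results_q
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()  # like the original, the worker starts itself

    def run(self):
        while not self._dismissed.is_set():  # step (1)
            try:  # step (2): wait at most poll_timeout for a task
                func, args = self._requests_q.get(True, self._poll_timeout)
            except queue.Empty:
                continue
            if self._dismissed.is_set():  # step (3): dismissed meanwhile
                self._requests_q.put((func, args))  # hand the task back
                break
            try:  # step (3): run the task and report its result
                self._results_q.put((func, args, False, func(*args)))
            except Exception:
                self._results_q.put((func, args, True, sys.exc_info()))

    def dismiss(self):
        self._dismissed.set()


requests_q, results_q = queue.Queue(), queue.Queue()
workers = [SketchWorker(requests_q, results_q) for _ in range(2)]
for n in range(5):
    requests_q.put((pow, (n, 2)))
print(sorted(results_q.get(timeout=2)[3] for _ in range(5)))  # [0, 1, 4, 9, 16]
for w in workers:
    w.dismiss()
for w in workers:
    w.join()
```

Because get() uses a timeout, an idle worker notices dismiss() within about poll_timeout seconds, which is why join() returns promptly.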

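The worker threads are now running. The pool starts as many worker threads as configured. Each one gets tasks from the task queue, processes them, and puts the results into the result queue.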

3. Creating tasks (makeRequests)

Tasks are created with threadpool.makeRequests(callable_, args_list, callback=None, exc_callback=_handle_thread_exception):

# utility functions
def makeRequests(callable_, args_list, callback=None,
    exc_callback=_handle_thread_exception):
  """Create several work requests for same callable with different arguments.
 
  Convenience function for creating several work requests for the same
  callable where each invocation of the callable receives different values
  for its arguments.
 
  ``args_list`` contains the parameters for each invocation of callable.
  Each item in ``args_list`` should be either a 2-item tuple of the list of
  positional arguments and a dictionary of keyword arguments or a single,
  non-tuple argument.
 
  See docstring for ``WorkRequest`` for info on ``callback`` and
  ``exc_callback``.
 
  """
  requests = []
  for item in args_list:
    if isinstance(item, tuple):
      requests.append(
        WorkRequest(callable_, item[0], item[1], callback=callback,
          exc_callback=exc_callback)
      )
    else:
      requests.append(
        WorkRequest(callable_, [item], None, callback=callback,
          exc_callback=exc_callback)
      )
  return requests

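The parameters of makeRequests are:

callable_: the task handler. Once a task is in the task queue, the worker thread that picks it up calls this callable_.

args_list: a list with one item per task to create. The number of items equals the number of WorkRequest objects returned. Each item takes one of two forms. It can be a 2-tuple, where item[0] is the list of positional arguments and item[1] is a dictionary of keyword arguments. It can also be any non-tuple value, which is passed as the single positional argument. The list may hold a single item, in which case makeRequests() creates just one task.

callback: the result callback, called from poll() (covered below). After callable_ returns, the worker puts the result into the result queue (self._results_queue). When poll() gets that result from the queue, it calls callback(request, result), where result is the value returned by the request's handler.

exc_callback: the exception callback. If a request raised an exception while being processed, poll() calls this callback with (request, exc_info). By default it is _handle_thread_exception, which prints the exception info.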
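The args_list rules can be checked with a small sketch. `to_call_args` is a hypothetical helper and is not part of threadpool. It reproduces how makeRequests() splits each item and how WorkRequest then applies `args or []` and `kwds or {}`.

```python
def to_call_args(args_list):
    """Hypothetical helper: the (args, kwds) pair each args_list item yields."""
    calls = []
    for item in args_list:
        if isinstance(item, tuple):
            # a tuple is read as (positional-args list, keyword-args dict)
            args, kwds = item[0], item[1]
        else:
            # any other value becomes the single positional argument
            args, kwds = [item], None
        # WorkRequest.__init__ replaces None with empty defaults
        calls.append((args or [], kwds or {}))
    return calls


print(to_call_args([5, ([1, 2], {"sep": "-"})]))
# [([5], {}), ([1, 2], {'sep': '-'})]
```

Because a tuple is always unpacked as (args, kwds), a tuple meant as the single positional argument must be wrapped, e.g. `([("a", "b")], None)`.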

makeRequests returns the list of tasks it created, and the caller keeps this list.

That is how tasks are created. Next, the structure of a task object:

class WorkRequest:
  """A request to execute a callable for putting in the request queue later.
 
  See the module function ``makeRequests`` for the common case
  where you want to build several ``WorkRequest`` objects for the same
  callable but with different arguments for each call.
 
  """
 
  def __init__(self, callable_, args=None, kwds=None, requestID=None,
      callback=None, exc_callback=_handle_thread_exception):
    """Create a work request for a callable and attach callbacks.
 
    A work request consists of a callable to be executed by a
    worker thread, a list of positional arguments, a dictionary
    of keyword arguments.
 
    A ``callback`` function can be specified, that is called when the
    results of the request are picked up from the result queue. It must
    accept two anonymous arguments, the ``WorkRequest`` object and the
    results of the callable, in that order. If you want to pass additional
    information to the callback, just stick it on the request object.
 
    You can also give custom callback for when an exception occurs with
    the ``exc_callback`` keyword parameter. It should also accept two
    anonymous arguments, the ``WorkRequest`` and a tuple with the exception
    details as returned by ``sys.exc_info()``. The default implementation
    of this callback just prints the exception info via
    ``traceback.print_exception``. If you want no exception handler
    callback, just pass in ``None``.
 
    ``requestID``, if given, must be hashable since it is used by
    ``ThreadPool`` object to store the results of that work request in a
    dictionary. It defaults to the return value of ``id(self)``.
 
    """
    if requestID is None:
      self.requestID = id(self)
    else:
      try:
        self.requestID = hash(requestID)
      except TypeError:
        raise TypeError("requestID must be hashable.")
    self.exception = False
    self.callback = callback
    self.exc_callback = exc_callback
    self.callable = callable_
    self.args = args or []
    self.kwds = kwds or {}
 
  def __str__(self):
    return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
      (self.requestID, self.args, self.kwds, self.exception)

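self.callback, self.exc_callback, self.callable, args and kwds have already been covered above.
Each task also carries an identifier, self.requestID. By default it is id(self), which in CPython is the object's memory address and is unique among objects that exist at the same time. If a requestID is passed in, hash(requestID) is stored instead.
self.exception starts as False. It is set to True if self.callable() raises an exception while the task is being processed.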
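The requestID rule can be tried out with `SketchRequest`, a hypothetical stand-in class that copies only the ID logic from WorkRequest.__init__. A custom ID is stored as its hash, so distinct IDs with equal hashes (for example 1 and True) end up as the same key in workRequests.

```python
class SketchRequest:
    """Hypothetical stand-in: only WorkRequest's requestID logic."""

    def __init__(self, requestID=None):
        if requestID is None:
            # default: id(self), unique among objects alive at the same time
            self.requestID = id(self)
        else:
            try:
                # a custom ID is stored as its hash
                self.requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")


print(SketchRequest(1).requestID == SketchRequest(True).requestID)  # True
```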

The tasks are now created, and the caller of makeRequests() holds the list of requests.

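4. Pushing tasks to the thread pool (putRequest)

The previous section covered creating tasks. There can be hundreds or thousands of tasks, but only the number of threads specified when the pool was created is available to process them. That number is usually far smaller than the number of tasks, so each thread has to process many tasks.

This section covers how the created tasks are pushed into the pool, so that the worker threads, which are blocked waiting on the task queue, can get them and process them.

Tasks are pushed with the ThreadPool method putRequest(self, request, block, timeout):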

def putRequest(self, request, block=True, timeout=None):
  """Put work request into work queue and save its id for later."""
  assert isinstance(request, WorkRequest)
  # don't reuse old work requests
  assert not getattr(request, 'exception', None)
  self._requests_queue.put(request, block, timeout)
  self.workRequests[request.requestID] = request

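The method puts the request (a task created in the previous section) into the pool's task queue, self._requests_queue. It then records the request in the pool's self.workRequests dictionary as request.requestID: request. The two assertions reject anything that is not a WorkRequest and refuse to reuse an old request whose exception flag is already set.

The tasks have now been created and pushed into the thread pool.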
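putRequest() executes its two statements in a particular order. The request is enqueued first and only then recorded in workRequests. So if a bounded queue stays full and put() raises Queue.Full, the situation the constructor's docstring warns about, the request is never registered as pending. The sketch below shows this in Python 3 (module `queue`). `SketchPool` is hypothetical, and `types.SimpleNamespace` objects stand in for WorkRequest.

```python
import queue
from types import SimpleNamespace


class SketchPool:
    """Hypothetical minimal pool: only putRequest()'s bookkeeping."""

    def __init__(self, q_size=0):
        self._requests_queue = queue.Queue(q_size)
        self.workRequests = {}

    def putRequest(self, request, block=True, timeout=None):
        # enqueue first; if this raises queue.Full, nothing is registered
        self._requests_queue.put(request, block, timeout)
        # register only after a successful put
        self.workRequests[request.requestID] = request


pool = SketchPool(q_size=1)
pool.putRequest(SimpleNamespace(requestID="a"))
try:
    pool.putRequest(SimpleNamespace(requestID="b"), timeout=0.05)
except queue.Full:
    print("b rejected; pending:", list(pool.workRequests))
# b rejected; pending: ['a']
```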

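5. Processing tasks in the worker threads

After the previous section, the tasks are in the thread pool. Before any task is pushed, every thread in the pool is blocked inside its self.run() method and keeps executing the following: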

try:
  request = self._requests_queue.get(True, self._poll_timeout)
except Queue.Empty:  # no task within poll_timeout: continue with the next loop iteration
  continue

Now that tasks are in the pool, get() returns a task and the worker executes the rest of the loop:

def run(self):
  """Repeatedly process the job queue until told to exit."""
  while True:
    if self._dismissed.isSet():  # dismissed: exit the worker thread
      # we are dismissed, break out of loop
      break
    # get next work request. If we don't get a new request from the
    # queue after self._poll_timeout seconds, we jump to the start of
    # the while loop again, to give the thread a chance to exit.
    try:
      request = self._requests_queue.get(True, self._poll_timeout)
    except Queue.Empty:  # no task within poll_timeout: loop again
      continue
    else:
      if self._dismissed.isSet():  # dismissed while waiting: put the task back and exit
        # we are dismissed, put back request in queue and exit loop
        self._requests_queue.put(request)
        break
      try:  # run request.callable and put (request, result) in the result queue
        result = request.callable(*request.args, **request.kwds)
        self._results_queue.put((request, result))
      except:
        request.exception = True
        self._results_queue.put((request, sys.exc_info()))  # the handler raised: put (request, exc_info) instead

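Get a task ---> call its handler callable() ---> put the request and its result into self._results_queue ---> if the handler raised, set the request's exception flag to True and put the request and the exception info into self._results_queue instead ---> go back and get the next task

Suppose the dismiss event is set from outside during the while loop, so that self._dismissed.isSet() returns True. The thread then stops processing tasks: it puts the task it just got back into the task queue and exits.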

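6. Handling finished tasks

The previous sections showed how the pool keeps getting and processing tasks. What happens after each task finishes? The thread pool provides wait() and poll() for this.

After submitting tasks to the pool, we call wait() to block until they have all been processed. Once wait() returns we can move on. For example, we can create more tasks and push them into the pool, or shut the pool down. Shutting down is covered in the next section; this one covers wait() and poll().

First, wait():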

def wait(self):
  """Wait for results, blocking until all have arrived."""
  while 1:
    try:
      self.poll(True)
    except NoResultsPending:
      break

wait() blocks until all tasks have been processed. It calls self.poll(True) repeatedly, and when poll() raises NoResultsPending, wait() catches it and returns.

Now poll():

def poll(self, block=False):
  """Process any new results in the queue."""
  while True:
    # still results pending?
    if not self.workRequests:
      raise NoResultsPending
    # are there still workers to process remaining requests?
    elif block and not self.workers:
      raise NoWorkersAvailable
    try:
      # get back next results
      request, result = self._results_queue.get(block=block)
      # has an exception occurred?
      if request.exception and request.exc_callback:
        request.exc_callback(request, result)
      # hand results to callback, if any
      if request.callback and not \
          (request.exception and request.exc_callback):
        request.callback(request, result)
      del self.workRequests[request.requestID]
    except Queue.Empty:
      break

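(1) First, check whether the task dictionary self.workRequests ({request.requestID: request}) is empty. If it is, raise NoResultsPending and stop. Otherwise go to step 2.

(2) If poll() was called with block=True, check whether the list of worker threads is empty (dismissWorkers() pops dismissed workers from self.workers). If it is, raise NoWorkersAvailable and stop. Otherwise go to step 3.

(3) Get the next result from the result queue. If the queue is empty, break out of the loop and return; this can only happen when block=False. Otherwise go to step 4.

(4) If the task raised an exception (request.exception is set) and an exception callback request.exc_callback is set, call it to handle the exception.

(5) Otherwise, if a result callback request.callback is set, call request.callback(request, result). When request.exception is set but there is no exc_callback, callback is still called, with the exception info as result.

(6) Remove the task from self.workRequests and go back to step 1. Repeat until an exception is raised or, in non-blocking mode, the result queue is empty, at which point poll() returns.

Once every task has been handled, poll() raises NoResultsPending. wait() catches this exception and returns.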
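The poll()/wait() logic can be reproduced without any threads. This includes the rule that callback still runs for a failed request when no exc_callback is set. The following Python 3 sketch is a simplified, hypothetical re-implementation with these changes:

* It omits the NoWorkersAvailable check.
* It defines its own `NoResultsPending` stand-in.
* It uses `types.SimpleNamespace` objects as requests.
* It pre-fills the result queue with what workers would have produced.

```python
import queue
from types import SimpleNamespace


class NoResultsPending(Exception):
    """Stand-in for threadpool's exception of the same name."""


def poll(work_requests, results_q, block=False):
    """Simplified poll(): no NoWorkersAvailable check."""
    while True:
        if not work_requests:  # step (1): nothing pending
            raise NoResultsPending
        try:  # step (3): next result, or stop if non-blocking and empty
            request, result = results_q.get(block=block)
        except queue.Empty:
            break
        if request.exception and request.exc_callback:  # step (4)
            request.exc_callback(request, result)
        if request.callback and not (request.exception and request.exc_callback):
            request.callback(request, result)  # step (5)
        del work_requests[request.requestID]  # step (6)


def wait(work_requests, results_q):
    """Block until every pending request has been handled."""
    while True:
        try:
            poll(work_requests, results_q, True)
        except NoResultsPending:
            break


seen = []


def on_result(req, res):
    seen.append(("callback", req.requestID))


def on_error(req, res):
    seen.append(("exc_callback", req.requestID))


pending, results_q = {}, queue.Queue()
# (requestID, exception flag, exc_callback, what the worker produced)
for rid, failed, exc_cb, res in [
    (1, False, on_error, 42),
    (2, True, on_error, (ValueError, ValueError("x"), None)),
    (3, True, None, (ValueError, ValueError("y"), None)),
]:
    req = SimpleNamespace(requestID=rid, exception=failed,
                          callback=on_result, exc_callback=exc_cb)
    pending[rid] = req
    results_q.put((req, res))

wait(pending, results_q)
print(seen)  # [('callback', 1), ('exc_callback', 2), ('callback', 3)]
print(pending)  # {}
```

Request 3 shows the subtle case: it failed, but with no exc_callback its result callback receives the exc_info tuple.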

7. Shutting down worker threads

threadpool provides two operations for shutting down worker threads, dismissWorkers() and joinAllDismissedWorkers():

def dismissWorkers(self, num_workers, do_join=False):
  """Tell num_workers worker threads to quit after their current task."""
  dismiss_list = []
  for i in range(min(num_workers, len(self.workers))):
    worker = self.workers.pop()
    worker.dismiss()
    dismiss_list.append(worker)
 
  if do_join:
    for worker in dismiss_list:
      worker.join()
  else:
    self.dismissedWorkers.extend(dismiss_list)
 
def joinAllDismissedWorkers(self):
  """Perform Thread.join() on all worker threads that have been dismissed.
  """
  for worker in self.dismissedWorkers:
    worker.join()
  self.dismissedWorkers = []

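dismissWorkers() pops the requested number of threads (at most all of them) from self.workers and sets each one's dismiss event. Each thread's run() loop sees the event and exits. A busy thread exits after finishing its current task, and an idle thread exits within poll_timeout seconds.

If do_join is set, dismissWorkers() joins the dismissed threads itself before returning. Otherwise the popped threads are appended to self.dismissedWorkers, to be joined later by joinAllDismissedWorkers().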
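A short Python 3 sketch shows dismissWorkers() telling workers "to quit after their current task". `Worker` and `dismiss_workers` are hypothetical, simplified stand-ins for WorkerThread and ThreadPool.dismissWorkers. In this sketch, tasks are plain zero-argument callables and results are put on the queue without the request. A worker dismissed while running a slow task still finishes it and delivers the result before exiting.

```python
import queue
import threading
import time


class Worker(threading.Thread):
    """Hypothetical minimal worker with WorkerThread's dismiss logic."""

    def __init__(self, requests_q, results_q, poll_timeout=0.05):
        super().__init__(daemon=True)
        self.requests_q, self.results_q = requests_q, results_q
        self.poll_timeout = poll_timeout
        self.dismissed = threading.Event()
        self.start()

    def run(self):
        while not self.dismissed.is_set():
            try:
                func = self.requests_q.get(True, self.poll_timeout)
            except queue.Empty:
                continue
            if self.dismissed.is_set():
                self.requests_q.put(func)  # give the task back, then quit
                break
            self.results_q.put(func())  # a dismiss during func() waits for it

    def dismiss(self):
        self.dismissed.set()


def dismiss_workers(workers, num_workers, do_join=False):
    """Pop, flag and optionally join workers, like dismissWorkers()."""
    dismiss_list = []
    for _ in range(min(num_workers, len(workers))):
        worker = workers.pop()
        worker.dismiss()
        dismiss_list.append(worker)
    if do_join:
        for worker in dismiss_list:
            worker.join()
    # without do_join, the original appends these to self.dismissedWorkers
    return dismiss_list


requests_q, results_q = queue.Queue(), queue.Queue()
workers = [Worker(requests_q, results_q)]
started = threading.Event()


def slow_task():
    started.set()
    time.sleep(0.2)
    return "finished"


requests_q.put(slow_task)
started.wait()  # the worker is now inside slow_task()
gone = dismiss_workers(workers, 5, do_join=True)
print(results_q.get_nowait(), workers, gone[0].is_alive())  # finished [] False
```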

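8. Summary

That covers all the operations of the threadpool thread pool and how they are implemented. As shown above, the pool is not complicated. It has only a few simple operations, and the main thing is to understand the overall flow.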


Original article: http://blog.csdn.net/hehe123456zxc/article/details/52258431
