一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語言編程技術(shù)及教程分享平臺(tái)!
分類導(dǎo)航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務(wù)器之家 - 腳本之家 - Python - 詳解Python的Twisted框架中reactor事件管理器的用法

詳解Python的Twisted框架中reactor事件管理器的用法

2020-08-24 09:38Syfun Python

這篇文章主要介紹了詳解Python的Twisted框架中reactor事件管理器的用法,Twisted是一款高人氣的異步Python開發(fā)框架,需要的朋友可以參考下

鋪墊
在大量的實(shí)踐中,似乎我們總是通過類似的方式來使用異步編程:

  • 監(jiān)聽事件
  • 事件發(fā)生執(zhí)行對(duì)應(yīng)的回調(diào)函數(shù)
  • 回調(diào)完成(可能產(chǎn)生新的事件添加進(jìn)監(jiān)聽隊(duì)列)
  • 回到1,監(jiān)聽事件

因此我們將這樣的異步模式稱為Reactor模式,例如在iOS開發(fā)中的Run Loop概念,實(shí)際上非常類似于Reactor loop,主線程的Run Loop監(jiān)聽屏幕UI事件,一旦發(fā)生UI事件則執(zhí)行對(duì)應(yīng)的事件處理代碼,還可以通過GCD等方式產(chǎn)生事件至主線程執(zhí)行。

詳解Python的Twisted框架中reactor事件管理器的用法

上圖是boost對(duì)Reactor模式的描繪,Twisted的設(shè)計(jì)就是基于這樣的Reactor模式,Twisted程序就是在等待事件、處理事件的過程中不斷循環(huán)。

from twisted.internet import reactor
reactor.run()

reactor是Twisted程序中的單例對(duì)象。

reactor
reactor是事件管理器,用于注冊(cè)、注銷事件,運(yùn)行事件循環(huán),當(dāng)事件發(fā)生時(shí)調(diào)用回調(diào)函數(shù)處理。關(guān)于reactor有下面幾個(gè)結(jié)論:

  • Twisted的reactor只有通過調(diào)用reactor.run()來啟動(dòng)。
  • reactor循環(huán)是在其開始的進(jìn)程中運(yùn)行,也就是運(yùn)行在主進(jìn)程中。
  • 一旦啟動(dòng),就會(huì)一直運(yùn)行下去。reactor就會(huì)在程序的控制下(或者具體在一個(gè)啟動(dòng)它的線程的控制下)。
  • reactor循環(huán)并不會(huì)消耗任何CPU的資源。
  • 并不需要顯式的創(chuàng)建reactor,只需要引入就OK了。

最后一條需要解釋清楚。在Twisted中,reactor是Singleton(也就是單例模式),即在一個(gè)程序中只能有一個(gè)reactor,并且只要你引入它就相應(yīng)地創(chuàng)建一個(gè)。上面引入的方式這是twisted默認(rèn)使用的方法,當(dāng)然了,twisted還有其它可以引入reactor的方法。例如,可以使用twisted.internet.pollreactor中的系統(tǒng)調(diào)用來poll來代替select方法。

若使用其它的reactor,需要在引入twisted.internet.reactor前安裝它。下面是安裝pollreactor的方法:

from twisted.internet import pollreactor
pollreactor.install()

如果你沒有安裝其它特殊的reactor而引入了twisted.internet.reactor,那么Twisted會(huì)根據(jù)操作系統(tǒng)安裝默認(rèn)的reactor。正因?yàn)槿绱?,?xí)慣性做法不要在最頂層的模塊內(nèi)引入reactor以避免安裝默認(rèn)reactor,而是在你要使用reactor的區(qū)域內(nèi)安裝。
下面是使用 pollreactor重寫上上面的程序:

from twited.internet import pollreactor
pollreactor.install()
from twisted.internet import reactor
reactor.run()

那么reactor是如何實(shí)現(xiàn)單例的?來看一下from twisted.internet import reactor做了哪些事情就并明白了。

下面是twisted/internet/reactor.py的部分代碼:

# twisted/internet/reactor.py
import sys
del sys.modules['twisted.internet.reactor']
from twisted.internet import default
default.install()

注:Python中所有加載到內(nèi)存的模塊都放在sys.modules,它是一個(gè)全局字典。當(dāng)import一個(gè)模塊時(shí)首先會(huì)在這個(gè)列表中查找是否已經(jīng)加載了此模塊,如果加載了則只是將模塊的名字加入到正在調(diào)用import的模塊的命名空間中。如果沒有加載則從sys.path目錄中按照模塊名稱查找模塊文件,找到后將模塊載入內(nèi)存,并加入到sys.modules中,并將名稱導(dǎo)入到當(dāng)前的命名空間中。

假如我們是第一次運(yùn)行from twisted.internet import reactor,因?yàn)閟ys.modules中還沒有twisted.internet.reactor,所以會(huì)運(yùn)行reactory.py中的代碼,安裝默認(rèn)的reactor。之后,如果導(dǎo)入的話,因?yàn)閟ys.modules中已存在該模塊,所以會(huì)直接將sys.modules中的twisted.internet.reactor導(dǎo)入到當(dāng)前命名空間。

default中的install:

# twisted/internet/default.py
def _getInstallFunction(platform):
  """
  Return a function to install the reactor most suited for the given platform.

  @param platform: The platform for which to select a reactor.
  @type platform: L{twisted.python.runtime.Platform}

  @return: A zero-argument callable which will install the selected
    reactor.
  """
  try:
    if platform.isLinux():
      try:
        from twisted.internet.epollreactor import install
      except ImportError:
        from twisted.internet.pollreactor import install
    elif platform.getType() == 'posix' and not platform.isMacOSX():
      from twisted.internet.pollreactor import install
    else:
      from twisted.internet.selectreactor import install
  except ImportError:
    from twisted.internet.selectreactor import install
  return install


install = _getInstallFunction(platform)

很明顯,default中會(huì)根據(jù)平臺(tái)獲取相應(yīng)的install。Linux下會(huì)首先使用epollreactor,如果內(nèi)核還不支持,就只能使用pollreactor。Mac平臺(tái)使用pollreactor,windows使用selectreactor。每種install的實(shí)現(xiàn)差不多,這里我們抽取selectreactor中的install來看看。

# twisted/internet/selectreactor.py:
def install():
  """Configure the twisted mainloop to be run using the select() reactor.
  """
  # 單例
  reactor = SelectReactor()
  from twisted.internet.main import installReactor
  installReactor(reactor)

# twisted/internet/main.py:
def installReactor(reactor):
  """
  Install reactor C{reactor}.

  @param reactor: An object that provides one or more IReactor* interfaces.
  """
  # this stuff should be common to all reactors.
  import twisted.internet
  import sys
  if 'twisted.internet.reactor' in sys.modules:
    raise error.ReactorAlreadyInstalledError("reactor already installed")
  twisted.internet.reactor = reactor
  sys.modules['twisted.internet.reactor'] = reactor

在installReactor中,向sys.modules添加twisted.internet.reactor鍵,值就是再install中創(chuàng)建的單例reactor。以后要使用reactor,就會(huì)導(dǎo)入這個(gè)單例了。

SelectReactor
# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase)

implementer表示SelectReactor實(shí)現(xiàn)了IReactorFDSet接口的方法,這里用到了zope.interface,它是python中的接口實(shí)現(xiàn),有興趣的同學(xué)可以去看下。

IReactorFDSet接口主要對(duì)描述符的獲取、添加、刪除等操作的方法。這些方法看名字就能知道意思,所以我就沒有加注釋。

# twisted/internet/interfaces.py
class IReactorFDSet(Interface):

  def addReader(reader):

  def addWriter(writer):

  def removeReader(reader):

  def removeWriter(writer):

  def removeAll():

  def getReaders():

  def getWriters():
reactor.listenTCP()

示例中的reactor.listenTCP()注冊(cè)了一個(gè)監(jiān)聽事件,它是父類PosixReactorBase中方法。

# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
            ReactorBase):

  def listenTCP(self, port, factory, backlog=50, interface=''):
    p = tcp.Port(port, factory, backlog, interface, self)
    p.startListening()
    return p

# twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
  def __init__(self, port, factory, backlog=50, interface='', reactor=None):
    """Initialize with a numeric port to listen on.
    """
    base.BasePort.__init__(self, reactor=reactor)
    self.port = port
    self.factory = factory
    self.backlog = backlog
    if abstract.isIPv6Address(interface):
      self.addressFamily = socket.AF_INET6
      self._addressType = address.IPv6Address
    self.interface = interface
  ...

  def startListening(self):
    """Create and bind my socket, and begin listening on it.
     創(chuàng)建并綁定套接字,開始監(jiān)聽。

    This is called on unserialization, and must be called after creating a
    server to begin listening on the specified port.
    """
    if self._preexistingSocket is None:
      # Create a new socket and make it listen
      try:
        # 創(chuàng)建套接字
        skt = self.createInternetSocket()
        if self.addressFamily == socket.AF_INET6:
          addr = _resolveIPv6(self.interface, self.port)
        else:
          addr = (self.interface, self.port)
        # 綁定
        skt.bind(addr)
      except socket.error as le:
        raise CannotListenError(self.interface, self.port, le)
      # 監(jiān)聽
      skt.listen(self.backlog)
    else:
      # Re-use the externally specified socket
      skt = self._preexistingSocket
      self._preexistingSocket = None
      # Avoid shutting it down at the end.
      self._shouldShutdown = False

    # Make sure that if we listened on port 0, we update that to
    # reflect what the OS actually assigned us.
    self._realPortNumber = skt.getsockname()[1]

    log.msg("%s starting on %s" % (
        self._getLogPrefix(self.factory), self._realPortNumber))

    # The order of the next 5 lines is kind of bizarre. If no one
    # can explain it, perhaps we should re-arrange them.
    self.factory.doStart()
    self.connected = True
    self.socket = skt
    self.fileno = self.socket.fileno
    self.numberAccepts = 100

    # startReading調(diào)用reactor的addReader方法將Port加入讀集合
    self.startReading()

整個(gè)邏輯很簡單,和正常的server端一樣,創(chuàng)建套接字、綁定、監(jiān)聽。不同的是將套接字的描述符添加到了reactor的讀集合。那么假如有了client連接過來的話,reactor會(huì)監(jiān)控到,然后觸發(fā)事件處理程序。

reacotr.run()事件主循環(huán)

# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
            ReactorBase)

# twisted/internet/base.py
class _SignalReactorMixin(object):

  def startRunning(self, installSignalHandlers=True):
    """
    PosixReactorBase的父類_SignalReactorMixin和ReactorBase都有該函數(shù),但是
    _SignalReactorMixin在前,安裝mro順序的話,會(huì)先調(diào)用_SignalReactorMixin中的。
    """
    self._installSignalHandlers = installSignalHandlers
    ReactorBase.startRunning(self)

  def run(self, installSignalHandlers=True):
    self.startRunning(installSignalHandlers=installSignalHandlers)
    self.mainLoop()

  def mainLoop(self):
    while self._started:
      try:
        while self._started:
          # Advance simulation time in delayed event
          # processors.
          self.runUntilCurrent()
          t2 = self.timeout()
          t = self.running and t2
          # doIteration是關(guān)鍵,select,poll,epool實(shí)現(xiàn)各有不同
          self.doIteration(t)
      except:
        log.msg("Unexpected error in main loop.")
        log.err()
      else:
        log.msg('Main loop terminated.')

mianLoop就是最終的主循環(huán)了,在循環(huán)中,調(diào)用doIteration方法監(jiān)控讀寫描述符的集合,一旦發(fā)現(xiàn)有描述符準(zhǔn)備好讀寫,就會(huì)調(diào)用相應(yīng)的事件處理程序。

# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):

  def __init__(self):
    """
    Initialize file descriptor tracking dictionaries and the base class.
    """
    self._reads = set()
    self._writes = set()
    posixbase.PosixReactorBase.__init__(self)

  def doSelect(self, timeout):
    """
    Run one iteration of the I/O monitor loop.

    This will run all selectables who had input or output readiness
    waiting for them.
    """
    try:
      # 調(diào)用select方法監(jiān)控讀寫集合,返回準(zhǔn)備好讀寫的描述符
      r, w, ignored = _select(self._reads,
                  self._writes,
                  [], timeout)
    except ValueError:
      # Possibly a file descriptor has gone negative?
      self._preenDescriptors()
      return
    except TypeError:
      # Something *totally* invalid (object w/o fileno, non-integral
      # result) was passed
      log.err()
      self._preenDescriptors()
      return
    except (select.error, socket.error, IOError) as se:
      # select(2) encountered an error, perhaps while calling the fileno()
      # method of a socket. (Python 2.6 socket.error is an IOError
      # subclass, but on Python 2.5 and earlier it is not.)
      if se.args[0] in (0, 2):
        # windows does this if it got an empty list
        if (not self._reads) and (not self._writes):
          return
        else:
          raise
      elif se.args[0] == EINTR:
        return
      elif se.args[0] == EBADF:
        self._preenDescriptors()
        return
      else:
        # OK, I really don't know what's going on. Blow up.
        raise

    _drdw = self._doReadOrWrite
    _logrun = log.callWithLogger
    for selectables, method, fdset in ((r, "doRead", self._reads),
                      (w,"doWrite", self._writes)):
      for selectable in selectables:
        # if this was disconnected in another thread, kill it.
        # ^^^^ --- what the !@#*? serious! -exarkun
        if selectable not in fdset:
          continue
        # This for pausing input when we're not ready for more.

        # 調(diào)用_doReadOrWrite方法
        _logrun(selectable, _drdw, selectable, method)

  doIteration = doSelect

  def _doReadOrWrite(self, selectable, method):
    try:
      # 調(diào)用method,doRead或者是doWrite,
      # 這里的selectable可能是我們監(jiān)聽的tcp.Port
      why = getattr(selectable, method)()
    except:
      why = sys.exc_info()[1]
      log.err()
    if why:
      self._disconnectSelectable(selectable, why, method=="doRead")

那么假如客戶端有連接請(qǐng)求了,就會(huì)調(diào)用讀集合中tcp.Port的doRead方法。

# twisted/internet/tcp.py

@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):

  def doRead(self):
    """Called when my socket is ready for reading.
    當(dāng)套接字準(zhǔn)備好讀的時(shí)候調(diào)用

    This accepts a connection and calls self.protocol() to handle the
    wire-level protocol.
    """
    try:
      if platformType == "posix":
        numAccepts = self.numberAccepts
      else:
        numAccepts = 1
      for i in range(numAccepts):
        if self.disconnecting:
          return
        try:
          # 調(diào)用accept
          skt, addr = self.socket.accept()
        except socket.error as e:
          if e.args[0] in (EWOULDBLOCK, EAGAIN):
            self.numberAccepts = i
            break
          elif e.args[0] == EPERM:
            continue
          elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
            log.msg("Could not accept new connection (%s)" % (
              errorcode[e.args[0]],))
            break
          raise

        fdesc._setCloseOnExec(skt.fileno())
        protocol = self.factory.buildProtocol(self._buildAddr(addr))
        if protocol is None:
          skt.close()
          continue
        s = self.sessionno
        self.sessionno = s+1
        # transport初始化的過程中,會(huì)將自身假如到reactor的讀集合中,那么當(dāng)它準(zhǔn)備
        # 好讀的時(shí)候,就可以調(diào)用它的doRead方法讀取客戶端發(fā)過來的數(shù)據(jù)了
        transport = self.transport(skt, protocol, addr, self, s, self.reactor)
        protocol.makeConnection(transport)
      else:
        self.numberAccepts = self.numberAccepts+20
    except:
      log.deferr()

doRead方法中,調(diào)用accept產(chǎn)生了用于接收客戶端數(shù)據(jù)的套接字,將套接字與transport綁定,然后把transport加入到reactor的讀集合。當(dāng)客戶端有數(shù)據(jù)到來時(shí),就會(huì)調(diào)用transport的doRead方法進(jìn)行數(shù)據(jù)讀取了。

Connection是Server(transport實(shí)例的類)的父類,它實(shí)現(xiàn)了doRead方法。

# twisted/internet/tcp.py
@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
         _AbortingMixin):

  def doRead(self):
    try:
      # 接收數(shù)據(jù)
      data = self.socket.recv(self.bufferSize)
    except socket.error as se:
      if se.args[0] == EWOULDBLOCK:
        return
      else:
        return main.CONNECTION_LOST

    return self._dataReceived(data)

  def _dataReceived(self, data):
    if not data:
      return main.CONNECTION_DONE
    # 調(diào)用我們自定義protocol的dataReceived方法處理數(shù)據(jù)
    rval = self.protocol.dataReceived(data)
    if rval is not None:
      offender = self.protocol.dataReceived
      warningFormat = (
        'Returning a value other than None from %(fqpn)s is '
        'deprecated since %(version)s.')
      warningString = deprecate.getDeprecationWarningString(
        offender, versions.Version('Twisted', 11, 0, 0),
        format=warningFormat)
      deprecate.warnAboutFunction(offender, warningString)
    return rval

_dataReceived中調(diào)用了示例中我們自定義的EchoProtocol的dataReceived方法處理數(shù)據(jù)。

至此,一個(gè)簡單的流程,從創(chuàng)建監(jiān)聽事件,到接收客戶端數(shù)據(jù)就此結(jié)束了。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 男人的天堂久久 | 国内精品久久久久香蕉 | 99久久精品6在线播放 | 亚洲高清中文字幕一区二区三区 | 国产精品合集一区二区 | 日韩 国产 欧美 | 亚洲高清在线视频 | 丝瓜视频黄色在线观看 | 网站色小妹 | 青青青视频免费线看 视频 青青青青青国产免费手机看视频 | 免费一级毛片完整版在线看 | 男人和女人全黄一级毛片 | 黄色cc| 午夜dj免费视频观看社区 | 日本肥熟 | 青青青手机视频在线观看 | 91yellow吧字幕网zmff7 | 情人梁家辉在线 | 国产午夜免费不卡精品理论片 | 国产欧美亚洲精品第一页青草 | 青草国产福利视频免费观看 | 久久精品国产免费播高清无卡 | 久久久久夜| 国内精品一区视频在线播放 | 色综合色狠狠天天综合色 | 亚洲国产精品成人综合久久久 | 久久五月综合婷婷中文云霸高清 | 日本大尺度动漫在线观看缘之空 | 九九九精品视频 | 五月天精品视频播放在线观看 | 日本一卡=卡三卡免费 | 四虎成人影院 | 性妲己| 国产视频中文字幕 | 四虎影视国产精品婷婷 | 婷婷色伊人 | 欧美日韩三区 | 成人在线第一页 | 欧美成人aletta ocean | 91欧美秘密入口 | 91大神精品 |