Chinaunix首页 | 论坛 | 博客
  • 博客访问: 4998711
  • 博文数量: 921
  • 博客积分: 16037
  • 博客等级: 上将
  • 技术积分: 8469
  • 用 户 组: 普通用户
  • 注册时间: 2006-04-05 02:08
文章分类

全部博文(921)

文章存档

2020年(1)

2019年(3)

2018年(3)

2017年(6)

2016年(47)

2015年(72)

2014年(25)

2013年(72)

2012年(125)

2011年(182)

2010年(42)

2009年(14)

2008年(85)

2007年(89)

2006年(155)

分类: Python/Ruby

2015-06-24 16:17:41


  1. >>>from twisted.internet import reactor
  2. >>>reactor
  3. <twisted.internet.selectreactor.SelectReactor object at 0x01C5BFD0>
reactor本来是一个模块,怎么变成对象了?
查看 reactor.py, 看到就一个模块方法selectreactor.install()

查看install方法:

  1. def install():
  2.     """Configure the twisted mainloop to be run using the select() reactor.
  3.     """
  4.     reactor = SelectReactor()
  5.     from twisted.internet.main import installReactor
  6.     installReactor(reactor)
这里生成了一个SelectReactor的对象,似乎就是我们要找的reactor.

再查看main.py



  1. def installReactor(reactor):
  2.     # this stuff should be common to all reactors.
  3.     import twisted.internet
  4.     import sys
  5.     assert not sys.modules.has_key('twisted.internet.reactor'), \
  6.            "reactor already installed"
  7.     twisted.internet.reactor = reactor
  8.     sys.modules['twisted.internet.reactor'] = reactor

哦, reactor已经被偷梁换柱了.

回到selectreactor.py, 看看 SelectReactor 类是个什么东西.

SelectReactor 继承父类posixbase.PosixReactorBase, 本身增加了一些方法, 似乎看不出什么.那我们就去posixbase.py看看他爸爸是干什么的.

PosixReactorBase继承两个父类_SignalReactorMixin 和 ReactorBase.  先不管其他,  寻根溯源,这两个父类都来自于internet.base模块.

好吧, 找到这里算是到头了. ReactorBase 作为 "Reactor" 的基类, 提供了reactor大部分及其重要的方法, 另一些重要的方法由_SignalReactorMixin来扩展. 下面做下详细的分析.

# 一般来说, 建立一个服务器基本遵循以下几个步骤
# 以建立一个最基本的TCP服务器为例

#1
reactor.listenTCP(PORT, Factory())
#2
reactor.run()
 
对于#1比较好理解, 在posixbase.PosixReactorBase中



  1. def listenTCP(self, port, factory, backlog=50, interface=''):
  2.         """@see: twisted.internet.interfaces.IReactorTCP.listenTCP
  3.         """
  4.         p = tcp.Port(port, factory, backlog, interface, self)
  5.         p.startListening()
  6.         return p

# 其中包括socket的建立,绑定等等一系列手续.
# 详细内容以后再表.

对于#2, 在base.SignalReactorMixin中

  1. def run(self, installSignalHandlers=True):
  2.         self.startRunning(installSignalHandlers=installSignalHandlers)
  3.         self.mainLoop()

  4. def mainLoop(self):
  5.         while self._started:
  6.             try:
  7.                 while self._started:
  8.                     # Advance simulation time in delayed event
  9.                     # processors.
  10.                     self.runUntilCurrent()
  11.                     t2 = self.timeout()
  12.                     t = self.running and t2
  13.                     self.doIteration(t)
  14.             except:
  15.                 log.msg("Unexpected error in main loop.")
  16.                 log.err()
  17.             else:
  18.                 log.msg('Main loop terminated.')

reactor一直孜孜不倦地执行两个方法:self.runUntilCurrent和 self.doIteration. 看看这两个函数都是干什么的:

  1. # 在ReactorBase中, runUntilCurrent方法主要做了两件事,
  2. # 把self.threadCallQueue和self.pendingTimedCalls 里的对象执行一遍
  3. def runUntilCurrent(self):
  4.         if self.threadCallQueue:
  5.             # Keep track of how many calls we actually make, as we're
  6.             # making them, in case another call is added to the queue
  7.             # while we're in this loop.
  8.             count = 0
  9.             total = len(self.threadCallQueue)
  10.             for (f, a, kw) in self.threadCallQueue:
  11.                 try:
  12.                     f(*a, **kw)
  13.                 except:
  14.                     log.err()
  15.                 count += 1
  16.                 if count == total:
  17.                     break
  18.             del self.threadCallQueue[:count]
  19.             if self.threadCallQueue:
  20.                 if self.waker:
  21.                     self.waker.wakeUp()


  22.         # insert new delayed calls now
  23.         self._insertNewDelayedCalls()


  24.         now = self.seconds()
  25.         while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
  26.             call = heappop(self._pendingTimedCalls)
  27.             if call.cancelled:
  28.                 self._cancellations-=1
  29.                 continue


  30.             if call.delayed_time > 0:
  31.                 call.activate_delay()
  32.                 heappush(self._pendingTimedCalls, call)
  33.                 continue


  34.             try:
  35.                 call.called = 1
  36.                 call.func(*call.args, **call.kw)
  37.             except:
  38.                 log.deferr()
  39.                 if hasattr(call, "creator"):
  40.                     e = "\n"
  41.                     e += " C: previous exception occurred in " + \
  42.                          "a DelayedCall created here:\n"
  43.                     e += " C:"
  44.                     e += "".join(call.creator).rstrip().replace("\n","\n C:")
  45.                     e += "\n"
  46.                     log.msg(e)




  47.         if (self._cancellations > 50 and
  48.              self._cancellations > len(self._pendingTimedCalls) >> 1):
  49.             self._cancellations = 0
  50.             self._pendingTimedCalls = [x for x in self._pendingTimedCalls
  51.                                        if not x.cancelled]
  52.             heapify(self._pendingTimedCalls)


  53.         if self._justStopped:
  54.             self._justStopped = False
  55.             self.fireSystemEvent("shutdown")

# 回到SelectReactor中,查看 doSelect(doIteration)方法
# _select既是select.select函数
# self._reads和self._writes内存储的应该都是类文件操作符,比如socket..
# 再看下self._doReadOrWrite方法,会发现所有的reader/writer都执行自身
# 的 doRead/doWrite方法.

  1. def doSelect(self, timeout):
  2.         """
  3.         Run one iteration of the I/O monitor loop.


  4.         This will run all selectables who had input or output readiness
  5.         waiting for them.
  6.         """
  7.         while 1:
  8.             try:
  9.                 r, w, ignored = _select(self._reads.keys(),
  10.                                         self._writes.keys(),
  11.                                         [], timeout)
  12.                 break
  13.             except ValueError, ve:
  14.                 # Possibly a file descriptor has gone negative?
  15.                 log.err()
  16.                 self._preenDescriptors()
  17.             except TypeError, te:
  18.                 # Something *totally* invalid (object w/o fileno, non-integral
  19.                 # result) was passed
  20.                 log.err()
  21.                 self._preenDescriptors()
  22.             except (select.error, IOError), se:
  23.                 # select(2) encountered an error
  24.                 if se.args[0] in (0, 2):
  25.                     # windows does this if it got an empty list
  26.                     if (not self._reads) and (not self._writes):
  27.                         return
  28.                     else:
  29.                         raise
  30.                 elif se.args[0] == EINTR:
  31.                     return
  32.                 elif se.args[0] == EBADF:
  33.                     self._preenDescriptors()
  34.                 else:
  35.                     # OK, I really don't know what's going on. Blow up.
  36.                     raise
  37.         _drdw = self._doReadOrWrite
  38.         _logrun = log.callWithLogger
  39.         for selectables, method, fdset in ((r, "doRead", self._reads),
  40.                                            (w,"doWrite", self._writes)):
  41.             for selectable in selectables:
  42.                 # if this was disconnected in another thread, kill it.
  43.                 # ^^^^ --- what the !@#*? -exarkun
  44.                 if selectable not in fdset:
  45.                     continue
  46.                 # This for pausing input when we

好吧,从上面基本可以看出, reactor在run循环里做了两件事, 执行线程队列和延迟对象队列,操作类文件对象符.

对于线程队列和延迟对象队列, 还比较好理解.
对于类文件对象的队列, reactor 是什么时候把它们加进的呢?

写道
# 插播 ReactorBase.callLater方法 
# 执行callLater后reactor把DelayedCall对象存放在_newTimedCalls队列中 
# 在执行ReactorBase.runUntilCurrent时, 
# reactor执行了_insertNewDelayedCalls 方法 
# 把_newTimedCalls内的数据存入_pendingTimedCalls队列中 

  1. def callLater(self, _seconds, _f, *args, **kw):
  2.     tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
  3.     self._cancelCallLater,
  4.     self._moveCallLaterSooner,
  5.     seconds=self.seconds)
  6.     self._newTimedCalls.append(tple)
  7.     return tple

# 同样对于thread 
# callFromThread方法也是把thread存入到threadCallQueue中 
# 直到在runUntilCurrent中执行 

  1. def callFromThread(self, f, *args, **kw):
  2.     self.threadCallQueue.append((f, args, kw))


观看上面的代码, reactor似乎没有主动加入过 reader/writer, reactor如何操作socket的呢?
重新想象reactor在run之前还做过什么?
对了, 连接/建立连接!
就如reactor.listenTCP


  1. def listenTCP(self, port, factory, backlog=50, interface=''):
  2.     p = tcp.Port(port, factory, backlog, interface, self)
  3.     p.startListening()
  4.     return p
看看tcp.Port的设计
tcp.Port继承于base.BasePort 和 tcp._SocketCloser,
而base.BasePort 继承于abstract.FileDescriptor, 一个抽象的文件操作符类
tcp.Port实例化时没有做太多动作, 我们聚焦在方法 startListening 上

# tcp.Port.startListening 生成并绑定了一个socket
# 也没有做什么过多的动作, 直接看看最下面的startReading



  1. def startListening(self):
  2.         try:
  3.             skt = self.createInternetSocket()
  4.             skt.bind((self.interface, self.port))
  5.         except socket.error, le:
  6.             raise CannotListenError, (self.interface, self.port, le)


  7.         # Make sure that if we listened on port 0, we update that to
  8.         # reflect what the OS actually assigned us.
  9.         self._realPortNumber = skt.getsockname()[1]


  10.         log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber))


  11.         # The order of the next 6 lines is kind of bizarre. If no one
  12.         # can explain it, perhaps we should re-arrange them.
  13.         self.factory.doStart()
  14.         skt.listen(self.backlog)
  15.         self.connected = True
  16.         self.socket = skt
  17.         self.fileno = self.socket.fileno
  18.         self.numberAccepts = 100


  19.         self.startReading()

# 一直找到abstract.FileDescriptor.startReading
# 执行了reactor.addReader

  1. def startReading(self):
  2.     """Start waiting for read availability.
  3.     """
  4.     self.reactor.addReader(self)

# selectreactor.SelectReactor.addReader指明了
# 一个tcp.Port对象被作为reader加入到了reactor的reads队列中

  1. def addReader(self, reader):
  2.     """
  3.     Add a FileDescriptor for notification of data available to read.
  4.     """
  5.     self._reads[reader] = 1
原来在这里, 在reactor.listenTCP时候就被加入到了reader队列中.
赶紧回头看看, 在 selectreactor.SelectReactor.doSelect中,如果一个类文件操作符状态改变了,会执行其doRead/doWriter方法.那去看看作为reader的tcp.Port的doRead方法.


# tcp.Port的socket接受了一个连接,
# 并执行了self.factory.buildProtocol方法生成一个portocol
# 通过self.transport生成了一个tcp.Server对象



  1. def doRead(self):
  2.         try:
  3.             if platformType == "posix":
  4.                 numAccepts = self.numberAccepts
  5.             else:
  6.                 # win32 event loop breaks if we do more than one accept()
  7.                 # in an iteration of the event loop.
  8.                 numAccepts = 1
  9.             for i in range(numAccepts):
  10.                 # we need this so we can deal with a factory

虽然还有点迷糊, 不过知道了protocol对象产生于此处.那这个产生的transport实例具体作用是什么呢?
先看下 protocol.makeConnection

  1. # protocol.BaseProtocol
  2. def makeConnection(self, transport):
  3.     self.connected = 1
  4.     self.transport = transport
  5.     self.connectionMade()

看到了一个熟悉的方法connectionMade!
protocol的三个事件方法 connectionMade, dataReceived, connectionLost是protocol最重要的三个方法了.
其一出现了, 剩下的两个是在何处被触发的呢?

先不急, 先看看transport 是怎么回事:

tcp.Server 来自于 父类 tcp.Connection. 而Connection继承于abstract.FileDescriptor,又是一个类文件符.
tcp.Server实例时还是做了点小动作的


  1. # tcp.Server
  2. def __init__(self, sock, protocol, client, server, sessionno, reactor):
  3.         Connection.__init__(self, sock, protocol, reactor)
  4.         self.server = server
  5.         self.client = client
  6.         self.sessionno = sessionno
  7.         self.hostname = client[0]
  8.         self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__,
  9.                                     sessionno,
  10.                                     self.hostname)
  11.         self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
  12.                                           self.sessionno,
  13.                                           self.server._realPortNumber)
  14.         self.startReading()
  15.         self.connected = 1

self.startReading从 abstract.FileDescriptor上知晓是把 该实例作为reader加入到reactor队列中的.

那我们就看看tcp.Server的doRead方法

  1. # tcp.Connection
  2. def doRead(self):
  3.         """Calls self.protocol.dataReceived with all available data.


  4.         This reads up to self.bufferSize bytes of data from its socket, then
  5.         calls self.dataReceived(data) to process it. If the connection is not
  6.         lost through an error in the physical recv(), this function will return
  7.         the result of the dataReceived call.
  8.         """
  9.         try:
  10.             data = self.socket.recv(self.bufferSize)
  11.         except socket.error, se:
  12.             if se.args[0] == EWOULDBLOCK:
  13.                 return
  14.             else:
  15.                 return main.CONNECTION_LOST
  16.         if not data:
  17.             return main.CONNECTION_DONE
  18.         return self.protocol.dataReceived(data)
眼前一亮, dataReceived方法!

原文链接


阅读(1086) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~