-
>>>from twisted.internet import reactor
-
>>>reactor
-
<twisted.internet.selectreactor.SelectReactor object at 0x01C5BFD0>
reactor本来是一个模块,怎么变成对象了?
查看 reactor.py, 看到就一个模块方法selectreactor.install()
查看install方法:
-
def install():
-
"""Configure the twisted mainloop to be run using the select() reactor.
-
"""
-
reactor = SelectReactor()
-
from twisted.internet.main import installReactor
-
installReactor(reactor)
这里生成了一个SelectReactor的对象,似乎就是我们要找的reactor.
再查看main.py
-
def installReactor(reactor):
-
# this stuff should be common to all reactors.
-
import twisted.internet
-
import sys
-
assert not sys.modules.has_key('twisted.internet.reactor'), \
-
"reactor already installed"
-
twisted.internet.reactor = reactor
-
sys.modules['twisted.internet.reactor'] = reactor
哦, reactor已经被偷梁换柱了.
回到selectreactor.py, 看看 SelectReactor 类是个什么东西.
SelectReactor 继承父类posixbase.PosixReactorBase, 本身增加了一些方法, 似乎看不出什么.那我们就去posixbase.py看看他爸爸是干什么的.
PosixReactorBase继承两个父类_SignalReactorMixin 和 ReactorBase. 先不管其他, 寻根溯源,这两个父类都来自于internet.base模块.
好吧, 找到这里算是到头了. ReactorBase 作为 "Reactor" 的基类, 提供了reactor大部分及其重要的方法, 另一些重要的方法由_SignalReactorMixin来扩展. 下面做下详细的分析.
# 一般来说, 建立一个服务器基本遵循以下几个步骤
# 以建立一个最基本的TCP服务器为例
#1
reactor.listenTCP(PORT, Factory())
#2
reactor.run()
对于#1比较好理解, 在posixbase.PosixReactorBase中
-
def listenTCP(self, port, factory, backlog=50, interface=''):
-
"""@see: twisted.internet.interfaces.IReactorTCP.listenTCP
-
"""
-
p = tcp.Port(port, factory, backlog, interface, self)
-
p.startListening()
-
return p
# 其中包括socket的建立,绑定等等一系列手续.
# 详细内容以后再表.
对于#2, 在base.SignalReactorMixin中
-
def run(self, installSignalHandlers=True):
-
self.startRunning(installSignalHandlers=installSignalHandlers)
-
self.mainLoop()
-
-
def mainLoop(self):
-
while self._started:
-
try:
-
while self._started:
-
# Advance simulation time in delayed event
-
# processors.
-
self.runUntilCurrent()
-
t2 = self.timeout()
-
t = self.running and t2
-
self.doIteration(t)
-
except:
-
log.msg("Unexpected error in main loop.")
-
log.err()
-
else:
-
log.msg('Main loop terminated.')
reactor一直孜孜不倦地执行两个方法:self.runUntilCurrent和 self.doIteration. 看看这两个函数都是干什么的:
-
# 在ReactorBase中, runUntilCurrent方法主要做了两件事,
-
# 把self.threadCallQueue和self.pendingTimedCalls 里的对象执行一遍
-
def runUntilCurrent(self):
-
if self.threadCallQueue:
-
# Keep track of how many calls we actually make, as we're
-
# making them, in case another call is added to the queue
-
# while we're in this loop.
-
count = 0
-
total = len(self.threadCallQueue)
-
for (f, a, kw) in self.threadCallQueue:
-
try:
-
f(*a, **kw)
-
except:
-
log.err()
-
count += 1
-
if count == total:
-
break
-
del self.threadCallQueue[:count]
-
if self.threadCallQueue:
-
if self.waker:
-
self.waker.wakeUp()
-
-
-
# insert new delayed calls now
-
self._insertNewDelayedCalls()
-
-
-
now = self.seconds()
-
while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
-
call = heappop(self._pendingTimedCalls)
-
if call.cancelled:
-
self._cancellations-=1
-
continue
-
-
-
if call.delayed_time > 0:
-
call.activate_delay()
-
heappush(self._pendingTimedCalls, call)
-
continue
-
-
-
try:
-
call.called = 1
-
call.func(*call.args, **call.kw)
-
except:
-
log.deferr()
-
if hasattr(call, "creator"):
-
e = "\n"
-
e += " C: previous exception occurred in " + \
-
"a DelayedCall created here:\n"
-
e += " C:"
-
e += "".join(call.creator).rstrip().replace("\n","\n C:")
-
e += "\n"
-
log.msg(e)
-
-
-
-
-
if (self._cancellations > 50 and
-
self._cancellations > len(self._pendingTimedCalls) >> 1):
-
self._cancellations = 0
-
self._pendingTimedCalls = [x for x in self._pendingTimedCalls
-
if not x.cancelled]
-
heapify(self._pendingTimedCalls)
-
-
-
if self._justStopped:
-
self._justStopped = False
-
self.fireSystemEvent("shutdown")
# 回到SelectReactor中,查看 doSelect(doIteration)方法
# _select既是select.select函数
# self._reads和self._writes内存储的应该都是类文件操作符,比如socket..
# 再看下self._doReadOrWrite方法,会发现所有的reader/writer都执行自身
# 的 doRead/doWrite方法.
-
def doSelect(self, timeout):
-
"""
-
Run one iteration of the I/O monitor loop.
-
-
-
This will run all selectables who had input or output readiness
-
waiting for them.
-
"""
-
while 1:
-
try:
-
r, w, ignored = _select(self._reads.keys(),
-
self._writes.keys(),
-
[], timeout)
-
break
-
except ValueError, ve:
-
# Possibly a file descriptor has gone negative?
-
log.err()
-
self._preenDescriptors()
-
except TypeError, te:
-
# Something *totally* invalid (object w/o fileno, non-integral
-
# result) was passed
-
log.err()
-
self._preenDescriptors()
-
except (select.error, IOError), se:
-
# select(2) encountered an error
-
if se.args[0] in (0, 2):
-
# windows does this if it got an empty list
-
if (not self._reads) and (not self._writes):
-
return
-
else:
-
raise
-
elif se.args[0] == EINTR:
-
return
-
elif se.args[0] == EBADF:
-
self._preenDescriptors()
-
else:
-
# OK, I really don't know what's going on. Blow up.
-
raise
-
_drdw = self._doReadOrWrite
-
_logrun = log.callWithLogger
-
for selectables, method, fdset in ((r, "doRead", self._reads),
-
(w,"doWrite", self._writes)):
-
for selectable in selectables:
-
# if this was disconnected in another thread, kill it.
-
# ^^^^ --- what the !@#*? -exarkun
-
if selectable not in fdset:
-
continue
-
# This for pausing input when we
好吧,从上面基本可以看出, reactor在run循环里做了两件事, 执行线程队列和延迟对象队列,操作类文件对象符.
对于线程队列和延迟对象队列, 还比较好理解.
对于类文件对象的队列, reactor 是什么时候把它们加进的呢?
写道
# 插播 ReactorBase.callLater方法
# 执行callLater后reactor把DelayedCall对象存放在_newTimedCalls队列中
# 在执行ReactorBase.runUntilCurrent时,
# reactor执行了_insertNewDelayedCalls 方法
# 把_newTimedCalls内的数据存入_pendingTimedCalls队列中
-
def callLater(self, _seconds, _f, *args, **kw):
-
tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
-
self._cancelCallLater,
-
self._moveCallLaterSooner,
-
seconds=self.seconds)
-
self._newTimedCalls.append(tple)
-
return tple
# 同样对于thread
# callFromThread方法也是把thread存入到threadCallQueue中
# 直到在runUntilCurrent中执行
-
def callFromThread(self, f, *args, **kw):
-
self.threadCallQueue.append((f, args, kw))
观看上面的代码, reactor似乎没有主动加入过 reader/writer, reactor如何操作socket的呢?
重新想象reactor在run之前还做过什么?
对了, 连接/建立连接!
就如reactor.listenTCP
-
def listenTCP(self, port, factory, backlog=50, interface=''):
-
p = tcp.Port(port, factory, backlog, interface, self)
-
p.startListening()
-
return p
看看tcp.Port的设计
tcp.Port继承于base.BasePort 和 tcp._SocketCloser,
而base.BasePort 继承于abstract.FileDescriptor, 一个抽象的文件操作符类
tcp.Port实例化时没有做太多动作, 我们聚焦在方法 startListening 上
# tcp.Port.startListening 生成并绑定了一个socket
# 也没有做什么过多的动作, 直接看看最下面的startReading
-
def startListening(self):
-
try:
-
skt = self.createInternetSocket()
-
skt.bind((self.interface, self.port))
-
except socket.error, le:
-
raise CannotListenError, (self.interface, self.port, le)
-
-
-
# Make sure that if we listened on port 0, we update that to
-
# reflect what the OS actually assigned us.
-
self._realPortNumber = skt.getsockname()[1]
-
-
-
log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber))
-
-
-
# The order of the next 6 lines is kind of bizarre. If no one
-
# can explain it, perhaps we should re-arrange them.
-
self.factory.doStart()
-
skt.listen(self.backlog)
-
self.connected = True
-
self.socket = skt
-
self.fileno = self.socket.fileno
-
self.numberAccepts = 100
-
-
-
self.startReading()
# 一直找到abstract.FileDescriptor.startReading
# 执行了reactor.addReader
-
def startReading(self):
-
"""Start waiting for read availability.
-
"""
-
self.reactor.addReader(self)
# selectreactor.SelectReactor.addReader指明了
# 一个tcp.Port对象被作为reader加入到了reactor的reads队列中
-
def addReader(self, reader):
-
"""
-
Add a FileDescriptor for notification of data available to read.
-
"""
-
self._reads[reader] = 1
原来在这里, 在reactor.listenTCP时候就被加入到了reader队列中.
赶紧回头看看, 在 selectreactor.SelectReactor.doSelect中,如果一个类文件操作符状态改变了,会执行其doRead/doWriter方法.那去看看作为reader的tcp.Port的doRead方法.
# tcp.Port的socket接受了一个连接,
# 并执行了self.factory.buildProtocol方法生成一个portocol
# 通过self.transport生成了一个tcp.Server对象
-
def doRead(self):
-
try:
-
if platformType == "posix":
-
numAccepts = self.numberAccepts
-
else:
-
# win32 event loop breaks if we do more than one accept()
-
# in an iteration of the event loop.
-
numAccepts = 1
-
for i in range(numAccepts):
-
# we need this so we can deal with a factory
虽然还有点迷糊, 不过知道了protocol对象产生于此处.那这个产生的transport实例具体作用是什么呢?
先看下 protocol.makeConnection
-
# protocol.BaseProtocol
-
def makeConnection(self, transport):
-
self.connected = 1
-
self.transport = transport
-
self.connectionMade()
看到了一个熟悉的方法connectionMade!
protocol的三个事件方法 connectionMade, dataReceived, connectionLost是protocol最重要的三个方法了.
其一出现了, 剩下的两个是在何处被触发的呢?
先不急, 先看看transport 是怎么回事:
tcp.Server 来自于 父类 tcp.Connection. 而Connection继承于abstract.FileDescriptor,又是一个类文件符.
tcp.Server实例时还是做了点小动作的
-
# tcp.Server
-
def __init__(self, sock, protocol, client, server, sessionno, reactor):
-
Connection.__init__(self, sock, protocol, reactor)
-
self.server = server
-
self.client = client
-
self.sessionno = sessionno
-
self.hostname = client[0]
-
self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__,
-
sessionno,
-
self.hostname)
-
self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
-
self.sessionno,
-
self.server._realPortNumber)
-
self.startReading()
-
self.connected = 1
self.startReading从 abstract.FileDescriptor上知晓是把 该实例作为reader加入到reactor队列中的.
那我们就看看tcp.Server的doRead方法
-
# tcp.Connection
-
def doRead(self):
-
"""Calls self.protocol.dataReceived with all available data.
-
-
-
This reads up to self.bufferSize bytes of data from its socket, then
-
calls self.dataReceived(data) to process it. If the connection is not
-
lost through an error in the physical recv(), this function will return
-
the result of the dataReceived call.
-
"""
-
try:
-
data = self.socket.recv(self.bufferSize)
-
except socket.error, se:
-
if se.args[0] == EWOULDBLOCK:
-
return
-
else:
-
return main.CONNECTION_LOST
-
if not data:
-
return main.CONNECTION_DONE
-
return self.protocol.dataReceived(data)
眼前一亮, dataReceived方法!
原文链接