原文地址:
http://blog.csdn.net/yueguanghaidao/article/details/24281751
一直对gevent运行流程比较模糊,最近看源码略有所得,不敢独享,故分享之。
gevent是一个高性能网络库,底层是libevent,1.0版本之后是libev,核心是greenlet。gevent和eventlet是亲近,唯一不同的是eventlet是自己实现的事件驱动,而gevent是使用libev。两者都有广泛的应用,如openstack底层网络通信使用eventlet,goagent是使用gevent。
要想理解gevent首先要理解gevent的调度流程,gevent中有一个hub的概念,也就是下图的MainThread,用于调度所有其它的greenlet实例(下图Coroutine)。
其实hub也是一个greenlet,只不过特殊一些。
看下图我们会发现每次从hub切换到一个greenlet后,都会回到hub,这就是gevent的关键。
注意:gevent中并没有greenlet链的说法,所有都是向主循环注册greenlet.switch方法,主循环在合适的时机切换回来。
也许大家会好奇,为什么采用这种模式,为什么每次都要切换到hub?我想理由有二:
1.hub是事件驱动的核心,每次切换到hub后将继续循环事件。如果在一个greenlet中不出来,那么其它greenlet将得不到调用。
2.维持两者关系肯定比维持多个关系简单。每次我们所关心的就是hub以及当前greenlet,不需要考虑各个greenlet之间关系。
我们看看最简单的gevent.sleep发生了什么?
我们先想想最简单的sleep(0)该如何调度?根据上面很明显
1.向事件循环注册当前greenlet的switch函数
2.切换到hub,运行主事件循环
-
def sleep(seconds=0, ref=True):
-
hub = get_hub()
-
loop = hub.loop
-
if seconds <= 0:
-
waiter = Waiter()
-
loop.run_callback(waiter.switch)
-
waiter.get()
-
else:
-
hub.wait(loop.timer(seconds, ref=ref))
当seconds小于等于0时,loop.run_callback(waiter.switch)即是将当前greenlet的switch注册到loop,使用waiter.get()切换到hub。那么很明显,
当切换到hub后当调用刚注册的回调(waiter.switch)回到刚刚sleep所在的greenlet。
不熟悉Waiter的童鞋可能对上面说的有点模糊,下面我们好好看看Waiter是什么。
-
>>> result = Waiter()
-
>>> timer = get_hub().loop.timer(0.1)
-
>>> timer.start(result.switch, 'hello from Waiter')
-
>>> result.get()
-
'hello from Waiter'
timer.start(result.switch, 'hello from Waiter')我们向hub的主循环注册一个0.1s的定时器,回调为result.switch,然后将执行result.get(),此时过程代码如下:
-
def get(self):
-
assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, )
-
self.greenlet = getcurrent()
-
try:
-
return self.hub.switch()
-
finally:
-
self.greenlet = None
将把self.greenlet设置为当前greenlet,然后通过self.hub.switch()切换到主循环,很明显在主循环中将回调result.switch,看代码:
-
def switch(self, value=None):
-
-
greenlet = self.greenlet
-
assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
-
switch = greenlet.switch
-
try:
-
switch(value)
-
except:
-
self.hub.handle_error(switch, *sys.exc_info())
拿到刚保存的greenlet,然后切换到greenlet.switch(),返回到我们刚调用reuslt.get()方法。通过上面assert我们也可以看出这是在hub中调用的。
通过以上分析,小伙伴们肯定都懂了gevent的执行流程了。
这里有个问题,如果上面先发生result.switch,那又该如何呢?就像下面这样:
-
>>> result = Waiter()
-
>>> timer = get_hub().loop.timer(0.1)
-
>>> timer.start(result.switch, 'hi from Waiter')
-
>>> sleep(0.2)
-
>>> result.get()
-
'hi from Waiter'
我想聪明的你,打开hub.py再看看源码肯定就明白了(上面Waiter代码是我特意简化的)。
既然我们知道了gevent运行流程,下面我们看看gevent.spawn和join到底做了什么?
gevent.spawn其实就是Greenlet.spawn,所以gevent.spawn就是创建一个greenlet,并将该greenlet的switch()加入hub主循环回调。
-
class Greenlet(greenlet):
-
-
-
def __init__(self, run=None, *args, **kwargs):
-
hub = get_hub()
-
greenlet.__init__(self, parent=hub)
-
if run is not None:
-
self._run = run
-
self._start_event = None
-
-
def start(self):
-
-
if self._start_event is None:
-
self._start_event = self.parent.loop.run_callback(self.switch)
-
-
@classmethod
-
def spawn(cls, *args, **kwargs):
-
-
-
-
-
g = cls(*args, **kwargs)
-
g.start()
-
return g
通过下面代码证明:
-
import gevent
-
-
def talk(msg):
-
print(msg)
-
-
g1 = gevent.spawn(talk, 'bar')
-
gevent.sleep(0)
将输出:bar,我们通过sleep切换到hub,然后hub将运行我们添加的回调talk,一切正常。
此时不要沾沾自喜,如果下面代码也觉得一切正常再高兴也不迟。
-
import gevent
-
-
def talk(msg):
-
print(msg)
-
gevent.sleep(0)
-
print msg
-
-
g1 = gevent.spawn(talk, 'bar')
-
gevent.sleep(0)
这次还是输出:bar,有点不对劲啊,应该输出两个bar才对,为什么为导致这样呢?
我们来好好分析流程:
1.gevent.spawn注册回调talk
2.然后最后一行gevent.sleep(0)注册当前greenlet.switch(最外面的)到hub,然后切换到hub
3.hub执行回调talk,打印"bar",此时gevent.sleep再次将g1.switch注册到hub,同时切换到hub
4.由于第2步最外层greenlet现注册,所以将调用最外层greenlet,此时很明显,程序将结束。因为最外层greenlet并不是hub的子greenlet,
所以died后并不会回到父greenlet,即hub
你可能会说那我自己手动切换到hub不就可以了吗?这将导致主循环结束不了的问题。
-
import gevent
-
-
def talk(msg):
-
print(msg)
-
gevent.sleep(0)
-
print msg
-
-
g1 = gevent.spawn(talk, 'bar')
-
gevent.get_hub().switch()
程序输出:
-
bar
-
bar
-
Traceback (most recent call last):
-
File "F:\py_cgi\geve.py", line 9, in
-
gevent.get_hub().switch()
-
File "C:\Python26\lib\site-packages\gevent\hub.py", line 331, in switch
-
return greenlet.switch(self)
-
gevent.hub.LoopExit: This operation would block forever
虽然成功的输出了两次“bar",但也导致了更为严重的问题。
这也就是join存在的价值,我们看看join是如何做到的?
-
def join(self, timeout=None):
-
-
-
-
if self.ready():
-
return
-
else:
-
switch = getcurrent().switch
-
self.rawlink(switch)
-
try:
-
t = Timeout.start_new(timeout)
-
try:
-
result = self.parent.switch()
-
assert result is self, 'Invalid switch into Greenlet.join(): %r' % (result, )
-
finally:
-
t.cancel()
-
except Timeout:
-
self.unlink(switch)
-
if sys.exc_info()[1] is not t:
-
raise
-
except:
-
self.unlink(switch)
-
raise
-
-
-
def rawlink(self, callback):
-
-
-
-
-
if not callable(callback):
-
raise TypeError('Expected callable: %r' % (callback, ))
-
self._links.append(callback)
-
if self.ready() and self._links and not self._notifier:
-
self._notifier = self.parent.loop.run_callback(self._notify_links)
-
-
def _notify_links(self):
-
while self._links:
-
link = self._links.popleft()
-
try:
-
link(self)
-
except:
-
self.parent.handle_error((link, self), *sys.exc_info())
从代码中可以看出,join会保存当前greenlet.switch到一个队列中,并注册_notify_links回调,然后切换到hub,在_notify_links回调中将依次调用先前注册在队列中的回调。
而我们调用g1.join()将会把最外层greenlet.switch注册到队列中,当回调时就顺利结束程序了。很完美!!!
阅读(803) | 评论(0) | 转发(0) |