为了更游刃有余的使用gevent,我们必须搞懂gevent底层的运行原理,理解本文需要知道libevent和greenlet的使用。
从一段代码开始
例子一:
import gevent def test1(): print 'test1' test1_greenlet = gevent.spawn(test1)
执行完代码,木有任何输出程序就结束了。
OK,第一个问题:gevent.spawn()到底做了什么。 打开源码文件 gevent/greenlet.py,查看Greenlet
Greenlet继承了greenlet模块()的greenlet类(有点拗口)。
gevent.spawn(test1) 中初始化了一个Greenlet,并且调用start()
Greenlet初始化代码:
def __init__(self, run=None, *args, **kwargs): hub = get_hub() # hub是什么,是做什么的? greenlet.__init__(self, parent=hub) if run is not None: self._run = run # 在该greenlet的switch函数被调用时执行run() self.args = args self.kwargs = kwargs self._links = deque() # _links用来保存回调函数,回调函数在greenlet死亡后执行 self.value = None # greenlet跳转的时候可以将将此值传递给下个greenlet self._exception = _NONE # 存储异常 self._notifier = None # 存储一个pending callback事件 self._start_event = None # 保存loop.callback
又抛出一个问题:hub是什么,是做什么的?
get_hub()通过单例模式获取一个Hub类,Hub类也继承了Greenlet。greenlet.init(self, parent=hub)这段代码也解释了所有gevent.Greenlet生成greenlet的父类都是Hub,换句话说,就是所有由gevent spawn 返回的greenlet在死亡后都会将执行点切到Hub里。
Hub类在初始化的时候,有一个非常重要的属性——loop,这个就是libev最快的event loop(epoll on Linux, kqueue on FreeBSD)。
Hub的run函数执行了loop.run(), 这个函数对应的是libev中ev_run, 遍历所有pending事件,这也是个循环,循环会在所有pending事件结束后结束。理论上来说,这个run循环不会结束,也就是说ev_run循环中永远会有pending事件,否则会抛出LoopExit(‘This operation would block forever’)异常。Hub会在Main Greenlet结束后退出。
greenlet的start()函数就是将自己的switch函数注册到loop的pending事件中。
def start(self): """Schedule the greenlet to run in this loop iteration""" if self._start_event is None: self._start_event = self.parent.loop.run_callback(self.switch)
好了,该对例子一代码做一个总结:gevent.spawn(test1) 初始化一个greenlet,并将该greenlet.switch注册到hub.loop的pending事件中,因为hub.switch()没有调用而导致hub的io loop没办法开始,所以pending事件无法运行,test1也就木有执行。
为了让test1()能够运行起来,我们可以调用hub.switch()来解决这个问题,每个greenlet的parent都是hub,如下代码:
例子二:
import gevent def test1(): print 'test1' test1_greenlet = gevent.spawn(test1) test1_greenlet.parent.switch()
执行完出错。gevent.hub.LoopExit: This operation would block forever
因为hub.loop的pending事件只有test1,执行完之后hub.loop.run结束, 导致hub向Main Greenlet抛异常
self.parent.throw(LoopExit('This operation would block forever'))
例子二总结:在执行点切换至hub之前,必须将当前greenlet的switch函数注册至hub.loop的pending事件中。
gevent提供一个封装好的机制,gevent.sleep(0)
例子三:
import gevent def test1(): print 'test1' test1_greenlet = gevent.spawn(test1) gevent.sleep(0)
test1终于被打印出来了,我们来看看gevent.sleep(0)做了什么。
def sleep(seconds=0, ref=True): hub = get_hub() loop = hub.loop if seconds <= 0: waiter = Waiter() loop.run_callback(waiter.switch) waiter.get() else: hub.wait(loop.timer(seconds, ref=ref))
这里有个关键对象Waiter, 看源码文档,A low level communication utility for greenlets. 哈,是greenlets之间沟通的工具类。我们研究一下他到底做了什么。
Waiter有一个greenlet属性,表示当前这个waiter属于哪个greenlet。
Waiter.switch()只能在hub中调用,能将执行点切换到waiter所属的greenlet。
def switch(self, value=None): """Switch to the greenlet if one's available. Otherwise store the value.""" greenlet = self.greenlet if greenlet is None: self.value = value self._exception = None else: assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet" switch = greenlet.switch try: switch(value) # 将执行点切换至waiter所属的greenlet的run函数中 except: self.hub.handle_error(switch, *sys.exc_info())
Waiter.get() 设置当前waiter所属的greenlet,并将当前执行点切换到hub里去。
def get(self): """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called.""" if self._exception is not _NONE: if self._exception is None: return self.value else: getcurrent().throw(*self._exception) else: assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, ) self.greenlet = getcurrent() #设置当前waiter对象所属的greenlet try: return self.hub.switch() #将执行点切换到hub中的hub.loop.run 循环里去 finally: self.greenlet = None
解释一下例子三的运行过程,
test1_greenlet = gevent.spawn(test1) 这句代码表示 将test1 greenlet的switch注册到hub.loop的pengding事件中
gevent.sleep(0) 这句代码表示 waiter.switch 会注册到hub.loop的pending事件中,将waiter的greenlet属性设置为当前greenlet(在这里是main greenlet),并将执行点切换到hub.loop.run的函数中,第一次先执行test1 greenlet的switch,打印test1,然后执行waiter的switch,main greenlet执行结束,整个程序结束。