Category: Python/Ruby
2011-11-27 00:09:58
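Tornado implements a simple libevent-style event loop (ioloop.py). To stay compatible with older Python versions (which have no built-in epoll module), it even ships an epoll module written in C.
Asynchronous I/O frameworks that wrap epoll have already been covered in plenty of earlier articles, and there is not much that is technically new here. This post simply looks at the tools and idioms Python uses to do the same job.
Start with the IOLoop class, which is roughly the counterpart of the event base handle (event_base) in libevent. All events are managed through this object. In Tornado it is a singleton-style class. Here is how Python implements the singleton pattern: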
```python
class IOLoop(object):
    # (a large block of declarations removed here)

    def __init__(self, impl=None):
        self._impl = impl or _poll()
        if hasattr(self._impl, 'fileno'):
            self._set_close_exec(self._impl.fileno())
        self._handlers = {}
        self._events = {}
        self._callbacks = []
        self._timeouts = []
        self._running = False
        self._stopped = False
        self._blocking_signal_threshold = None
        # (more initialization code omitted here)

    @classmethod
    def instance(cls):
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    @classmethod
    def initialized(cls):
        """Returns true if the singleton instance has been created."""
        return hasattr(cls, "_instance")
```
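Tornado's approach is quite nice: the class can be used as a singleton, yet you can still create multiple instances for multithreaded applications. That made me wonder: can Python stop users from constructing objects themselves, the way C++ can? Apparently not, since Python has no strong access-control mechanism. Also, does Python force every class to have a constructor? I will look into that when I have time.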
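To make the "singleton, but not enforced" point concrete, here is a minimal sketch in Python 3. The class name `Loop` is made up for illustration and is not part of Tornado; only the `instance()` / `initialized()` idiom is taken from the listing above. The class deliberately defines no `__init__`, which Python accepts because one is inherited from `object`.

```python
class Loop(object):
    """Hypothetical stand-in for IOLoop; it defines no __init__ of its own."""

    @classmethod
    def instance(cls):
        # Create the shared instance lazily and cache it on the class.
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    @classmethod
    def initialized(cls):
        return hasattr(cls, "_instance")


before = Loop.initialized()      # nothing has been created yet
shared_a = Loop.instance()
shared_b = Loop.instance()       # same object as shared_a
private = Loop()                 # direct construction is still allowed
```

Nothing prevents the last line, so the "singleton" here is a convention: callers who want the shared loop go through `instance()`, and callers who want their own loop (one per thread, say) just construct one.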
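The code above also involves the @classmethod decorator, which is easy to confuse with @staticmethod. In Python, every object has another object behind it: its class object, a meta-level object that describes it (strictly speaking, there is one per class rather than one per instance). That is where the two decorators differ: the former produces a method associated with the class object, and the method receives that class object as its first argument; the latter behaves like a static function in C++, which is essentially a global function living in the class namespace. Tornado binds the singleton instance to the class object, so every call to instance() returns the same object. Good.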
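The difference is easiest to see with a subclass. The sketch below is Python 3, and `Base`, `Child`, `create` and `add` are names invented for the example:

```python
class Base(object):
    @classmethod
    def create(cls):
        # cls is whichever class the call went through, so a subclass
        # gets an instance of itself.
        return cls()

    @staticmethod
    def add(a, b):
        # No implicit first argument: a plain function in the class namespace.
        return a + b


class Child(Base):
    pass


made_by_base = Base.create()     # an instance of Base
made_by_child = Child.create()   # an instance of Child
total = Child.add(1, 2)          # the class is only used for name lookup
```

This is why `instance()` has to be a classmethod: it needs `cls` both to construct the object and to store it as an attribute on the class.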
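As for the member variables of IOLoop, their names say it all, so I won't go through them.
Registering, updating and removing events in IOLoop is fairly straightforward, so I skip that here. There is a stack_context in there that looks interesting, though. I may write a separate post about it later if it turns out to be worthwhile.
Now the main loop: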
```python
def start(self):
    """Starts the I/O loop.

    The loop will run until one of the I/O handlers calls stop(), which
    will make the loop stop after the current event iteration completes.
    """
    if self._stopped:
        self._stopped = False
        return
    self._running = True
    while True:
        # Never use an infinite timeout here - it can stall epoll
        poll_timeout = 0.2

        # Prevent IO event starvation by delaying new callbacks
        # to the next iteration of the event loop.
        callbacks = self._callbacks
        self._callbacks = []
        for callback in callbacks:
            self._run_callback(callback)

        if self._callbacks:
            poll_timeout = 0.0

        if self._timeouts:
            now = time.time()
            while self._timeouts:
                if self._timeouts[0].callback is None:
                    # the timeout was cancelled
                    heapq.heappop(self._timeouts)
                elif self._timeouts[0].deadline <= now:
                    timeout = heapq.heappop(self._timeouts)
                    self._run_callback(timeout.callback)
                else:
                    milliseconds = self._timeouts[0].deadline - now
                    poll_timeout = min(milliseconds, poll_timeout)
                    break

        if not self._running:
            break

        if self._blocking_signal_threshold is not None:
            # clear alarm so it doesn't fire while poll is waiting for
            # events.
            signal.setitimer(signal.ITIMER_REAL, 0, 0)

        try:
            event_pairs = self._impl.poll(poll_timeout)
        except Exception, e:
            # Depending on python version and IOLoop implementation,
            # different exception types may be thrown and there are
            # two ways EINTR might be signaled:
            # * e.errno == errno.EINTR
            # * e.args is like (errno.EINTR, 'Interrupted system call')
            if (getattr(e, 'errno', None) == errno.EINTR or
                (isinstance(getattr(e, 'args', None), tuple) and
                 len(e.args) == 2 and e.args[0] == errno.EINTR)):
                continue
            else:
                raise

        if self._blocking_signal_threshold is not None:
            signal.setitimer(signal.ITIMER_REAL,
                             self._blocking_signal_threshold, 0)

        # Pop one fd at a time from the set of pending fds and run
        # its handler. Since that handler may perform actions on
        # other file descriptors, there may be reentrant calls to
        # this IOLoop that update self._events
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                self._handlers[fd](fd, events)
            except (KeyboardInterrupt, SystemExit):
                raise
            except (OSError, IOError), e:
                if e.args[0] == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
            except:
                logging.error("Exception in I/O handler for fd %d",
                              fd, exc_info=True)
    # reset the stopped flag so another start/stop pair can be issued
    self._stopped = False
    if self._blocking_signal_threshold is not None:
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

def stop(self):
    self._running = False
    self._stopped = True
    self._wake()
```
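The main loop is also very conventional. The IOLoop object combines two variables, _running and _stopped, to represent three states:
Initial / finished: _running = False, _stopped = False
Running: _running = True, _stopped = False
Stopping: _running = False, _stopped = True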
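The three states can be traced with a toy loop that keeps only the two flags. This is a Python 3 sketch, not Tornado code: `MiniLoop`, `max_iterations`, `iterations` and `state()` are invented for the example, and the "handler calling stop()" is simulated by a counter.

```python
class MiniLoop(object):
    """Toy loop that keeps only the two flags; no real I/O is done."""

    def __init__(self):
        self._running = False
        self._stopped = False
        self.iterations = 0

    def start(self, max_iterations=3):
        if self._stopped:
            # stop() was called before start(): consume the request and return.
            self._stopped = False
            return
        self._running = True
        while self._running:
            self.iterations += 1
            if self.iterations >= max_iterations:
                self.stop()      # stands in for a handler calling stop()
        # Reset so that another start/stop pair can be issued.
        self._stopped = False

    def stop(self):
        self._running = False
        self._stopped = True

    def state(self):
        return (self._running, self._stopped)


loop = MiniLoop()
loop.stop()
state_after_early_stop = loop.state()   # the "stopping" state
loop.start()                            # returns at once, runs no iteration
iterations_after_first_start = loop.iterations
loop.start()                            # a normal run this time
final_state = loop.state()              # back to "initial / finished"
```

The early return at the top of `start()` is what the second flag buys: a `stop()` issued before the loop has started is not lost, it cancels the next `start()`.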
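Timers are managed with a heapq (seeing it reminded me of make_heap in the STL). For more on timer implementations, see my earlier post, "Timers in High-Performance Server Programming".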
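Here is a small Python 3 sketch of the heap-based timer handling, including the lazy cancellation trick from the main loop (a cancelled timer just gets its callback set to None and is dropped when it reaches the top of the heap). The `Timeout` class and `run_due` function are my own simplified stand-ins, not Tornado's classes.

```python
import heapq


class Timeout(object):
    """Hypothetical timer record ordered by deadline."""

    def __init__(self, deadline, callback):
        self.deadline = deadline
        self.callback = callback

    def __lt__(self, other):
        # heapq only needs "<" to keep the earliest deadline at index 0.
        return self.deadline < other.deadline


def run_due(timeouts, now):
    """Run expired timers; return seconds until the next one, or None."""
    while timeouts:
        if timeouts[0].callback is None:
            heapq.heappop(timeouts)              # cancelled: drop it lazily
        elif timeouts[0].deadline <= now:
            heapq.heappop(timeouts).callback()
        else:
            return timeouts[0].deadline - now
    return None


fired = []
timeouts = []
heapq.heappush(timeouts, Timeout(5.0, lambda: fired.append("five")))
heapq.heappush(timeouts, Timeout(1.0, lambda: fired.append("one")))
cancelled = Timeout(2.0, lambda: fired.append("two"))
heapq.heappush(timeouts, cancelled)
cancelled.callback = None                        # cancel without touching the heap

wait = run_due(timeouts, now=3.0)
```

With `now=3.0` only the 1.0 timer fires, the cancelled 2.0 timer is discarded, and `wait` is the 2.0 seconds left until the 5.0 timer. In the real loop that remaining time is what caps `poll_timeout`.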
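The function has three parts: it first runs the callback queue, then handles timeouts, and finally handles I/O events. Any new callbacks added while the callback queue is being run go into a fresh queue and are handled in the next loop iteration, so that pending I/O is not starved. I/O really cannot afford to be kept waiting.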
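The queue swap is the part worth spelling out. Below is a Python 3 sketch under my own naming (`CallbackQueue`, `add_callback` and `run_once` are hypothetical; only the swap itself and the 0.2 / 0.0 timeouts come from the listing above):

```python
class CallbackQueue(object):
    """Hypothetical helper holding only the callback list."""

    def __init__(self):
        self._callbacks = []

    def add_callback(self, callback):
        self._callbacks.append(callback)

    def run_once(self):
        # Swap in a fresh list first, so callbacks added while we run
        # land in the new list and wait for the next iteration.
        callbacks = self._callbacks
        self._callbacks = []
        for callback in callbacks:
            callback()
        # A zero poll timeout means "do not block, more work is queued".
        return 0.0 if self._callbacks else 0.2


queue = CallbackQueue()
log = []


def first():
    log.append("first")
    # Scheduled during the run, so it must not execute in this iteration.
    queue.add_callback(lambda: log.append("second"))


queue.add_callback(first)
timeout_1 = queue.run_once()
log_after_1 = list(log)
timeout_2 = queue.run_once()
```

If `run_once` iterated over `self._callbacks` directly, a callback that keeps rescheduling itself would keep the loop inside the callback phase forever and the poll would never be reached. With the swap, each iteration runs a bounded batch and then polls with a zero timeout.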
A quick summary:
1. @classmethod vs. @staticmethod
2. The singleton pattern
3. heapq
4. getattr, hasattr, isinstance
5. Every function has a docstring
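As a footnote to item 4, the EINTR test in the main loop is a compact example of getattr and isinstance used for duck typing. Pulled out into a helper it might look like this (a Python 3 sketch; `is_eintr` is a name I made up, Tornado keeps the check inline):

```python
import errno


def is_eintr(exc):
    """Return True if exc looks like an interrupted system call."""
    # Style 1: the exception carries an errno attribute.
    if getattr(exc, "errno", None) == errno.EINTR:
        return True
    # Style 2: the exception only has args like (errno.EINTR, 'message').
    args = getattr(exc, "args", None)
    return (isinstance(args, tuple) and len(args) == 2
            and args[0] == errno.EINTR)


with_errno = is_eintr(OSError(errno.EINTR, "Interrupted system call"))
with_args = is_eintr(Exception(errno.EINTR, "Interrupted system call"))
unrelated = is_eintr(ValueError("bad value"))
```

The point of the getattr default is that the check never raises, whatever exception type the poll implementation throws. On Python 3.5 and later the interpreter itself retries most interrupted system calls (PEP 475), so a check like this matters mainly on older versions.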