鼎鼎大名的 Gevent 。
协程核心思想是通过再次切分时间片,进行细粒度的上下文切换达到“并行”的效果,这种行为与线程类似,但是不在系统层面而是在应用的层面开销很小,所以称为协程。
- 所以某种程度上来说,协程的调度是应用主动进行的。
- 可以通过调用 gevent.sleep 来主动的进行协程调度
- gevent.select 也可以触发协程调度。
- 理想中的状态是,在发生 IO 和网络等待时触发协程调度。
- 相较于系统的线程调度,greenlet 的调度是可以预期的,但是这往往并不能代表结果就是可预期的,因为我们往往面对着随机的网络等待和IO等待。
- 可以使用 Greenlet.spawn 和 gevent.spawn 直接使用函数发起 greenlet ,使用 gevent.joinall 来同步协程的运行结果。
- 如果想用类的方式制作 greenlet 的执行体,需要继承 Greenlet 类,并且实现 _run 方法作为,执行的启动调用。
- Greenlet 的状态可以如下操作:
started — Boolean, 协程是否启动
ready() — Boolean, 协程是否被挂起
successful() — Boolean, 协程是否执行完毕并且没有异常发生
value — arbitrary, 协程的返回值
exception — exception, 协程发生的异常,但是不会从协程中抛出到主逻辑进程中
- 使用 gevent.signal(signal.SIGQUIT, gevent.kill) 来时 SIGQUIT 信号与 gevent.kill 关联起来,防止僵尸协程的出现。
- gevent 中有很多种处理超时的方法。
-
import gevent
-
from gevent import Timeout
-
-
seconds = 10
-
-
timeout = Timeout(seconds)
-
timeout.start()
-
-
def wait():
-
gevent.sleep(10)
-
-
try:
-
gevent.spawn(wait).join()
-
except Timeout:
-
print('Could not complete')
也可以使用with
-
import gevent
-
from gevent import Timeout
-
-
time_to_wait = 5 # seconds
-
-
class TooLong(Exception):
-
pass
-
-
with Timeout(time_to_wait, TooLong):
-
gevent.sleep(10)
- 猴子补丁
使用猴子补丁以后,gevent 替换了标准库的 socket 实现。
猴子补丁通过使用 gevent.monkey.patch_all 调用,也可以调用具体某个方面的补丁:
gevent.monkey.patch_socket(dns=True, aggressive=True)
gevent.monkey.patch_ssl()
gevent.monkey.patch_os() # 替换了 fork
gevent.monkey.patch_time() # 替换了 sleep
gevent.monkey.patch_select(aggressive=True) # 替换了 select
gevent.monkey.patch_thread(threading=True, _threading_local=True, Event=False)
gevent.monkey.patch_subprocess()
gevent.monkey.patch_sys(stdin=True, stdout=True, stderr=True)
- Gevent.event 相当于信号锁。event 的 set 方法可以通知所有的 wait 调用
- AsyncResult 可以在不同的协程中传递信息,可以有多个接收,(这点与 go 中的 channel 不同,channel 是有个数缓冲的, gevent 中类似广播机制)
- Gevent 中的 Queue 可以在不同协程中使用。
- 注意 Queue 中的 get put 与 get_nowait put_nowait 的区别
- 队列为空之后进行 get 将会阻塞,如果设置了超时,则超时后以后会抛出 Empty 异常
- 可以用 group 管理一组协程,方便 join 等操作
- group 也提供很多方法方便处理协程,如 map imap imap_unordered 等
- pool 与 group 作用类似,但是pool会限制并发数。
- gevent 提供了 local 作为保存协程内部变量使用,(效率更高?)不能跨协程使用。
- gevent 也提供 Subprocess 相关操作,可以直接用来替换标准库中的相关函数,这些都可以在协程环境下工作。
- 利用 gevent.socket.wait_read 与 wait_write 与标准库 multiprocessing 一起工作。
-
import gevent
-
from multiprocessing import Process, Pipe
-
from gevent.socket import wait_read, wait_write
-
-
# To Process
-
a, b = Pipe()
-
-
# From Process
-
c, d = Pipe()
-
-
def relay():
-
for i in xrange(10):
-
msg = b.recv()
-
c.send(msg + " in " + str(i))
-
-
def put_msg():
-
for i in xrange(10):
-
wait_write(a.fileno())
-
a.send('hi')
-
-
def get_msg():
-
for i in xrange(10):
-
wait_read(d.fileno())
-
print(d.recv())
-
-
if __name__ == '__main__':
-
proc = Process(target=relay)
-
proc.start()
-
-
g1 = gevent.spawn(get_msg)
-
g2 = gevent.spawn(put_msg)
-
gevent.joinall([g1, g2], timeout=1)
- 利用 Gevent 实现 Actor 模式。
-
import gevent
-
from gevent.queue import Queue
-
-
class Actor(gevent.Greenlet):
-
-
def __init__(self):
-
self.inbox = Queue()
-
Greenlet.__init__(self)
-
-
def receive(self, message):
-
"""
-
Define in your subclass.
-
"""
-
raise NotImplemented()
-
-
def _run(self):
-
self.running = True
-
-
while self.running:
-
message = self.inbox.get()
-
self.receive(message)
使用
-
import gevent
-
from gevent.queue import Queue
-
from gevent import Greenlet
-
-
class Pinger(Actor):
-
def receive(self, message):
-
print(message)
-
pong.inbox.put('ping')
-
gevent.sleep(0)
-
-
class Ponger(Actor):
-
def receive(self, message):
-
print(message)
-
ping.inbox.put('pong')
-
gevent.sleep(0)
-
-
ping = Pinger()
-
pong = Ponger()
-
-
ping.start()
-
pong.start()
-
-
ping.inbox.put('start')
-
gevent.joinall([ping, pong])
原文
阅读(1941) | 评论(0) | 转发(0) |