C++,python,热爱算法和机器学习
全部博文(1214)
分类: Python/Ruby
2014-08-13 17:52:19
We can become blind by seeing each day as a similar one. -- Paolo Coelho
我觉得最佳的方式是,只针对你需要的模块打 patch, 否则容易造成 gevent 的滥用。
gevent tutorial 称 monkeypatching 为 dark corners of Gevent.
当然,还有更精确的使用方法,例如:
参考:
import gevent.monkey
gevent.monkey.patch_socket() import gevent import urllib2 import simplejson as json def fetch(pid): response = urllib2.urlopen('')
result = response.read()
json_result = json.loads(result)
datetime = json_result['datetime'] print 'Process ', pid, datetime return json_result['datetime'] def synchronous(): for i in range(1,10):
fetch(i) def asynchronous(): threads = [] for i in range(1,10):
threads.append(gevent.spawn(fetch, i))
gevent.joinall(threads) print 'Synchronous:' synchronous() print 'Asynchronous:' asynchronous()
协程中避免使用全局变量来进行状态统计,或者结果收集。
若要收集结果,可以使用 value 属性来获取每个 greenlet 的返回值。
gevent.spawn 时,协程已经开始执行。 gevent.joinall 只是用来等待所有协程执行完毕。
注意,在协程外无法通过平常的异常捕获方式获取内部异常
# Exceptions raised in the Greenlet, stay inside the Greenlet. try:
gevent.joinall([winner, loser])
except Exception as e: print('This will never be reached')
但是,可以通过 exception 属性获得。
所以,每个协程内的异常最好自己包住,通过错误码返回给外面。
为了避免僵尸进程, 需要这样
import gevent import signal def run_forever(): gevent.sleep(1000) if __name__ == '__main__':
gevent.signal(signal.SIGQUIT, gevent.shutdown)
thread = gevent.spawn(run_forever)
thread.join()
为什么要 unpatch ?
首先需要证明 patch 是一个邪恶的东西
a.py
import gevent.monkey
gevent.monkey.patch_socket()
import socket print socket.socket #
b.py
import a
import socket print socket.socket #
从运行结果看,a 模块的 patch 影响到了 b 模块的 socket 模块。 而实际上,我们并不是所有时候都需要 patch 后的 socket 行为, "some tests rely on this patching, and some rely on not being patched."
如何避免这种影响呢?两种方法
即将 b.py 修改为
import a
import socket reload(socket) print socket.socket #
参考:
使用 gevent 的 pool
A pool is a structure designed for handling dynamic numbers of greenlets which need to be concurrency-limited. This is often desirable in cases where one wants to do many network or IO bound tasks in parallel.
在有大量 IO 并发的时候,需要限制并发量。同事限制在了 300. 具体根据机器性能, 测试决定。
示例代码:
使用 gevent.subprocess 并发调用命令