Chinaunix首页 | 论坛 | 博客
  • 博客访问: 667561
  • 博文数量: 121
  • 博客积分: 4034
  • 博客等级: 上校
  • 技术积分: 1439
  • 用 户 组: 普通用户
  • 注册时间: 2010-04-28 12:42
文章分类

全部博文(121)

文章存档

2017年(8)

2016年(10)

2013年(2)

2012年(3)

2011年(18)

2010年(80)

分类: Python/Ruby

2010-10-19 22:11:34

Python几种并发实现方案的性能比较
 
1.  前言
偶然看到Erlang vs. Stackless python: a first benchmark,对Erlang和Stackless Python的并发处理性能进行了实验比较,基本结论认为二者有比较相近的性能。我看完产生的问题是,Stackless Python与Python的其他并发实现机制性能又会有多大区别呢,比如线程和进程。因此我采用与这篇文章相同的办法来对Stackless Python、普通Python的thread模块、普通Python的threading模块、普通Python的processing模块这四种并发实现方案进行了性能实验,并将实验过程和基本结果记录在这里。
后来看到了基于greenlet实现的高性能网络框架Eventlet,因而更新了实验方案,将greenlet也加入了比较,虽然greenlet并非是一种真正意义上的并发处理,而是在单个线程下对程序块进行切换轮流执行。
(Edit Section ↓)
2.  实验方案
实验方案与Erlang vs. Stackless python: a first benchmark是相同的,用每种方案分别给出如下问题的实现,记录完成整个处理过程的总时间来作为评判性能的依据:
由n个节点组成一个环状网络,在上面传送共m个消息。
将每个消息(共m个),逐个发送给1号节点。
第1到n-1号节点在接收到消息后,都转发给下一号节点。
第n号节点每次收到消息后,不再继续转发。
当m个消息都从1号逐个到达第n号节点时,认为全部处理结束。
(Edit Section ↓)
2.1  硬件平台
Macbook Pro 3,1上的Vmware Fusion 1.0虚拟机中,注意这里给虚拟机只启用了cpu的单个核心:
原始Cpu:Core 2 Duo,2.4 GHz,2核心,4 MB L2 缓存,总线速度800 MHz
分配给虚拟机的内存:796M
(Edit Section ↓)
2.2  软件平台
Vmware Fusion 1.0下的Debian etch:
原始Python:Debian发行版自带Python 2.4.4
Python 2.4.4 Stackless 3.1b3 060516
processing-0.52-py2.4-linux-i686.egg
原始Python下的greenlet实现:py lib 0.9.2
(Edit Section ↓)
3.  实验过程及结果
各方案的实现代码见后文。实验时使用time指令记录每次运行的总时间,选用的都是不做任何输出的no_io实现(Python的print指令还是挺耗资源的,如果不注释掉十有八九得影响测试结果),每次执行时设定n=300,m=10000(Erlang vs. Stackless python: a first benchmark文章中认为n可以设置为300,m则可以取10000到90000之间的数值分别进行测试)。
(Edit Section ↓)
3.1  Stackless Python的实验结果
real 0m1.651suser 0m1.628ssys 0m0.020s
即使将m扩大到30000,实验结果仍然很突出:
real 0m4.749suser 0m4.716ssys 0m0.028s
(Edit Section ↓)
3.2  使用thread模块的实验结果
real 1m13.009suser 0m2.476ssys 0m59.028s
(Edit Section ↓)
3.3  使用threading模块配合Queue模块的实验结果
不太稳定,有时候这样:
real 1m9.222suser 0m34.418ssys 0m34.622s
也有时这样:
real 2m14.016suser 0m6.644ssys 2m7.260s
(Edit Section ↓)
3.4  使用processing模块配合Queue模块的实验结果
real 3m43.539suser 0m15.345ssys 3m27.953s
(Edit Section ↓)
3.5  greenlet模块的实验结果
real 0m9.225suser 0m0.644ssys 0m8.581s
(Edit Section ↓)
3.6  eventlet模块的实验结果
注意!eventlet 的这个实验结果是后来增补的,硬件平台没变,但是是直接在 OSX 自带 Python 2.5 环境下执行出来的,同时系统中还有 Firefox 等很多程序也在争夺系统资源。因此只能作为大致参考,不能与其他几组数据作直接对比。(其中 eventlet 的版本是 0.9.5)
real    0m21.610suser    0m20.713ssys     0m0.215s
(Edit Section ↓)
4.  结论与分析
(Edit Section ↓)
4.1  Stackless Python
毫无疑问,Stackless Python几乎有匪夷所思的并发性能,比其他方案快上几十倍,而且借助Stackless Python提供的channel机制,实现也相当简单。也许这个结果向我们部分揭示了沈仙人基于Stackless Python实现的Eurasia3能够提供相当于c语言效果的恐怖并发性能的原因。
(Edit Section ↓)
4.2  Python线程
从道理上来讲,thread模块似乎应该和threading提供基本相同的性能,毕竟threading只是对thread的一种封装嘛,后台机制应该是一致的。或许threading由于本身类实例维护方面的开销,应该会比直接用thread慢一点。从实验结果来看,二者性能也确实差不多。只是不大明白为何threading方案的测试结果不是很稳定,即使对其他方案的测试运行多次,误差也不会像threading这么飘。从代码实现体验来说,用threading配合Queue比直接用thread实在是轻松太多了,并且出错的机会也要少很多。
(Edit Section ↓)
4.3  Python进程
processing模块给出的进程方案大致比thread线程要慢一倍,并且这是在我特意调整虚拟机给它预备了足够空闲内存、避免使用交换分区的情况下取得的(特意分给虚拟机700多M内存就是为了这个)。而其他方案仅仅占用数M内存,完全无需特意调大可用内存总量。当然,如果给虚拟机多启用几个核心的话,processing也许会占上点便宜,毕竟目前thread模块是不能有效利用多cpu资源的(经实验,Stackless Python在开启双核的情况下表现的性能和单核是一样的,说明也是不能有效利用多cpu)。因此一种比较合理的做法是根据cpu的数量,启用少量几个进程,而在进程内部再开启线程进行实际业务处理,这也是目前Python社区推荐的有效利用多cpu资源的办法。好在processing配合其自身提供的 Queue模块,编程体验还是比较轻松的。
(Edit Section ↓)
4.4  greenlet超轻量级方案
基于greenlet的实现则性能仅次于Stackless Python,大致比Stackless Python慢一倍,比其他方案快接近一个数量级。其实greenlet不是一种真正的并发机制,而是在同一线程内,在不同函数的执行代码块之间切换,实施“你运行一会、我运行一会”,并且在进行切换时必须指定何时切换以及切换到哪。greenlet的接口是比较简单易用的,但是使用greenlet时的思考方式与其他并发方案存在一定区别。线程/进程模型在大逻辑上通常从并发角度开始考虑,把能够并行处理的并且值得并行处理的任务分离出来,在不同的线程 /进程下运行,然后考虑分离过程可能造成哪些互斥、冲突问题,将互斥的资源加锁保护来保证并发处理的正确性。greenlet则是要求从避免阻塞的角度来进行开发,当出现阻塞时,就显式切换到另一段没有被阻塞的代码段执行,直到原先的阻塞状况消失以后,再人工切换回原来的代码段继续处理。因此,greenlet本质是一种合理安排了的串行,实验中greenlet方案能够得到比较好的性能表现,主要也是因为通过合理的代码执行流程切换,完全避免了死锁和阻塞等情况(执行带屏幕输出的ring_greenlet.py我们会看到脚本总是一个一个地处理消息,把一个消息在环上从头传到尾之后,再开始处理下一个消息)。因为greenlet本质是串行,因此在没有进行显式切换时,代码的其他部分是无法被执行到的,如果要避免代码长时间占用运算资源造成程序假死,那么还是要将greenlet与线程/进程机制结合使用(每个线程、进程下都可以建立多个greenlet,但是跨线程/进程时 greenlet之间无法切换或通讯)。
Stackless则比较特别,对很多资源从底层进行了并发改造,并且提供了channel等更适合“并发”的通讯机制实现,使得资源互斥冲突的可能性大大减小,并发性能自然得以提高。粗糙来讲,greenlet是“阻塞了我就先干点儿别的,但是程序员得明确告诉 greenlet能先干点儿啥以及什么时候回来”;Stackless则是“东西我已经改造好了,你只要用我的东西,并发冲突就不用操心,只管放心大胆地并发好了”。greenlet应该是学习了Stackless的上下文切换机制,但是对底层资源没有进行适合并发的改造。并且实际上greenlet也没有必要改造底层资源的并发性,因为它本质是串行的单线程,不与其他并发模型混合使用的话是无法造成对资源的并发访问的。
(Edit Section ↓)
greenlet 封装后的 eventlet 方案
eventlet 是基于 greenlet 实现的面向网络应用的并发处理框架,提供“线程”池、队列等与其他 Python 线程、进程模型非常相似的 api,并且提供了对 Python 发行版自带库及其他模块的超轻量并发适应性调整方法,比直接使用 greenlet 要方便得多。并且这个解决方案源自著名虚拟现实游戏“第二人生”,可以说是久经考验的新兴并发处理模型。其基本原理是调整 Python 的 socket 调用,当发生阻塞时则切换到其他 greenlet 执行,这样来保证资源的有效利用。需要注意的是:
eventlet 提供的函数只能对 Python 代码中的 socket 调用进行处理,而不能对模块的 C 语言部分的 socket 调用进行修改。对后者这类模块,仍然需要把调用模块的代码封装在 Python 标准线程调用中,之后利用 eventlet 提供的适配器实现 eventlet 与标准线程之间的协作。
再有,虽然 eventlet 把 api 封装成了非常类似标准线程库的形式,但两者的实际并发执行流程仍然有明显区别。在没有出现 I/O 阻塞时,除非显式声明,否则当前正在执行的 eventlet 永远不会把 cpu 交给其他的 eventlet,而标准线程则是无论是否出现阻塞,总是由所有线程一起争夺运行资源。所有 eventlet 对 I/O 阻塞无关的大运算量耗时操作基本没有什么帮助。
在性能测试结果方面,eventlet 消耗的运行时间大致是 greenlet 方案的 3 到 5 倍,而 Python 标准线程模型的 thread 方式消耗的运行时间大致是 eventlet 测试代码的 8 到 10 倍。其中前者可能是因为我们在 eventlet 的测试代码中,使用队列机制来完成所有的消息传递,而队列上的访问互斥保护可能额外消耗了一些运算资源。总体而言,eventlet 模型的并发性能虽然比 Stackless Python 和直接使用 greenlet 有一定差距,但仍然比标准线程模型有大约一个数量级的优势,这也就不奇怪近期很多强调并发性能的网络服务器实现采取 eventlet 、线程、进程三者组合使用的实现方案。
(Edit Section ↓)
5.  实验代码
实验代码下载:
版本3 下载:增加了 eventlet 方案的实验代码。
版本2 下载:增加了 greenlet 方案的实验代码。
版本1 下载:包括 Stackless Python 、 thread 、 threading 、 processing 四种方案的实验代码。
为方便阅读,将实验中用到的几个脚本的代码粘贴如下,其中Stackless Python方案的代码实现直接取自Erlang vs. Stackless python: a first benchmark:
(Edit Section ↓)
5.1  ring_no_io_slp.py
[Get Code]
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys
import stackless as SL
 
def run_benchmark(n, m):
    # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
    firstP = cin = SL.channel()
    for s in xrange(1, n):
        seqn = s
        cout = SL.channel()
        # # print("*> s = %d" % (seqn, ))
        t = SL.tasklet(loop)(seqn, cin, cout)
        cin = cout
    else:
        seqn = s+1
        # # print("$> s = %d" % (seqn, ))
        t = SL.tasklet(mloop)(seqn, cin)
    for r in xrange(m-1, -1, -1):
        # # print("+ sending Msg#  %d" % r)
        firstP.send(r)
    SL.schedule()
def loop(s, cin, cout):
    while True:
        r = cin.receive()
        cout.send(r)
        if r > 0:
            # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
            pass
        else:
            # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
            break
def mloop(s, cin):
    while True:
        r = cin.receive()
        if r > 0:
            # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
            pass
        else:
            # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
            break
 
def pid(): return repr(SL.getcurrent()).split()[-1][2:-1]
 
if __name__ == '__main__':
    run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
(Edit Section ↓)
5.2  ring_no_io_thread.py
[Get Code]
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys, time
import thread
 
SLEEP_TIME = 0.0001
 
def run_benchmark(n, m):
    # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
    locks = [thread.allocate_lock() for i in xrange(n)]
    firstP = cin = []
    cin_lock_id = 0
    for s in xrange(1, n):
        seqn = s
        cout = []
        cout_lock_id = s
        # print("*> s = %d" % (seqn, ))
        thread.start_new_thread(loop, (seqn, locks, cin, cin_lock_id, cout, cout_lock_id))
        cin = cout
        cin_lock_id = cout_lock_id
    else:
        seqn = s+1
        # print("$> s = %d" % (seqn, ))
        thread.start_new_thread(mloop, (seqn, locks, cin, cin_lock_id))
    for r in xrange(m-1, -1, -1):
        # print("+ sending Msg#  %d" % r)
        lock = locks[0]
        lock.acquire()
        firstP.append(r)
        lock.release()
        time.sleep(SLEEP_TIME)
    try:
        while True:
            time.sleep(SLEEP_TIME)
    except:
        pass
def loop(s, locks, cin, cin_lock_id, cout, cout_lock_id):
    while True:
        lock = locks[cin_lock_id]
        lock.acquire()
        if len(cin) > 0:
            r = cin.pop(0)
            lock.release()
        else:
            lock.release()
            time.sleep(SLEEP_TIME)
            continue
        lock = locks[cout_lock_id]
        lock.acquire()
        cout.append(r)
        lock.release()
        if r > 0:
            # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
            pass
        else:
            # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
            break
def mloop(s, locks, cin, cin_lock_id):
    while True:
        lock = locks[cin_lock_id]
        lock.acquire()
        if len(cin) > 0:
            r = cin.pop(0)
            lock.release()
        else:
            lock.release()
            time.sleep(SLEEP_TIME)
            continue
        if r > 0:
            # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
            pass
        else:
            # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
            break
    thread.interrupt_main()
 
def pid(): return thread.get_ident()
 
if __name__ == '__main__':
    run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
(Edit Section ↓)
5.3  ring_no_io_queue.py
[Get Code]
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys
import threading, Queue
 
def run_benchmark(n, m):
    # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
    firstP = cin = Queue.Queue()
    for s in xrange(1, n):
        seqn = s
        cout = Queue.Queue()
        # print("*> s = %d" % (seqn, ))
        t = Loop(seqn, cin, cout)
        t.setDaemon(False)
        t.start()
        cin = cout
    else:
        seqn = s+1
        # print("$> s = %d" % (seqn, ))
        t = MLoop(seqn, cin)
        t.setDaemon(False)
        t.start()
    for r in xrange(m-1, -1, -1):
        # print("+ sending Msg#  %d" % r)
        firstP.put(r)
class Loop(threading.Thread):
    def __init__(self, s, cin, cout):
        threading.Thread.__init__(self)
        self.cin = cin
        self.cout = cout
        self.s = s
    def run(self):
        while True:
            r = self.cin.get()
            self.cout.put(r)
            if r > 0:
                 # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), self.s, r))
                 pass
            else:
                 # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), self.s))
                 break
class MLoop(threading.Thread):
    def __init__(self, s, cin):
        threading.Thread.__init__(self)
        self.cin = cin
        self.s = s
    def run(self):
        while True:
            r = self.cin.get()
            if r > 0:
                # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), self.s, r))
                pass
            else:
                # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), self.s))
                break
 
def pid(): return threading.currentThread()
 
if __name__ == '__main__':
    run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
(Edit Section ↓)
5.4  ring_no_io_proc.py
[Get Code]
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys
import processing, Queue
 
def run_benchmark(n, m):
    # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
    firstP = cin = processing.Queue()
    for s in xrange(1, n):
        seqn = s
        cout = processing.Queue()
        # print("*> s = %d" % (seqn, ))
        p = processing.Process(target = loop, args = [seqn, cin, cout])
        p.start()
        cin = cout
    else:
        seqn = s+1
        # print("$> s = %d" % (seqn, ))
        p = processing.Process(target = mloop, args = [seqn, cin])
        p.start()
    for r in xrange(m-1, -1, -1):
        # print("+ sending Msg#  %d" % r)
        firstP.put(r)
    p.join()
def loop(s, cin, cout):
    while True:
        r = cin.get()
        cout.put(r)
        if r > 0:
             # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
             pass
        else:
             # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
             break
def mloop(s, cin):
    while True:
        r = cin.get()
        if r > 0:
            # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
            pass
        else:
            # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
            break
 
def pid(): return processing.currentProcess()
 
if __name__ == '__main__':
    run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
(Edit Section ↓)
5.5  ring_no_io_greenlet.py
[Get Code]
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys
from py.magic import greenlet
 
def run_benchmark(n, m):
    # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
    glets = [greenlet.getcurrent()]
    for s in xrange(1, n):
        seqn = s
        glets.append(greenlet(loop))
        # print("*> s = %d" % (seqn, ))
    else:
        seqn = s+1
        glets.append(greenlet(mloop))
        # print("$> s = %d" % (seqn, ))
    glets[-1].switch(seqn, glets)
    for r in xrange(m-1, -1, -1):
        # print("+ sending Msg#  %d" % r)
        glets[1].switch(r)
def loop(s, glets):
    previous = glets[s - 1]
    next = glets[s + 1]
    if s > 1:
        r = previous.switch(s - 1, glets)
    else:
        r = previous.switch()
    while True:
        if r > 0:
            # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid("loop", s), s, r))
            pass
        else:
            # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid("loop", s), s))
            break
        next.switch(r)
        r = previous.switch()
    next.switch(r)
def mloop(s, glets):
    previous = glets[s - 1]
    r = previous.switch(s - 1, glets)
    while True:
        if r > 0:
            # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid("mloop", s), s, r))
            pass
        else:
            # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid("mloop", s), s))
            break
        r = previous.switch()
 
def pid(func, s): return "<<%s(Greenlet-%d, started)>>" % (func, s)
 
if __name__ == '__main__':
    run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
(Edit Section ↓)
5.6  ring_no_io_eventlet.py
[Get Code]
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys
import eventlet
 
def run_benchmark(n, m):
    # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
    firstP = cin = eventlet.Queue()
    for s in xrange(1, n):
        seqn = s
        cout = eventlet.Queue()
        # print("*> s = %d" % (seqn, ))
        eventlet.spawn_n(loop, seqn, cin, cout)
        cin = cout
    else:
        seqn = s+1
        # print("$> s = %d" % (seqn, ))
    for r in xrange(m-1, -1, -1):
        # print("+ sending Msg#  %d" % r)
        firstP.put(r)
    mloop(seqn, cin)
def loop(s, cin, cout):
    while True:
        r = cin.get()
        cout.put(r)
        if r > 0:
            # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
            pass
        else:
            # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
            break
def mloop(s, cin):
    while True:
        r = cin.get()
        if r > 0:
            # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
            pass
        else:
            # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
            break
 
def pid(): return eventlet.greenthread.getcurrent()
 
if __name__ == '__main__':
    run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
 
阅读(819) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~