Chinaunix首页 | 论坛 | 博客
  • 博客访问: 455855
  • 博文数量: 45
  • 博客积分: 2526
  • 博客等级: 少校
  • 技术积分: 478
  • 用 户 组: 普通用户
  • 注册时间: 2008-05-12 21:04
文章分类

全部博文(45)

文章存档

2014年(1)

2011年(1)

2010年(3)

2009年(22)

2008年(18)

我的朋友

分类: Python/Ruby

2011-04-22 23:28:39

不解释,直接上代码:

#!/usr/bin/env python
#
# Description: demo code showing how to call functions in parallel
#
# Author : Ray Chen (chenrano2002@gmail.com)
# Date : 2011-04-21

import os,sys,time,random
import threading

# Debug switch: when True, Action.do() and ActionList.do_in_parallel() print
# trace messages; the value is also passed as ``verbose`` to threading.Thread
# by FuncThread.
DEBUG = False

def run_function_in_parallel(func_list):
    """Start every FuncThread in *func_list*, then wait for all of them.

    Entries that are not FuncThread instances are skipped silently.  All
    threads are started before the first one is joined, so they run
    concurrently.  Nothing is returned.
    """
    # Phase 1: launch every worker thread.
    for worker in func_list:
        if isinstance(worker, FuncThread):
            worker.start()

    # Phase 2: block until each launched worker has finished.
    for worker in func_list:
        if isinstance(worker, FuncThread):
            worker.join()

class FuncThread(threading.Thread):
    """Thread that calls ``function(*args, **kwargs)`` when it is started.

    The value returned by the function is stored in ``result`` once
    ``run()`` has completed (it is ``None`` before that), so callers can
    read it after ``join()``.
    """

    # Return value of the wrapped function; None until run() has finished.
    result = None

    def __init__(self, function, *args, **kwargs):
        """Remember the callable and the arguments to invoke it with."""
        # Initialise the base class first: Python 3's Thread.__init__ assigns
        # its own _args/_kwargs attributes and would overwrite ours if it
        # ran after the assignments below.
        try:
            threading.Thread.__init__(self, verbose=DEBUG)
        except TypeError:
            # Current Python 3 Thread constructors do not accept ``verbose``.
            threading.Thread.__init__(self)
        self._function = function
        self._args = args
        self._kwargs = kwargs

    def __str__(self):
        """Return a three-line description of the function and its arguments."""
        return '\n'.join([
            "function name: " + str(self._function),
            "turple optional parameters:" + str(self._args),
            "dict optional parameters:" + str(self._kwargs),
        ])

    def run(self):
        """Call the stored function; keep and return its result."""
        self.result = self._function(*self._args, **self._kwargs)
        return self.result

class Action(object):
    """Bundle a callable with the arguments it should be invoked with.

    The call itself is deferred until ``do()`` is invoked.
    """

    def __init__(self, function, *args, **kwargs):
        """Store *function* together with its positional and keyword arguments."""
        self.function = function
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        """Return a three-line description of the function and its arguments."""
        return '\n'.join([
            "function name: " + str(self.function),
            "turple optional parameters:" + str(self.args),
            "dict optional parameters:" + str(self.kwargs),
        ])

    def do(self):
        """Invoke the stored function and return its result.

        Returns None when ``function`` is not callable or when the call
        raises an Exception; the failure is only reported when DEBUG is set.
        """
        if not callable(self.function):
            return None

        if DEBUG:
            print("call %s " % self.function)

        try:
            return self.function(*self.args, **self.kwargs)
        except Exception:
            # Best effort by design: a failing action yields None.  Only
            # Exception is caught so that SystemExit and KeyboardInterrupt
            # are not swallowed.
            if DEBUG:
                print("call function failed, type = %s, value = %s, "
                      "trackback = %s" % sys.exc_info())
            return None
 
class ActionList(list):
    """Collection of Action objects that can be executed in parallel.

    The actions are kept in the ``action_list`` attribute; the storage
    inherited from ``list`` is not used by this class.
    """

    def __init__(self, actions=None):
        """Use *actions* as the backing list, or a new empty list if omitted."""
        # A fresh list per instance: a shared ``[]`` default would leak
        # actions added to one ActionList into every later ActionList().
        self.action_list = [] if actions is None else actions

    def add(self, action):
        """Append *action*; objects that are not Action instances are ignored."""
        if isinstance(action, Action):
            self.action_list.append(action)

    def do_in_parallel(self):
        """Run every action, each in its own forked child process.

        With exactly one action no process is forked: the action runs in
        the calling process and its result is returned directly.

        Otherwise one child is forked per action and the parent blocks
        until all children have terminated.  The returned list is always
        empty in that case: a child's result lives only in the child's
        memory and is not sent back to the parent.
        """
        if len(self.action_list) == 1:
            return self.action_list[0].do()

        child_pids = []
        for action in self.action_list:
            pid = os.fork()
            if pid > 0:
                # parent process: remember the child so it can be reaped
                if DEBUG:
                    print("parent process, push %d..." % pid)
                child_pids.append(pid)
            else:
                # child process: run the action, then terminate so the
                # child never continues into the parent's remaining code
                if DEBUG:
                    print("run child process: %d..." % os.getpid())
                action.do()
                sys.exit(0)

        # Block until every child has exited.  Option 0 is used on purpose:
        # os.WNOHANG returns immediately and would neither wait for nor
        # reap children that are still running.
        for pid in child_pids:
            if DEBUG:
                print("wait for %d" % pid)
            os.waitpid(pid, 0)

        return []
 
def do_action_in_parallel(action_list):
    """Wrap *action_list* in an ActionList and return its do_in_parallel() result."""
    return ActionList(action_list).do_in_parallel()

if __name__ == "__main__":
    # Demo driver: runs the same four sample functions through ActionList,
    # through do_action_in_parallel() and through FuncThread.
    # Every print is a single-argument print(...) call, which produces the
    # same text on Python 2 and Python 3.

    print("======start testing=========")

    def funA(word):
        """Sleep 1-6 seconds, print and return *word*."""
        time.sleep(random.randint(1, 6))
        print("say %s" % (word,))
        return word

    def funB(config=None):
        """Sleep 1-6 seconds, print and return *config* (default: empty dict)."""
        # None sentinel instead of a mutable ``{}`` default shared by all calls.
        if config is None:
            config = {}
        time.sleep(random.randint(1, 6))
        print(config)
        return config

    def funC(key, *args):
        """Sleep 1-6 seconds, print and return *key* with the extra positionals."""
        time.sleep(random.randint(1, 6))
        print("%s %s" % (key, args))
        return (key, args)

    def funD(key, *args, **kwargs):
        """Sleep 1-6 seconds, print and return *key*, positionals and keywords."""
        time.sleep(random.randint(1, 6))
        print("%s %s %s" % (key, args, kwargs))
        return [key, args, kwargs]

    a = Action(funA, "hello, funA")
    b = Action(funB, {"name": "funB"})
    c = Action(funC, 1, "optional", "funC")
    d = Action(funD, 2, "optional", {"name": "funD"}, index=1)

    ac = ActionList()
    for action in (a, b, c, d):
        ac.add(action)
    print(ac.do_in_parallel())

    # NOTE(review): presumably this pause lets output from still-running
    # children appear before the next section starts - confirm.
    time.sleep(30)
    print("======another handy way to run=========")
    print(do_action_in_parallel([a, b, c, d]))

    time.sleep(30)
    print("=====try threading to run mutil function======")
    fa = FuncThread(funA, "hello, funA")
    fb = FuncThread(funB, {'name': "funB"})
    fc = FuncThread(funC, 1, "optional", "funC")
    fd = FuncThread(funD, 2, {'name': 'funD'}, index=1)
    run_function_in_parallel([fa, fb, fc, fd])


上结果:

$ python action.py
======start testing=========
[]
say hello, funA
{'name': 'funB'}
1 ('optional', 'funC')
2 ('optional', {'name': 'funD'}) {'index': 1}
======another handy way to run=========
[]
say hello, funA
{'name': 'funB'}
1 ('optional', 'funC')
2 ('optional', {'name': 'funD'}) {'index': 1}
=====try threading to run mutil function======
2 ({'name': 'funD'},) {'index': 1}
say hello, funA
{'name': 'funB'}
1 ('optional', 'funC')



阅读(6059) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~