Chinaunix首页 | 论坛 | 博客
  • 博客访问: 19912770
  • 博文数量: 679
  • 博客积分: 10495
  • 博客等级: 上将
  • 技术积分: 9308
  • 用 户 组: 普通用户
  • 注册时间: 2006-07-18 10:51
文章分类

全部博文(679)

文章存档

2012年(5)

2011年(38)

2010年(86)

2009年(145)

2008年(170)

2007年(165)

2006年(89)

分类: Python/Ruby

2010-02-09 15:01:55

import os
import sys
import threading

from robot.errors import FrameworkError

if os.name == 'java':
    from java.lang import Runnable, Throwable
    from java.lang import Thread as JavaThread
    java_exceptions = (Throwable,)
else:
    from stoppablethread import StoppablePythonThread
    class Runnable:
        pass
    java_exceptions = ()


class _FakeSemaphore:
    def acquire(self):
        pass
    def release(self):
        pass
    
def Semaphore():
    # Cygwin Python threads are buggy so use a fake semaphore when possible
    if sys.platform.count('cygwin') > 0 \
            and threading.currentThread().getName() == 'MainThread':
        return _FakeSemaphore()
    return threading.Semaphore()
       

Event = threading.Event


def current_thread():
    if os.name == 'java':
        return JavaThread.currentThread()
    return threading.currentThread()


class Runner(Runnable):

    def __init__(self, runnable, args=None, kwargs=None, notifier=None):
        self._runnable = runnable
        self._args = args is not None and args or ()
        self._kwargs = kwargs is not None and kwargs or {}
        self._notifier = notifier is not None and notifier or threading.Event()
        self._result = None
        self._error = None

    def run(self):
        if self.is_done():
            raise FrameworkError('Runner can be run only once')
        try:
            self._result = self._runnable(*self._args, **self._kwargs)
        except java_exceptions, error:
            self._error = error
        except:
            self._error = sys.exc_info()[1]
        self._notifier.set()
       
    __call__ = run

    def is_done(self):
        return self._notifier.isSet()

    def get_result(self):
        if not self.is_done():
            self._notifier.wait()
        if self._error is not None:
            raise self._error
        return self._result
   
   
def Thread(runner, stoppable=False, daemon=False, name=None):
    if os.name == 'java':       
        thread = JavaThread(runner)   # This is always stoppable
    elif not stoppable:
        thread = threading.Thread(target=runner)
    else:
        thread = StoppablePythonThread(target=runner)
    thread.setDaemon(daemon)
    if name is not None:
        thread.setName(name)
    return thread

文件路径:robotframework-2.1.2\src\robot\utils\robotthread.py
功能:构建了线程模型。里面较多的考虑了java和cygwin,可以不涉及
阅读(27390) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~