Category: Python/Ruby

2017-12-07 10:19:44

(Reposter's note: TaskFlow implements a rollback mechanism similar to a database's. The English is not difficult, the code is clear, and the author wrote it well.)

TaskFlow is a library to complete workflows/tasks in a highly available manner. The TaskFlow wiki contains a pretty good overview. In the simplest terms: TaskFlow is used to organize actions into lightweight task objects, which are then linked together as an ordered sequence by a flow. This post is a quick overview that summarizes the information already out there and works through a few examples to get the basics down.

There are currently three patterns supported:

  • Linear: Runs a list of tasks/flows, one after the other in a serial manner.
  • Unordered: Runs a set of tasks/flows in any order, usually in parallel, with no dependencies between tasks.
  • Graph: Runs a graph (a set of nodes and edges between those nodes) composed of tasks/flows in dependency-driven order.
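To make the difference between the linear and graph patterns concrete, here is a minimal sketch in plain Python. It does not use TaskFlow at all: the task names are made up for illustration, and the standard library's graphlib (Python 3.9+) stands in for whatever ordering logic the library actually uses.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def linear_order(tasks):
    """Linear: tasks run exactly in the order they were added."""
    return list(tasks)


def graph_order(dependencies):
    """Graph: a task runs only after everything it depends on has run.

    `dependencies` maps each task name to the set of names it needs first.
    """
    return list(TopologicalSorter(dependencies).static_order())


# Hypothetical task names, used only for illustration.
print(linear_order(["fetch", "parse", "store"]))
print(graph_order({"store": {"parse"}, "parse": {"fetch"}, "fetch": set()}))
```

An unordered flow would be the case where the dependency sets are all empty, so any order (or running everything in parallel) is acceptable.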

Several engines are also supported (engines power the tasks and flows):

  • , think  here
  • Traditional: runs inside your application's existing framework

Just like any other task/flow based system, you also have states like PENDING, RUNNING, SUCCESS, FAILURE, etc. You can also create custom states.
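One way to picture these states is as a small transition table. The sketch below lists only the transitions that show up in the example output later in this post. It is an illustration in plain Python, not TaskFlow's own state machine, which defines more states and transitions than these; the names OBSERVED_TRANSITIONS and is_observed_path are mine.

```python
# Transitions seen in this post's example output (an assumption-limited
# subset; TaskFlow itself knows more states and transitions).
OBSERVED_TRANSITIONS = {
    "PENDING": {"RUNNING"},
    "RUNNING": {"SUCCESS", "FAILURE"},
    "FAILURE": {"REVERTING"},
    "REVERTING": {"REVERTED"},
}


def is_observed_path(states):
    """Return True if every consecutive pair of states is a listed transition."""
    return all(
        new in OBSERVED_TRANSITIONS.get(old, set())
        for old, new in zip(states, states[1:])
    )


print(is_observed_path(["PENDING", "RUNNING", "SUCCESS"]))
print(is_observed_path(["PENDING", "RUNNING", "FAILURE", "REVERTING", "REVERTED"]))
```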

Let’s jump into a simple example using the linear pattern. I’m going to fetch the contents and capture the status codes from two websites (Google and Amazon). The code is as follows:

from __future__ import print_function
 
import urllib2
 
import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
 
 
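def fetch(url):
    # Open the URL and return only the HTTP status code.
    request = urllib2.Request(url=url)
    response = urllib2.urlopen(request)
    return response.getcode()
 
 
class GoogleFetch(task.Task):
    # The argument name google_url is looked up in the engine's store.
    def execute(self, google_url, *args, **kwargs):
        status_code = fetch(google_url)
        print('Google Response Code: {}'.format(status_code))
 
 
class AmazonFetch(task.Task):
    # Likewise, amazon_url is supplied from the store by name.
    def execute(self, amazon_url, *args, **kwargs):
        status_code = fetch(amazon_url)
        print('Amazon Response Code: {}'.format(status_code))
 
 
if __name__ == "__main__":
    # A linear flow runs its tasks in the order they were added.
    flow = lf.Flow('simple-linear').add(
        GoogleFetch(),
        AmazonFetch()
    )
 
    # The store supplies execute() arguments by name; put the two URLs here.
    taskflow.engines.run(flow, store=dict(google_url='',
                                          amazon_url=''))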

Note: Make sure you install TaskFlow in your Python environment first (the examples use urllib2, so they target Python 2):

$ pip install taskflow

If you run the above code you should see the following output:

Google Response Code: 200
Amazon Response Code: 200

You’ll notice they run in order: the AmazonFetch task will never run before the GoogleFetch task.
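It may help to see why the store keys are named google_url and amazon_url. As far as I understand it, TaskFlow works out what a task needs from the argument names of its execute method and looks those names up in the store. The sketch below imitates that idea in plain Python 3 with inspect; GoogleTask, AmazonTask, wanted_names and run_linear are hypothetical stand-ins, not TaskFlow classes or functions.

```python
import inspect


def wanted_names(task):
    """Names of execute()'s explicit parameters, skipping *args/**kwargs."""
    params = inspect.signature(task.execute).parameters.values()
    skipped = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    return [p.name for p in params if p.kind not in skipped]


def run_linear(tasks, store):
    """Run tasks one after another, feeding each its arguments by name."""
    results = []
    for task in tasks:
        kwargs = {name: store[name] for name in wanted_names(task)}
        results.append(task.execute(**kwargs))
    return results


class GoogleTask:  # hypothetical stand-in for GoogleFetch
    def execute(self, google_url, *args, **kwargs):
        return "google got " + google_url


class AmazonTask:  # hypothetical stand-in for AmazonFetch
    def execute(self, amazon_url, *args, **kwargs):
        return "amazon got " + amazon_url


# Placeholder values, not real URLs.
store = {"google_url": "G", "amazon_url": "A"}
print(run_linear([GoogleTask(), AmazonTask()], store))
```

Renaming a key in the store without renaming the matching execute argument would leave that task without its input, which is the usual first mistake with this style of API.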

Ok, now let’s make sure we can examine the states of the flow (we won’t do the task states just yet). Modify the fetch example to look like this:

from __future__ import print_function
 
import urllib2
 
import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
 
 
def flow_watch(state, details):
    print('Flow State: {}'.format(state))
    print('Flow Details: {}'.format(details))
 
 
def fetch(url):
    request = urllib2.Request(url=url)
    response = urllib2.urlopen(request)
    return response.getcode()
 
 
class GoogleFetch(task.Task):
    def execute(self, google_url, *args, **kwargs):
        status_code = fetch(google_url)
        print('Google Response Code: {}'.format(status_code))
 
 
class AmazonFetch(task.Task):
    def execute(self, amazon_url, *args, **kwargs):
        status_code = fetch(amazon_url)
        print('Amazon Response Code: {}'.format(status_code))
 
 
if __name__ == "__main__":
    flow = lf.Flow('simple-linear-listen').add(
        GoogleFetch(),
        AmazonFetch()
    )
 
    engine = taskflow.engines.load(flow,
                                   store=dict(
                                       google_url='',
                                       amazon_url=''))
 
    engine.notifier.register('*', flow_watch)
 
    engine.run()

We registered a wildcard listener that reports to the flow_watch function, which in turn prints out the flow state and details.

Running the program now will show something similar to this:

Flow State: RUNNING
Flow Details: {'old_state': 'PENDING', 'engine': , 'flow_name': 'simple-linear-listen', 'flow_uuid': '10451c69-23d8-4db3-afd2-fd1925e36a4e'}
Google Response Code: 200
Amazon Response Code: 200
Flow State: SUCCESS
Flow Details: {'old_state': 'RUNNING', 'engine': , 'flow_name': 'simple-linear-listen', 'flow_uuid': '10451c69-23d8-4db3-afd2-fd1925e36a4e'}

You can see the flow state change when the program starts and again when all the tasks complete successfully.

If you only wanted to capture a flow’s success state you could replace the wildcard with this:

engine.notifier.register('SUCCESS', flow_watch)
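The wildcard versus single-state registration is easier to reason about with a toy version in hand. The Notifier class below is my own minimal sketch of the idea in plain Python, assuming callbacks take (state, details) as flow_watch does. It is not TaskFlow's notifier and leaves out everything the real one does beyond dispatching.

```python
class Notifier:
    """Toy notifier: callbacks are keyed by state, '*' matches every state."""

    ANY = "*"

    def __init__(self):
        self._listeners = {}

    def register(self, state, callback):
        self._listeners.setdefault(state, []).append(callback)

    def notify(self, state, details):
        # Call the listeners for this exact state, then the wildcard ones.
        callbacks = self._listeners.get(state, []) + self._listeners.get(self.ANY, [])
        for callback in callbacks:
            callback(state, details)


seen_all, seen_success = [], []
notifier = Notifier()
notifier.register(Notifier.ANY, lambda state, details: seen_all.append(state))
notifier.register("SUCCESS", lambda state, details: seen_success.append(state))

for state in ("RUNNING", "SUCCESS"):
    notifier.notify(state, {"flow_name": "demo"})  # hypothetical details dict

print(seen_all)
print(seen_success)
```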

So what if you wanted to capture all the task states? You simply need to add the following, where task_watch is a callback with the same (state, details) signature as flow_watch:

engine.task_notifier.register('*', task_watch)

So what happens when there is an error? Let’s find out with this code where I’ve purposely botched the URL for Amazon:

from __future__ import print_function
 
import urllib2
 
import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
 
 
def flow_watch(state, details):
    print('Flow State: {}'.format(state))
    print('Flow Details: {}'.format(details))
 
 
def task_watch(state, details):
    print('Task State: {}'.format(state))
    print('Task Details: {}'.format(details))
 
 
def fetch(url):
    request = urllib2.Request(url=url)
    response = urllib2.urlopen(request)
    return response.getcode()
 
 
class GoogleFetch(task.Task):
    def execute(self, google_url, *args, **kwargs):
        status_code = fetch(google_url)
        print('Google Response Code: {}'.format(status_code))
 
 
class AmazonFetch(task.Task):
    def execute(self, amazon_url, *args, **kwargs):
        status_code = fetch(amazon_url)
        print('Amazon Response Code: {}'.format(status_code))
 
 
if __name__ == "__main__":
    flow = lf.Flow('simple-linear-listen').add(
        GoogleFetch(),
        AmazonFetch()
    )
 
    engine = taskflow.engines.load(flow,
                                   store=dict(
                                       google_url='',
                                       amazon_url='HELLO!'))
 
    engine.notifier.register('*', flow_watch)
    engine.task_notifier.register('*', task_watch)
 
    engine.run()

The output is:

Flow State: RUNNING
Flow Details: {'old_state': 'PENDING', 'engine': , 'flow_name': 'simple-linear-listen', 'flow_uuid': '4eed8fc0-27d5-415a-ae66-e60980803e19'}
Task State: RUNNING
Task Details: {'task_uuid': '1eb812e2-3d85-4ef7-8dfb-3ec1589a79e3', 'result': None, 'task_name': '__main__.GoogleFetch'}
Google Response Code: 200
Traceback (most recent call last):
Task State: SUCCESS
Task Details: {'task_uuid': '1eb812e2-3d85-4ef7-8dfb-3ec1589a79e3', 'result': None, 'task_name': '__main__.GoogleFetch'}
Task State: RUNNING
Task Details: {'task_uuid': '2c8ee423-b66b-43e1-90b3-bba71162a0e5', 'result': None, 'task_name': '__main__.AmazonFetch'}
Task State: FAILURE
Task Details: {'task_uuid': '2c8ee423-b66b-43e1-90b3-bba71162a0e5', 'result': , 'task_name': '__main__.AmazonFetch'}
Flow State: FAILURE
Flow Details: {'old_state': 'RUNNING', 'engine': , 'flow_name': 'simple-linear-listen', 'flow_uuid': '4eed8fc0-27d5-415a-ae66-e60980803e19'}
Flow State: REVERTING
Flow Details: {'old_state': 'FAILURE', 'engine': , 'flow_name': 'simple-linear-listen', 'flow_uuid': '4eed8fc0-27d5-415a-ae66-e60980803e19'}
Task State: REVERTING
Task Details: {'task_uuid': '2c8ee423-b66b-43e1-90b3-bba71162a0e5', 'result': None, 'task_name': '__main__.AmazonFetch'}
Task State: REVERTED
Task Details: {'task_uuid': '2c8ee423-b66b-43e1-90b3-bba71162a0e5', 'result': None, 'task_name': '__main__.AmazonFetch'}
Task State: REVERTING
Task Details: {'task_uuid': '1eb812e2-3d85-4ef7-8dfb-3ec1589a79e3', 'result': None, 'task_name': '__main__.GoogleFetch'}
Task State: REVERTED
Task Details: {'task_uuid': '1eb812e2-3d85-4ef7-8dfb-3ec1589a79e3', 'result': None, 'task_name': '__main__.GoogleFetch'}
Flow State: REVERTED
Flow Details: {'old_state': 'REVERTING', 'engine': , 'flow_name': 'simple-linear-listen', 'flow_uuid': '4eed8fc0-27d5-415a-ae66-e60980803e19'}

You will also see an exception dump. This is because we haven’t implemented any exception handling yet; we will get to that next. The important thing to notice here is that if one of the flow’s tasks fails, the whole flow fails and a revert takes place (running whatever revert code you have written, which so far is none).
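The order in the output above is worth spelling out: the failed task is reverted first, then the earlier tasks in reverse order, and only then does the error reach the caller. Here is a rough sketch of that behaviour in plain Python. Step and run_with_revert are hypothetical names of mine, and the real engine does considerably more (state tracking, notifications, persistence).

```python
class Step:
    """Hypothetical task: records what happens and can be told to fail."""

    def __init__(self, name, fail=False):
        self.name = name
        self.fail = fail

    def execute(self, log):
        log.append("run " + self.name)
        if self.fail:
            raise RuntimeError(self.name + " failed")

    def revert(self, log):
        log.append("revert " + self.name)


def run_with_revert(steps, log):
    """Run steps in order; on failure revert the started ones in reverse."""
    started = []
    try:
        for step in steps:
            started.append(step)  # the failing step gets reverted too
            step.execute(log)
    except Exception:
        for step in reversed(started):
            step.revert(log)
        raise  # the caller still sees the original error


log = []
try:
    run_with_revert([Step("google"), Step("amazon", fail=True)], log)
except RuntimeError as ex:
    log.append("caught: " + str(ex))
print(log)
```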

Let’s add exception handling as well as code to perform a revert.

from __future__ import print_function
 
import urllib2
 
import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
 
 
def flow_watch(state, details):
    print('Flow State: {}'.format(state))
    print('Flow Details: {}'.format(details))
 
 
def task_watch(state, details):
    print('Task State: {}'.format(state))
    print('Task Details: {}'.format(details))
 
 
def fetch(url):
    request = urllib2.Request(url=url)
    response = urllib2.urlopen(request)
    return response.getcode()
 
 
class GoogleFetch(task.Task):
    def execute(self, google_url, *args, **kwargs):
        status_code = fetch(google_url)
        print('Google Response Code: {}'.format(status_code))
 
    def revert(self, google_url, *args, **kwargs):
        print('Magically Reverting the Google Call!')
 
 
class AmazonFetch(task.Task):
    def execute(self, amazon_url, *args, **kwargs):
        status_code = fetch(amazon_url)
        print('Amazon Response Code: {}'.format(status_code))
 
    def revert(self, amazon_url, *args, **kwargs):
        print('Magically Reverting the Amazon Call!')
 
 
if __name__ == "__main__":
    flow = lf.Flow('simple-linear-listen').add(
        GoogleFetch(),
        AmazonFetch()
    )
 
    engine = taskflow.engines.load(flow,
                                   store=dict(
                                       google_url='',
                                       amazon_url='HELLO!'))
 
    engine.notifier.register('*', flow_watch)
    engine.task_notifier.register('*', task_watch)
 
    try:
        engine.run()
    except urllib2.URLError:
        print("I think the URL is bad in one of the tasks...")
    except Exception as ex:
        print(ex)  # printing the exception itself avoids the deprecated ex.message

Now if you run it you should see something like this:

Flow State: RUNNING
Flow Details: {'old_state': 'PENDING', 'engine': , 'flow_name': 'simple-linear-listen', 'flow_uuid': 'b4835f5e-119c-4f48-8e16-2579deb33591'}
Task State: RUNNING
Task Details: {'task_uuid': '748240d1-78a7-4418-8233-25e1fe29caf0', 'result': None, 'task_name': '__main__.GoogleFetch'}
Google Response Code: 200
Task State: SUCCESS
Task Details: {'task_uuid': '748240d1-78a7-4418-8233-25e1fe29caf0', 'result': None, 'task_name': '__main__.GoogleFetch'}
Task State: RUNNING
Task Details: {'task_uuid': 'e052e44f-6653-47f0-b193-4dd374a4ac91', 'result': None, 'task_name': '__main__.AmazonFetch'}
Task State: FAILURE
Task Details: {'task_uuid': 'e052e44f-6653-47f0-b193-4dd374a4ac91', 'result': , 'task_name': '__main__.AmazonFetch'}
Flow State: FAILURE
Flow Details: {'old_state': 'RUNNING', 'engine': , 'flow_name': 'simple-linear-listen', 'flow_uuid': 'b4835f5e-119c-4f48-8e16-2579deb33591'}
Flow State: REVERTING
Flow Details: {'old_state': 'FAILURE', 'engine': , 'flow_name': 'simple-linear-listen', 'flow_uuid': 'b4835f5e-119c-4f48-8e16-2579deb33591'}
Task State: REVERTING
Task Details: {'task_uuid': 'e052e44f-6653-47f0-b193-4dd374a4ac91', 'result': None, 'task_name': '__main__.AmazonFetch'}
Magically Reverting the Amazon Call!
Task State: REVERTED
Task Details: {'task_uuid': 'e052e44f-6653-47f0-b193-4dd374a4ac91', 'result': None, 'task_name': '__main__.AmazonFetch'}
Task State: REVERTING
Task Details: {'task_uuid': '748240d1-78a7-4418-8233-25e1fe29caf0', 'result': None, 'task_name': '__main__.GoogleFetch'}
Magically Reverting the Google Call!
Task State: REVERTED
Task Details: {'task_uuid': '748240d1-78a7-4418-8233-25e1fe29caf0', 'result': None, 'task_name': '__main__.GoogleFetch'}
Flow State: REVERTED
Flow Details: {'old_state': 'REVERTING', 'engine': , 'flow_name': 'simple-linear-listen', 'flow_uuid': 'b4835f5e-119c-4f48-8e16-2579deb33591'}
I think the URL is bad in one of the tasks...

The AmazonFetch task causes an error. From there the flow is put in a failure state and any scripted revert code is called. In this case we simply print a message. In real life with real tasks, for example inserting entries into several different database back-ends, the revert code could pull back out the data that earlier tasks inserted successfully before the task that failed.
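For the database scenario, here is a small sketch of the same "undo what already succeeded" idea. Note that it swaps in a different technique: Python 3's contextlib.ExitStack rather than TaskFlow, with in-memory lists standing in for the back-ends. ListBackend, BrokenBackend and insert_everywhere are hypothetical names. Unlike the TaskFlow run above, the insert that fails is not itself reverted here, only the ones that went through.

```python
from contextlib import ExitStack


class ListBackend:
    """Hypothetical stand-in for a database back-end."""

    def __init__(self):
        self.rows = []

    def insert(self, row):
        self.rows.append(row)

    def remove(self, row):
        self.rows.remove(row)


class BrokenBackend(ListBackend):
    """A back-end whose insert always fails."""

    def insert(self, row):
        raise IOError("backend unavailable")


def insert_everywhere(row, backends):
    """Insert row into every back-end, pulling it back out on failure."""
    with ExitStack() as undo:
        for backend in backends:
            backend.insert(row)                 # may raise
            undo.callback(backend.remove, row)  # compensation for this insert
        undo.pop_all()  # all inserts succeeded: discard the undo actions


first, second = ListBackend(), BrokenBackend()
try:
    insert_everywhere("row-1", [first, second])
except IOError:
    print("insert failed, rows left in first back-end:", first.rows)
```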

To fix the TaskFlow example, simply remove the HELLO! from amazon_url:

engine = taskflow.engines.load(flow,
                               store=dict(
                                   google_url='',
                                   amazon_url=''))

This is just an introduction and we only covered a very small part of TaskFlow, and in fact only one pattern (linear), but it serves as a good starting point.

Source: http://www.giantflyingsaucer.com/blog/?p=4896&utm_source=tuicool&utm_medium=referral
