Category: Python/Ruby
2017-12-07 10:19:44
TaskFlow implements a rollback mechanism similar to a database's. The English in the original post is not difficult, the code is clear, and the author did a good job. TaskFlow is a library to complete workflows/tasks in a highly available manner. The TaskFlow wiki contains a pretty good overview. In the simplest terms: TaskFlow is used to organize actions into lightweight task objects, which are then linked together as an ordered sequence by a flow. This is a quick overview that summarizes the information already out there and works through a few examples to get the basics down.
There are currently three patterns supported: linear, unordered, and graph.
Several engines are also supported (engines power the tasks and flows), including serial and parallel ones.
Just like any other task/flow based system, you also have states such as PENDING, RUNNING, SUCCESS, FAILURE, etc. You can also create custom states.
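The flow states that show up in this post's output can be modelled as a small transition table. The sketch below is a simplification written for illustration only: it is not TaskFlow's actual state machine (which has more states and transitions), and the names ALLOWED, check_transition and walk are hypothetical.

```python
# Simplified model of the flow states that appear in this post's output.
# Not TaskFlow's real state machine; ALLOWED, check_transition and walk are
# hypothetical names used only for illustration.
ALLOWED = {
    'PENDING': {'RUNNING'},
    'RUNNING': {'SUCCESS', 'FAILURE'},
    'FAILURE': {'REVERTING'},
    'REVERTING': {'REVERTED'},
    'SUCCESS': set(),
    'REVERTED': set(),
}


def check_transition(old_state, new_state):
    """Return True if the simplified model allows old_state -> new_state."""
    return new_state in ALLOWED.get(old_state, set())


def walk(states):
    """Check a whole sequence of states, one adjacent pair at a time."""
    return all(check_transition(a, b) for a, b in zip(states, states[1:]))


if __name__ == '__main__':
    print(walk(['PENDING', 'RUNNING', 'SUCCESS']))
    print(walk(['PENDING', 'RUNNING', 'FAILURE', 'REVERTING', 'REVERTED']))
    print(walk(['PENDING', 'SUCCESS']))
```

The first two sequences are the successful and the failing run shown later in the post; the third skips RUNNING, so the model rejects it.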
Let’s jump into a simple example using the linear pattern. I’m going to fetch the contents and capture the status codes from two websites (Google and Amazon). The code is as follows:

    from __future__ import print_function

    # Python 2 only: urllib2 does not exist on Python 3.
    import urllib2

    import taskflow.engines
    from taskflow.patterns import linear_flow as lf
    from taskflow import task


    def fetch(url):
        # Return the HTTP status code of the response.
        request = urllib2.Request(url=url)
        response = urllib2.urlopen(request)
        return response.getcode()


    class GoogleFetch(task.Task):
        def execute(self, google_url, *args, **kwargs):
            status_code = fetch(google_url)
            print('Google Response Code: {}'.format(status_code))


    class AmazonFetch(task.Task):
        def execute(self, amazon_url, *args, **kwargs):
            status_code = fetch(amazon_url)
            print('Amazon Response Code: {}'.format(status_code))


    if __name__ == "__main__":
        # Tasks in a linear flow run in the order they were added.
        flow = lf.Flow('simple-linear').add(
            GoogleFetch(),
            AmazonFetch()
        )
        taskflow.engines.run(flow, store=dict(google_url='',
                                              amazon_url=''))

Note: Make sure you run the following in your Python environment first:

    $ pip install taskflow

If you run the above code you should see the following output:

    Google Response Code: 200
    Amazon Response Code: 200

You’ll notice they run in order: the AmazonFetch task will never run before the GoogleFetch task.
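The listings in this post target Python 2, where urllib2 exists. On Python 3 that module was split into urllib.request and urllib.error. A minimal sketch of the same fetch helper, assuming Python 3 and keeping the helper's name from the post (the timeout parameter is an addition for this sketch), might look like this:

```python
import urllib.error
import urllib.request


def fetch(url, timeout=10):
    """Return the HTTP status code for url (Python 3 variant of the helper).

    The timeout parameter is an assumption added for this sketch; the
    helper in the post does not set one.
    """
    # Request() raises ValueError if the string has no URL scheme.
    request = urllib.request.Request(url=url)
    # urlopen raises urllib.error.URLError (or its subclass HTTPError)
    # on network and HTTP errors.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.getcode()
```

With this variant, the exception handler later in the post would catch urllib.error.URLError rather than urllib2.URLError.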
Ok, now let’s make sure we can examine the states of the flow (we won’t do the task states just yet). Modify the code to look like this:

    from __future__ import print_function

    import urllib2

    import taskflow.engines
    from taskflow.patterns import linear_flow as lf
    from taskflow import task


    def flow_watch(state, details):
        print('Flow State: {}'.format(state))
        print('Flow Details: {}'.format(details))


    def fetch(url):
        request = urllib2.Request(url=url)
        response = urllib2.urlopen(request)
        return response.getcode()


    class GoogleFetch(task.Task):
        def execute(self, google_url, *args, **kwargs):
            status_code = fetch(google_url)
            print('Google Response Code: {}'.format(status_code))


    class AmazonFetch(task.Task):
        def execute(self, amazon_url, *args, **kwargs):
            status_code = fetch(amazon_url)
            print('Amazon Response Code: {}'.format(status_code))


    if __name__ == "__main__":
        flow = lf.Flow('simple-linear-listen').add(
            GoogleFetch(),
            AmazonFetch()
        )
        engine = taskflow.engines.load(flow,
                                       store=dict(
                                           google_url='',
                                           amazon_url=''))
        engine.notifier.register('*', flow_watch)
        engine.run()

We registered a wildcard listener which will report to the flow_watch function which in turn prints out the flow state and details.
Running the program now will show something similar to this:

    Flow State: RUNNING
    Flow Details: {'old_state': 'PENDING', 'engine':
    Google Response Code: 200
    Amazon Response Code: 200
    Flow State: SUCCESS
    Flow Details: {'old_state': 'RUNNING', 'engine':

You can see the flow state change when the program starts and again when all the tasks complete successfully.
If you only wanted to capture a flow’s success state you could replace the wildcard with this:

    engine.notifier.register('SUCCESS', flow_watch)

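To make the difference between the wildcard and a single state concrete, here is a toy notifier written in plain Python. It only imitates the register/notify idea described above; the class ToyNotifier and its internals are hypothetical and are not TaskFlow's implementation.

```python
from collections import defaultdict


class ToyNotifier(object):
    """Toy event notifier: callbacks registered for '*' see every event."""

    ANY = '*'

    def __init__(self):
        self._listeners = defaultdict(list)

    def register(self, event_type, callback):
        self._listeners[event_type].append(callback)

    def notify(self, event_type, details):
        # Listeners for this exact event first, then the wildcard listeners.
        callbacks = list(self._listeners.get(event_type, []))
        if event_type != self.ANY:
            callbacks += self._listeners.get(self.ANY, [])
        for callback in callbacks:
            callback(event_type, details)


seen_all = []
seen_success = []
notifier = ToyNotifier()
notifier.register('*', lambda state, details: seen_all.append(state))
notifier.register('SUCCESS', lambda state, details: seen_success.append(state))

old_state = 'PENDING'
for state in ('RUNNING', 'SUCCESS'):
    notifier.notify(state, {'old_state': old_state})
    old_state = state

print(seen_all)      # ['RUNNING', 'SUCCESS']
print(seen_success)  # ['SUCCESS']
```

The wildcard callback records both state changes, while the callback registered for 'SUCCESS' records only the last one.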
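So what if you wanted to capture all the task states as well? Register a second callback, here called task_watch, which takes the same (state, details) arguments as flow_watch and is defined in the next listing. (Later TaskFlow releases renamed this notifier to atom_notifier.)

    engine.task_notifier.register('*', task_watch)
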
So what happens when there is an error? Let’s find out with this code where I’ve purposely botched the URL for Amazon:

    from __future__ import print_function

    import urllib2

    import taskflow.engines
    from taskflow.patterns import linear_flow as lf
    from taskflow import task


    def flow_watch(state, details):
        print('Flow State: {}'.format(state))
        print('Flow Details: {}'.format(details))


    def task_watch(state, details):
        print('Task State: {}'.format(state))
        print('Task Details: {}'.format(details))


    def fetch(url):
        request = urllib2.Request(url=url)
        response = urllib2.urlopen(request)
        return response.getcode()


    class GoogleFetch(task.Task):
        def execute(self, google_url, *args, **kwargs):
            status_code = fetch(google_url)
            print('Google Response Code: {}'.format(status_code))


    class AmazonFetch(task.Task):
        def execute(self, amazon_url, *args, **kwargs):
            status_code = fetch(amazon_url)
            print('Amazon Response Code: {}'.format(status_code))


    if __name__ == "__main__":
        flow = lf.Flow('simple-linear-listen').add(
            GoogleFetch(),
            AmazonFetch()
        )
        engine = taskflow.engines.load(flow,
                                       store=dict(
                                           google_url='',
                                           amazon_url='HELLO!'))
        engine.notifier.register('*', flow_watch)
        engine.task_notifier.register('*', task_watch)
        engine.run()

The output is:

    Flow State: RUNNING
    Flow Details: {'old_state': 'PENDING', 'engine':
    Task State: RUNNING
    Task Details: {'task_uuid': '1eb812e2-3d85-4ef7-8dfb-3ec1589a79e3', 'result': None, 'task_name': '__main__.GoogleFetch'}
    Google Response Code: 200
    Traceback (most recent call last):
    Task State: SUCCESS
    Task Details: {'task_uuid': '1eb812e2-3d85-4ef7-8dfb-3ec1589a79e3', 'result': None, 'task_name': '__main__.GoogleFetch'}
    Task State: RUNNING
    Task Details: {'task_uuid': '2c8ee423-b66b-43e1-90b3-bba71162a0e5', 'result': None, 'task_name': '__main__.AmazonFetch'}
    Task State: FAILURE
    Task Details: {'task_uuid': '2c8ee423-b66b-43e1-90b3-bba71162a0e5', 'result':
    Flow State: FAILURE
    Flow Details: {'old_state': 'RUNNING', 'engine':
    Flow State: REVERTING
    Flow Details: {'old_state': 'FAILURE', 'engine':
    Task State: REVERTING
    Task Details: {'task_uuid': '2c8ee423-b66b-43e1-90b3-bba71162a0e5', 'result': None, 'task_name': '__main__.AmazonFetch'}
    Task State: REVERTED
    Task Details: {'task_uuid': '2c8ee423-b66b-43e1-90b3-bba71162a0e5', 'result': None, 'task_name': '__main__.AmazonFetch'}
    Task State: REVERTING
    Task Details: {'task_uuid': '1eb812e2-3d85-4ef7-8dfb-3ec1589a79e3', 'result': None, 'task_name': '__main__.GoogleFetch'}
    Task State: REVERTED
    Task Details: {'task_uuid': '1eb812e2-3d85-4ef7-8dfb-3ec1589a79e3', 'result': None, 'task_name': '__main__.GoogleFetch'}
    Flow State: REVERTED
    Flow Details: {'old_state': 'REVERTING', 'engine':

You will also see an exception dump, because we haven’t implemented any exception handling yet; we will get to that next. The important thing to notice here is that if one of the flow’s tasks fails, the whole flow fails and a revert takes place (it only does something if you’ve written revert code, which we haven’t yet).
Let’s add exception handling as well as code to perform a revert.

    from __future__ import print_function

    import urllib2

    import taskflow.engines
    from taskflow.patterns import linear_flow as lf
    from taskflow import task


    def flow_watch(state, details):
        print('Flow State: {}'.format(state))
        print('Flow Details: {}'.format(details))


    def task_watch(state, details):
        print('Task State: {}'.format(state))
        print('Task Details: {}'.format(details))


    def fetch(url):
        request = urllib2.Request(url=url)
        response = urllib2.urlopen(request)
        return response.getcode()


    class GoogleFetch(task.Task):
        def execute(self, google_url, *args, **kwargs):
            status_code = fetch(google_url)
            print('Google Response Code: {}'.format(status_code))

        def revert(self, google_url, *args, **kwargs):
            # Called when the flow fails; undo whatever execute() did.
            print('Magically Reverting the Google Call!')


    class AmazonFetch(task.Task):
        def execute(self, amazon_url, *args, **kwargs):
            status_code = fetch(amazon_url)
            print('Amazon Response Code: {}'.format(status_code))

        def revert(self, amazon_url, *args, **kwargs):
            print('Magically Reverting the Amazon Call!')


    if __name__ == "__main__":
        flow = lf.Flow('simple-linear-listen').add(
            GoogleFetch(),
            AmazonFetch()
        )
        engine = taskflow.engines.load(flow,
                                       store=dict(
                                           google_url='',
                                           amazon_url='HELLO!'))
        engine.notifier.register('*', flow_watch)
        engine.task_notifier.register('*', task_watch)
        try:
            engine.run()
        except urllib2.URLError:
            print("I think the URL is bad in one of the tasks...")
        except Exception as ex:
            # ex.message is deprecated; printing the exception itself is portable.
            print(ex)

Now if you run it you should see something like this:

    Flow State: RUNNING
    Flow Details: {'old_state': 'PENDING', 'engine':
    Task State: RUNNING
    Task Details: {'task_uuid': '748240d1-78a7-4418-8233-25e1fe29caf0', 'result': None, 'task_name': '__main__.GoogleFetch'}
    Google Response Code: 200
    Task State: SUCCESS
    Task Details: {'task_uuid': '748240d1-78a7-4418-8233-25e1fe29caf0', 'result': None, 'task_name': '__main__.GoogleFetch'}
    Task State: RUNNING
    Task Details: {'task_uuid': 'e052e44f-6653-47f0-b193-4dd374a4ac91', 'result': None, 'task_name': '__main__.AmazonFetch'}
    Task State: FAILURE
    Task Details: {'task_uuid': 'e052e44f-6653-47f0-b193-4dd374a4ac91', 'result':
    Flow State: FAILURE
    Flow Details: {'old_state': 'RUNNING', 'engine':
    Flow State: REVERTING
    Flow Details: {'old_state': 'FAILURE', 'engine':
    Task State: REVERTING
    Task Details: {'task_uuid': 'e052e44f-6653-47f0-b193-4dd374a4ac91', 'result': None, 'task_name': '__main__.AmazonFetch'}
    Magically Reverting the Amazon Call!
    Task State: REVERTED
    Task Details: {'task_uuid': 'e052e44f-6653-47f0-b193-4dd374a4ac91', 'result': None, 'task_name': '__main__.AmazonFetch'}
    Task State: REVERTING
    Task Details: {'task_uuid': '748240d1-78a7-4418-8233-25e1fe29caf0', 'result': None, 'task_name': '__main__.GoogleFetch'}
    Magically Reverting the Google Call!
    Task State: REVERTED
    Task Details: {'task_uuid': '748240d1-78a7-4418-8233-25e1fe29caf0', 'result': None, 'task_name': '__main__.GoogleFetch'}
    Flow State: REVERTED
    Flow Details: {'old_state': 'REVERTING', 'engine':
    I think the URL is bad in one of the tasks...

The AmazonFetch task raises an error; from there the flow is put into a failure state and any revert code you have written is called, starting with the failed task and working backwards. In this case we simply print a message. In real life, with real tasks (for example, inserting entries into several different database back-ends), the revert methods of the tasks that had already succeeded could pull the newly inserted data back out.
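The database scenario can be sketched without TaskFlow at all. The toy runner below is an illustration under stated assumptions, not TaskFlow's engine: InsertRow, run_linear and the dict-based "back-ends" are all hypothetical. It runs tasks in order and, when one fails, calls revert on the failing task and then on the tasks that had already succeeded, in reverse order, which matches the order seen in the output above.

```python
class InsertRow(object):
    """Hypothetical task: insert one row into an in-memory 'back-end'."""

    def __init__(self, backend, key, value, fail=False):
        self.backend = backend
        self.key = key
        self.value = value
        self.fail = fail  # set to True to simulate a failing insert

    def execute(self):
        if self.fail:
            raise RuntimeError('insert of %r failed' % self.key)
        self.backend[self.key] = self.value

    def revert(self):
        # Undo the insert if it happened; harmless if it did not.
        self.backend.pop(self.key, None)


def run_linear(tasks):
    """Run tasks in order; on failure revert in reverse order and re-raise."""
    started = []
    try:
        for task in tasks:
            started.append(task)
            task.execute()
    except Exception:
        for task in reversed(started):
            task.revert()
        raise


if __name__ == '__main__':
    users_db, orders_db = {}, {}
    flow = [
        InsertRow(users_db, 'alice', {'id': 1}),
        InsertRow(orders_db, 'order-1', {'user': 'alice'}, fail=True),
    ]
    try:
        run_linear(flow)
    except RuntimeError as ex:
        print('Flow failed: {}'.format(ex))
    # The first insert succeeded but was reverted, so both stay empty.
    print(users_db, orders_db)
```

Re-raising after the reverts mirrors what the listing above relies on: the caller still sees the original exception and can handle it in its own try/except.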
To fix this code simply pull out the HELLO! from the amazon_url:

    engine = taskflow.engines.load(flow,
                                   store=dict(
                                       google_url='',
                                       amazon_url=''))

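The keys of the store matter: TaskFlow works out what each task requires from the argument names of its execute method and looks those names up in the store. The sketch below imitates that idea with the standard inspect module. It assumes Python 3, call_with_store is a hypothetical helper, the URL is a placeholder, and the whole thing is far simpler than what TaskFlow really does.

```python
import inspect


def call_with_store(func, store):
    """Call func, filling each named parameter from store by name."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        # Skip *args / **kwargs; only plain named parameters are looked up.
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue
        if name in store:
            kwargs[name] = store[name]
        elif param.default is param.empty:
            raise KeyError('store has no value for required argument %r' % name)
    return func(**kwargs)


def execute(google_url, *args, **kwargs):
    # Stand-in for a task's execute(); it does no network access.
    return 'would fetch {}'.format(google_url)


# Placeholder values; extra keys that a function does not ask for are ignored.
store = dict(google_url='http://example.invalid/', amazon_url='unused here')
print(call_with_store(execute, store))
```

This is why GoogleFetch receives google_url and AmazonFetch receives amazon_url even though both tasks share one store.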
This is just an introduction: we only covered a very small part of TaskFlow, and in fact only one pattern (linear), but it serves as a good starting point.
A complete example is linked from the original post.
Source: http://www.giantflyingsaucer.com/blog/?p=4896&utm_source=tuicool&utm_medium=referral