三、tornado结合RabbitMQ实现异步任务处理
3.1 安装环境
1. 安装tornado
见文章《CentOS6.4安装python2.7.3环境和Tornado》
2. 安装 tornoda-celery
tornado-celery的安装很简单:
$ pip install tornado-celery
Downloading/unpacking tornado-celery
Downloading tornado-celery-0.3.4.tar.gz
Running setup.py egg_info for package tornado-celery
...
Successfully installed tornado-celery celery pika pytz billiard kombu anyjson amqp
Cleaning up...
3. 安装 RabbitMQ
使用celery需要选择一种broker(中间人)进行消息的接受和发送,中间人通常作为一种独立的服务,
常用的broker又RabbitMQ、Redis以及MongoDB等。
RabbitMQ 是AMPQ高级消息队列协议的实现,是使用最广泛的消息系统,
因为tornado-celery中使用redis无法使用callback,建议使用RabbitMQ作为broker。
安装并启动RabbitMQ
MAC上使用brew安装该服务程序
$ brew install rabbitmq
启动rabbitmq-server
$ sudo rabbitmq-server -detached
准备工作完成后就可以编码了
3.2. 示例程序
1. 创建并运行tasks.py
celery = Celery('tasks', broker='amqp://')
celery.conf.CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'amqp')
@celery.task(name='task.query_users')
def query_users(admin_id):
# 耗时的数据库操作
return db.query_all_users(admin_id)
2. 运行tasks.py
$ python tasks.py worker --loglevel=info
3. 创建tornado的 handler
import tcelery
tcelery.setup_nonblocking_producer()
from tasks import query_users
calss Users(RequestHandler):
@asynchronous
def get():
# 参数通过args的list传递,回调通过callback指定
query_users.apply_async(args=[admin_id], callback=self.on_success)
def on_success(self, response):
# 获取返回的结果
users = response.result
self.write(users)
self.finish()
这样,query_users的请求就变成异步非阻塞的了,
同理,其他的耗时操作可以放到task中供tornado的请求调用。
四、使用AMQP和tornado实现异步任务处理
1. 安装AMQP
2. 创建 tasks.py
3. 运行 tasks.py
4. 创建Handler
在Tornado的RequestHandler中调用celery的任务:
from tornado import gen, web
import tcelery, tasks
tcelery.setup_nonblocking_producer()
class AsyncHandler(web.RequestHandler):
@asynchronous
def get(self):
tasks.echo.apply_async(args=['Hello world!'], callback=self.on_result)
def on_result(self, response):
self.write(str(response.result))
self.finish()
使用generator-based 接口调用任务:
class GenAsyncHandler(web.RequestHandler):
@asynchronous
@gen.coroutine
def get(self):
response = yield gen.Task(tasks.sleep.apply_async, args=[3])
self.write(str(response.result))
self.finish()
NOTE: 当前的回调函数只能在 AMQP和 Redis后台下工作,
如果要使用Redis后台,必须安装 tornado-redis.
5. 运行测试用例
上面的测试用例要在AMQP后台下运行:
$ python examples/tasks.py worker
$ cd examples && python -m tcelery -A tasks
$ python tests/functests.py
要让上面的测试用例在Redis后台下运行,需要首先确认redis已经在运行,
然后:
$ CELERY_RESULT_BACKEND=redis:// python examples/tasks.py worker
$ cd examples && CELERY_RESULT_BACKEND=redis:// python -m tcelery -A tasks
$ python tests/functests.py
五、以web server的方式加载tornado-celery
$ cd tornado-celery
$ python -m tcelery --port=8888 --app=examples.tasks --address=0.0.0.0
异步的方式执行一个任务:
$ curl -X POST -d '{"args":["hello"]}'
{"task-id": "a24c9e38-4976-426a-83d6-6b10b4de7ab1", "state": "PENDING"}
获得上面任务执行的结果:
$ curl
{"task-id": "a24c9e38-4976-426a-83d6-6b10b4de7ab1", "state": "SUCCESS", "result": "hello"}
执行一个任务并获得它的结果:
$ curl -X POST -d '{"args":[1,2]}'
{"task-id": "fe3cc5a5-d11b-4b17-a6e2-e7fd2fba7ec6", "state": "SUCCESS", "result": 3}
执行一个任务并设置它的超时:
$ curl -X POST -d '{"args":[5],"timeout":1}'
{"task-id": "9ca78e26-bbb2-404c-b3bb-bc1c63cbdf41", "state": "REVOKED"}
六、API文档
1. POST /apply-async/(.*)/
通过发送消息来应用异步任务
请求示例:
POST /apply-async/examples.tasks.add/ HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Length: 16
Content-Type: application/json; charset=utf-8
Host: localhost:8888
User-Agent: HTTPie/0.8.0
{
"args": [
1,
2
]
}
响应示例:
HTTP/1.1 200 OK
Content-Length: 71
Content-Type: application/json; charset=UTF-8
Server: TornadoServer/3.2
{
"state": "PENDING",
"task-id": "1c9be31f-3094-4319-8895-ad2f0654c699"
}
Status Codes:
200 – no error
400 – invalid request
404 – unknown task
2. GET /tasks/result/(.*)/
通过task-id获得任务结果
请求示例:
GET /tasks/result/9ec42ba0-be59-488f-a445-4a007d83b954/ HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Type: application/json; charset=utf-8
Host: localhost:8888
User-Agent: HTTPie/0.8.0
响应示例:
HTTP/1.1 200 OK
Content-Length: 84
Content-Type: application/json; charset=UTF-8
Etag: "0aef8448588cf040f1daa7a0244c0a7b93abfd71"
Server: TornadoServer/3.2
{
"result": 3,
"state": "SUCCESS",
"task-id": "9ec42ba0-be59-488f-a445-4a007d83b954"
}
Status Codes:
200 – no error
400 – invalid request
3. DELETE /tasks/revoke/(.*)/
移除一个任务
请求示例:
DELETE /tasks/revoke/d776e835-33ac-447f-b27d-bb8529718ae6/ HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Length: 0
Content-Type: application/json; charset=utf-8
Host: localhost:8888
User-Agent: HTTPie/0.8.0
响应示例:
HTTP/1.1 200 OK
Content-Length: 51
Content-Type: application/json; charset=UTF-8
Server: TornadoServer/3.2
{
"task-id": "d776e835-33ac-447f-b27d-bb8529718ae6"
}
Status Codes:
200 – no error
400 – invalid request
4. POST /apply/(.*)/
同步方式调用任务,当任务结束时函数返回
请求示例:
POST /apply/examples.tasks.add/ HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Length: 16
Content-Type: application/json; charset=utf-8
Host: localhost:8888
User-Agent: HTTPie/0.8.0
{
"args": [
1,
2
]
}
响应示例:
HTTP/1.1 200 OK
Content-Length: 84
Content-Type: application/json; charset=UTF-8
Server: TornadoServer/3.2
{
"result": 3,
"state": "SUCCESS",
"task-id": "2ce70595-a028-4e0d-b906-be2183fc6821"
}
Status Codes:
200 – no error
400 – invalid request
404 – unknown task