Chinaunix首页 | 论坛 | 博客
  • 博客访问: 7893824
  • 博文数量: 701
  • 博客积分: 2150
  • 博客等级: 上尉
  • 技术积分: 13233
  • 用 户 组: 普通用户
  • 注册时间: 2011-06-29 16:28
个人简介

天行健,君子以自强不息!

文章分类

全部博文(701)

文章存档

2019年(2)

2018年(12)

2017年(76)

2016年(120)

2015年(178)

2014年(129)

2013年(123)

2012年(61)

分类: Python/Ruby

2014-12-15 23:09:20

三、tornado结合RabbitMQ实现异步任务处理

3.1 安装环境

1. 安装tornado
见文章《CentOS6.4安装python2.7.3环境和Tornado》

2. 安装 tornoda-celery
tornado-celery的安装很简单:
$ pip install tornado-celery
Downloading/unpacking tornado-celery
  Downloading tornado-celery-0.3.4.tar.gz
  Running setup.py egg_info for package tornado-celery 
... 
Successfully installed tornado-celery celery pika pytz billiard kombu anyjson amqp
Cleaning up...


3. 安装 RabbitMQ
使用celery需要选择一种broker(中间人)进行消息的接受和发送,中间人通常作为一种独立的服务,
常用的broker又RabbitMQ、Redis以及MongoDB等。
RabbitMQ 是AMPQ高级消息队列协议的实现,是使用最广泛的消息系统,
因为tornado-celery中使用redis无法使用callback,建议使用RabbitMQ作为broker。


安装并启动RabbitMQ
MAC上使用brew安装该服务程序
$ brew install rabbitmq


启动rabbitmq-server
$ sudo rabbitmq-server -detached


准备工作完成后就可以编码了

3.2. 示例程序

1. 创建并运行tasks.py
celery = Celery('tasks', broker='amqp://')
celery.conf.CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'amqp')


@celery.task(name='task.query_users')
def query_users(admin_id):
    # 耗时的数据库操作
    return db.query_all_users(admin_id)


2. 运行tasks.py
$ python tasks.py worker --loglevel=info


3. 创建tornado的 handler
    import tcelery
    tcelery.setup_nonblocking_producer()


    from tasks import query_users
    
    calss Users(RequestHandler):
        @asynchronous
        def get():
            # 参数通过args的list传递,回调通过callback指定
            query_users.apply_async(args=[admin_id], callback=self.on_success)
        def on_success(self, response):
            # 获取返回的结果
            users = response.result
            self.write(users)
            self.finish()
这样,query_users的请求就变成异步非阻塞的了,
同理,其他的耗时操作可以放到task中供tornado的请求调用。


四、使用AMQP和tornado实现异步任务处理

1. 安装AMQP


2. 创建 tasks.py


3. 运行 tasks.py


4. 创建Handler
在Tornado的RequestHandler中调用celery的任务:


from tornado import gen, web
import tcelery, tasks


tcelery.setup_nonblocking_producer()


class AsyncHandler(web.RequestHandler):
    @asynchronous
    def get(self):
        tasks.echo.apply_async(args=['Hello world!'], callback=self.on_result)


    def on_result(self, response):
        self.write(str(response.result))
        self.finish()


使用generator-based 接口调用任务:
class GenAsyncHandler(web.RequestHandler):
    @asynchronous
    @gen.coroutine
    def get(self):
        response = yield gen.Task(tasks.sleep.apply_async, args=[3])
        self.write(str(response.result))
        self.finish()




NOTE: 当前的回调函数只能在 AMQP和 Redis后台下工作,
如果要使用Redis后台,必须安装 tornado-redis.


5. 运行测试用例
上面的测试用例要在AMQP后台下运行:
$ python examples/tasks.py worker
$ cd examples && python -m tcelery -A tasks
$ python tests/functests.py


要让上面的测试用例在Redis后台下运行,需要首先确认redis已经在运行,
然后:
$ CELERY_RESULT_BACKEND=redis:// python examples/tasks.py worker
$ cd examples && CELERY_RESULT_BACKEND=redis:// python -m tcelery -A tasks
$ python tests/functests.py


五、以web server的方式加载tornado-celery
$ cd tornado-celery
$ python -m tcelery --port=8888 --app=examples.tasks --address=0.0.0.0


异步的方式执行一个任务:
$ curl -X POST -d '{"args":["hello"]}'
{"task-id": "a24c9e38-4976-426a-83d6-6b10b4de7ab1", "state": "PENDING"}


获得上面任务执行的结果:
$ curl
{"task-id": "a24c9e38-4976-426a-83d6-6b10b4de7ab1", "state": "SUCCESS", "result": "hello"}


执行一个任务并获得它的结果:
$ curl -X POST -d '{"args":[1,2]}'
{"task-id": "fe3cc5a5-d11b-4b17-a6e2-e7fd2fba7ec6", "state": "SUCCESS", "result": 3}


执行一个任务并设置它的超时:
$ curl -X POST -d '{"args":[5],"timeout":1}'
{"task-id": "9ca78e26-bbb2-404c-b3bb-bc1c63cbdf41", "state": "REVOKED"}

六、API文档

1. POST /apply-async/(.*)/
通过发送消息来应用异步任务


请求示例:
POST /apply-async/examples.tasks.add/ HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Length: 16
Content-Type: application/json; charset=utf-8
Host: localhost:8888
User-Agent: HTTPie/0.8.0


{
    "args": [
        1,
        2
    ]
}


响应示例:
HTTP/1.1 200 OK
Content-Length: 71
Content-Type: application/json; charset=UTF-8
Server: TornadoServer/3.2


{
    "state": "PENDING",
    "task-id": "1c9be31f-3094-4319-8895-ad2f0654c699"
}
Status Codes:
200 – no error
400 – invalid request
404 – unknown task


2. GET /tasks/result/(.*)/
通过task-id获得任务结果 


请求示例:
GET /tasks/result/9ec42ba0-be59-488f-a445-4a007d83b954/ HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Type: application/json; charset=utf-8
Host: localhost:8888
User-Agent: HTTPie/0.8.0


响应示例:
HTTP/1.1 200 OK
Content-Length: 84
Content-Type: application/json; charset=UTF-8
Etag: "0aef8448588cf040f1daa7a0244c0a7b93abfd71"
Server: TornadoServer/3.2


{
    "result": 3,
    "state": "SUCCESS",
    "task-id": "9ec42ba0-be59-488f-a445-4a007d83b954"
}
Status Codes:
200 – no error
400 – invalid request


3. DELETE /tasks/revoke/(.*)/
移除一个任务


请求示例:
DELETE /tasks/revoke/d776e835-33ac-447f-b27d-bb8529718ae6/ HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Length: 0
Content-Type: application/json; charset=utf-8
Host: localhost:8888
User-Agent: HTTPie/0.8.0


响应示例:
HTTP/1.1 200 OK
Content-Length: 51
Content-Type: application/json; charset=UTF-8
Server: TornadoServer/3.2


{
    "task-id": "d776e835-33ac-447f-b27d-bb8529718ae6"
}
Status Codes:
200 – no error
400 – invalid request


4. POST /apply/(.*)/
同步方式调用任务,当任务结束时函数返回


请求示例:
POST /apply/examples.tasks.add/ HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Length: 16
Content-Type: application/json; charset=utf-8
Host: localhost:8888
User-Agent: HTTPie/0.8.0


{
    "args": [
        1,
        2
    ]
}


响应示例:
HTTP/1.1 200 OK
Content-Length: 84
Content-Type: application/json; charset=UTF-8
Server: TornadoServer/3.2


{
    "result": 3,
    "state": "SUCCESS",
    "task-id": "2ce70595-a028-4e0d-b906-be2183fc6821"
}
Status Codes:
200 – no error
400 – invalid request
404 – unknown task

阅读(11400) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~