Chinaunix首页 | 论坛 | 博客
  • 博客访问: 235670
  • 博文数量: 57
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 557
  • 用 户 组: 普通用户
  • 注册时间: 2015-10-01 18:05
文章分类

全部博文(57)

文章存档

2017年(57)

我的朋友

分类: Python/Ruby

2017-12-20 14:57:48

多worker,多队列
代码示例如下:

点击(此处)折叠或打开

  1. #!/usr/bin/env python
  2. #-*- coding:utf-8 -*-
  3. from celery import Celery

  4. app = Celery()
  5. app.config_from_object("celeryconfig")

  6. @app.task
  7. def taskA(x,y):
  8. return x + y

  9. @app.task
  10. def taskB(x,y,z):
  11. return x + y + z


  12. @app.task
  13. def add(x,y):
  14. return x + y


点击(此处)折叠或打开

  1. [root@localhost celery]# cat celeryconfig.py
  2. #!/usr/bin/env python
  3. #-*- coding:utf-8 -*-

  4. from kombu import Exchange,Queue

  5. BROKER_URL = "redis://192.168.48.131:6379/1"
  6. CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2"

  7. CELERY_QUEUES = (
  8. Queue("default",Exchange("default"),routing_key="default"),
  9. Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
  10. Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
  11. )

  12. CELERY_ROUTES = {
  13. 'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
  14. 'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}

配置文件一般单独写在一个文件中。


启动一个worker来指定taskA

celery -A tasks worker -l info -n workerA.%h -Q for_task_A

celery -A tasks worker -l info -n workerB.%h -Q for_task_B


from tasks import *

re1 = taskA.delay(100, 200)

print(re1.result)

re2 = taskB.delay(1, 2, 3)

print(re2.result)

re3 = add.delay(1, 2, 3)

print(re3.status)     #PENDING

我们看到状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。

celery -A tasks worker -l info -n worker.%h -Q celery

print(re3.status)    #SUCCESS

Celery与定时任务

下面我们接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE变量:


CELERY_TIMEZONE = 'UTC'

CELERYBEAT_SCHEDULE = {

    'taskA_schedule' : {

        'task':'tasks.taskA',

        'schedule':20,

        'args':(5,6)

    },

    'taskB_scheduler' : {

        'task':"tasks.taskB",

        "schedule":200,

        "args":(10,20,30)

    },

    'add_schedule': {

        "task":"tasks.add",

        "schedule":10,

        "args":(1,2)

    }

}


注意格式,否则会有问题



阅读(2425) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~