Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1399813
  • 博文数量: 244
  • 博客积分: 3321
  • 博客等级: 中校
  • 技术积分: 2704
  • 用 户 组: 普通用户
  • 注册时间: 2009-04-26 08:17
个人简介

微信公众号:杰夫弹弹看

文章分类

全部博文(244)

文章存档

2018年(4)

2017年(32)

2016年(25)

2015年(28)

2014年(27)

2013年(34)

2012年(25)

2011年(30)

2010年(39)

分类: Python/Ruby

2013-10-18 17:06:25

qpid 是符合AMQP规范的apache 许可证的消息中间件,目前在openstack中作为一种可选的消息中间件服务配置,其他还有rabbitmq和zeroMQ

如果看过qpid的编程API文档的话,会看到比较简单的一个例子, 如下(接收消息打印消息内容)

点击(此处)折叠或打开

  1. #following code is from
  2. # http://qpid.apache.org/releases/qpid-0.24/messaging-api/python/examples/drain.html


  3. import optparse
  4. from qpid.messaging import *
  5. from qpid.util import URL
  6. from qpid.log import enable, DEBUG, WARN

  7. parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
  8.                                description="Drain messages from the supplied address.")
  9. parser.add_option("-b", "--broker", default="localhost",
  10.                   help="connect to specified BROKER (default %default)")
  11. parser.add_option("-c", "--count", type="int",
  12.                   help="number of messages to drain")
  13. parser.add_option("-f", "--forever", action="store_true",
  14.                   help="ignore timeout and wait forever")
  15. parser.add_option("-r", "--reconnect", action="store_true",
  16.                   help="enable auto reconnect")
  17. parser.add_option("-i", "--reconnect-interval", type="float", default=3,
  18.                   help="interval between reconnect attempts")
  19. parser.add_option("-l", "--reconnect-limit", type="int",
  20.                   help="maximum number of reconnect attempts")
  21. parser.add_option("-t", "--timeout", type="float", default=0,
  22.                   help="timeout in seconds to wait before exiting (default %default)")
  23. parser.add_option("-p", "--print", dest="format", default="%(M)s",
  24.                   help="format string for printing messages (default %default)")
  25. parser.add_option("-v", dest="verbose", action="store_true",
  26.                   help="enable logging")

  27. opts, args = parser.parse_args()

  28. if opts.verbose:
  29.   enable("qpid", DEBUG)
  30. else:
  31.   enable("qpid", WARN)

  32. if args:
  33.   addr = args.pop(0)
  34. else:
  35.   parser.error("address is required")
  36. if opts.forever:
  37.   timeout = None
  38. else:
  39.   timeout = opts.timeout

  40. class Formatter:

  41.   def __init__(self, message):
  42.     self.message = message
  43.     self.environ = {"M": self.message,
  44.                     "P": self.message.properties,
  45.                     "C": self.message.content}

  46.   def __getitem__(self, st):
  47.     return eval(st, self.environ)

  48. conn = Connection(opts.broker,
  49.                   reconnect=opts.reconnect,
  50.                   reconnect_interval=opts.reconnect_interval,
  51.                   reconnect_limit=opts.reconnect_limit)
  52. try:
  53.   conn.open()
  54.   ssn = conn.session()
  55.   rcv = ssn.receiver(addr)

  56.   count = 0
  57.   while not opts.count or count < opts.count:
  58.     try:
  59.       msg = rcv.fetch(timeout=timeout)
  60.       print opts.format % Formatter(msg)
  61.       count += 1
  62.       ssn.acknowledge()
  63.     except Empty:
  64.       break
  65. except ReceiverError, e:
  66.   print e
  67. except KeyboardInterrupt:
  68.   pass

  69. conn.close()

connection和session是1对多的关系,每个session保证消息的顺序接收,让session创建对应的sender和receiver,这里我们只需要创建receiver

这个程序看起来很好,如果直接传一个地址, 比如(我们想要接收openstack glance的message)
运行如下:
python drain.py -b admin/qpid@localhost glance

如果我们想要实现连接断掉后重新自动连接,我们可以传入参数 -r
上面的程序有个问题,如果没有收到消息就断开连接了。

新需求1: 我们要实现持续的监听接收消息, 改一下

点击(此处)折叠或打开

  1. try:
  2.       msg = rcv.fetch(timeout=timeout)
  3.       print opts.format % Formatter(msg)
  4.       count += 1
  5.       ssn.acknowledge()
  6. except Empty:
  7.       time.sleep(0.5)
这样就可以了。

还不行,

新需求2:我们要实现断后重新自动连接, 可以, 传入参数 -r,

解决了

问题出现了,你会发现在service qpidd restart后,程序会抛出exception
qpid.messaging.exceptions.NotFound: no such queue: glance

新需求3:显然这是由于我们创建的queue不是durable的,所以需要在qpid restart后能够正常运行,而不是退出。
这就需要我们创建的queue能够在接收到消息的时候自动创建完成,

解决方法:     rcv = ssn.receiver(addr + "; {create: always}")

这样就完成queue的按需创建


新需求4: qpid 默认的reconnect上面的是基于固定interval,我们想改变重新建立连接的算法,实现2的指数式建立连接
解决方法: 我们写入自己的自动重连方法, 一个简单的例子如下:

点击(此处)折叠或打开

  1. def reconnect():
  2.     global rcv
  3.     global ssn
  4.     attempt = 0
  5.     delay = 1
  6.     while True:
  7.         if conn.opened():
  8.             try:
  9.                 conn.close()
  10.             except exceptions.ConnectionError:
  11.                     pass
  12.         attempt += 1
  13.         print "The %s time attempt for reconnecting qpid server" % str(attempt)
  14.         try:
  15.             connection_init()
  16.             conn.open()
  17.         except exceptions.ConnectionError, e:
  18.             delay = min(2 * delay, 60)
  19.             time.sleep(delay)
  20.             pass
  21.         else:
  22.             break
  23.     print "qpid server reconnection created"
  24.     ssn = conn.session()
  25.     rcv = ssn.receiver(addr + "; {create: always}")

这样就OK了


总结: 经过上面的需求变化,我们新的接收消息程序如下:

点击(此处)折叠或打开

  1. import optparse
  2. import time
  3. from qpid.messaging import *
  4. from qpid.util import URL
  5. from qpid.log import enable, DEBUG, WARN

  6. parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
  7.                                description="Drain messages from the supplied address.")
  8. parser.add_option("-b", "--broker", default="localhost",
  9.                   help="connect to specified BROKER (default %default)")
  10. parser.add_option("-c", "--count", type="int",
  11.                   help="number of messages to drain")
  12. parser.add_option("-f", "--forever", action="store_true",
  13.                   help="ignore timeout and wait forever")
  14. parser.add_option("-r", "--reconnect", action="store_true",
  15.                   help="enable auto reconnect")
  16. parser.add_option("-i", "--reconnect-interval", type="float", default=3,
  17.                   help="interval between reconnect attempts")
  18. parser.add_option("-l", "--reconnect-limit", type="int",
  19.                   help="maximum number of reconnect attempts")
  20. parser.add_option("-t", "--timeout", type="float", default=0,
  21.                   help="timeout in seconds to wait before exiting (default %default)")
  22. parser.add_option("-p", "--print", dest="format", default="%(M)s",
  23.                   help="format string for printing messages (default %default)")
  24. parser.add_option("-v", dest="verbose", action="store_true",
  25.                   help="enable logging")

  26. opts, args = parser.parse_args()

  27. if opts.verbose:
  28.     enable("qpid", DEBUG)
  29. else:
  30.     enable("qpid", WARN)

  31. if args:
  32.     addr = args.pop(0)
  33. else:
  34.     parser.error("address is required")
  35. if opts.forever:
  36.     timeout = None
  37. else:
  38.     timeout = opts.timeout

  39. count = 0
  40. rcv = None
  41. conn = None
  42. ssn = None


  43. class Formatter:

  44.     def __init__(self, message):
  45.         self.message = message
  46.         self.environ = {"M": self.message,
  47.                     "P": self.message.properties,
  48.                     "C": self.message.content}

  49.     def __getitem__(self, st):
  50.         return eval(st, self.environ)


  51. def reconnect():
  52.     global rcv
  53.     global ssn
  54.     attempt = 0
  55.     delay = 1
  56.     while True:
  57.         if conn.opened():
  58.             try:
  59.                 conn.close()
  60.             except exceptions.ConnectionError:
  61.                     pass
  62.         attempt += 1
  63.         print "The %s time attempt for reconnecting qpid server" % str(attempt)
  64.         try:
  65.             connection_init()
  66.             conn.open()
  67.         except exceptions.ConnectionError, e:
  68.             delay = min(2 * delay, 60)
  69.             time.sleep(delay)
  70.             pass
  71.         else:
  72.             break
  73.     print "qpid server reconnection created"
  74.     ssn = conn.session()
  75.     rcv = ssn.receiver(addr + "; {create: always}")


  76. def fetch():
  77.     global count
  78.     while not opts.count or count < opts.count:
  79.         try:
  80.             msg = rcv.fetch(timeout=timeout)
  81.             print opts.format % Formatter(msg)
  82.             count += 1
  83.             ssn.acknowledge()
  84.         except Empty:
  85.             time.sleep(0.5)
  86.         except exceptions.ConnectionError, e:
  87.             reconnect()
  88.         except Exception, e:
  89.             print e
  90.             raise e


  91. def connection_init():
  92.     global conn
  93.     conn = Connection(opts.broker,
  94.                   reconnect=opts.reconnect,
  95.                   reconnect_interval=opts.reconnect_interval,
  96.                   reconnect_limit=opts.reconnect_limit)


  97. try:
  98.     connection_init()
  99.     conn.open()
  100.     ssn = conn.session()
  101.     rcv = ssn.receiver(addr + "; {create: always}")
  102.     fetch()

  103. except ReceiverError, e:
  104.     print e
  105. except KeyboardInterrupt:
  106.     pass
  107. except exceptions.ConnectionError, e:
  108.     reconnect()


  109. conn.close()






阅读(4840) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~