OpenStack Mitaka从零开始——消息队列基础
安装客户端python-pika
python-pika.spec redhat 6
-
%{!?python_sitelib: %global python_sitelib %(%{__python} -c "from distutils.sysconfig import get_python_lib; print get_python_lib()")}
-
%define short_name pika
-
-
Name: python-%{short_name}
-
Version: 0.10.0
-
Release: 0%{?dist}
-
Summary: AMQP 0-9-1 client library for Python
-
-
Group: Development/Libraries
-
License: MPLv1.1 or GPLv2
-
URL: %{short_name}/%{short_name}
-
# The tarball comes from here:
-
# %{short_name}/%{short_name}/tarball/v%{version}
-
# GitHub has layers of redirection and renames that make this a troublesome
-
# URL to include directly.
-
Source0: %{short_name}-%{version}.tar.gz
-
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
-
BuildArch: noarch
-
-
BuildRequires: python-setuptools
-
BuildRequires: python-devel
-
Requires: python-pyev
-
Requires: python-tornado
-
Requires: python-twisted
-
Requires: python >= 2.6.5
-
-
-
%description
-
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that
-
tries to stay fairly independent of the underlying network support
-
library.
-
-
-
%prep
-
%setup -q -n %{short_name}-%{version}
-
-
%build
-
%{__python} setup.py build
-
-
%install
-
%{__rm} -rf %{buildroot}
-
%{__python} setup.py install -O1 --skip-build --root %{buildroot}
-
install -D -m 644 LICENSE %{buildroot}%{_docdir}/%{name}-%{version}
-
install -D -m 644 README.rst %{buildroot}%{_docdir}/%{name}-%{version}
-
install -D -m 644 PKG-INFO %{buildroot}%{_docdir}/%{name}-%{version}
-
-
-
%clean
-
%{__rm} -rf %{buildroot}
-
-
-
%files
-
%defattr(-,root,root,-)
-
%dir %{python_sitelib}/%{short_name}*
-
%{python_sitelib}/%{short_name}*/*
-
%doc README.rst
-
%doc LICENSE
-
%doc PKG-INFO
-
-
%changelog
-
-
* Sat Aug 20 2016 gcy - 0.10.0
-
- update version 0.10.0
-
-
* Tue Dec 13 2011 Daniel Aharon - 0.9.5-2
-
- Patch pika/adapters/blocking_connection.py
-
-
* Sun Apr 3 2011 Ilia Cheishvili - 0.9.5-1
-
- Upgrade to version 0.9.5
-
-
* Sun Mar 6 2011 Ilia Cheishvili - 0.9.4-1
-
- Upgrade to version 0.9.4
-
-
* Sat Feb 19 2011 Ilia Cheishvili - 0.9.3-1
-
- Upgrade to version 0.9.3
-
-
* Sat Oct 2 2010 Ilia Cheishvili - 0.5.2-1
-
- Initial Package
安装服务端
rabbitmq-server
rabbitmq配置(rpm包自己搞定)
cat /etc/rabbitmq/rabbitmq-env.conf
# 文件位置
RABBITMQ_MNESIA_BASE=/data/rabbitmq_mnesia
# 监听IP 端口
RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
RABBITMQ_NODE_PORT=5673
如果上述配置无效,自己去init脚本里看怎么读配置文件的
rabbitmqctl add_vhost 添加一个vhost
rabbitmqctl add_user 添加一个用户
添加完就能用了
接下来是如何使用消息队列
先搞清楚几个比较重要的概念
1、什么是消息队列, 什么是Producer、Exchange、Queue、Consumer、Topic、Fanout、Routing key
2、no_ack
http://my.oschina.net/moooofly/blog/143883
3、流量控制
(Fair dispatch 公平分发 部分)
http://my.oschina.net/hncscwc/blog/195560
简介:默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。当然n是取余后的。它不管Consumer是否还有unacked Message,只是按照这个默认机制进行分发。
那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢?
通过 basic.qos 方法设置 prefetch_count=1 。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。
换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。
现在上简单代码,直接用pika
-
#!/usr/bin/python
-
# -*- coding: UTF-8 -*-
-
-
import pika
-
from pika import PlainCredentials
-
from pika import ConnectionParameters
-
-
-
def main():
-
user = 'phptest'
-
passwd = 'phptest'
-
vhost = 'phptest'
-
ip = '127.0.0.1'
-
port = 5673
-
identified = PlainCredentials(user, passwd)
-
paarameters = ConnectionParameters(ip, port, vhost, identified)
-
connection = pika.BlockingConnection(paarameters)
-
channel = connection.channel()
-
print 'connect success'
-
channel.exchange_declare(exchange='gcy2', auto_delete=True)
-
#channel._delivery_confirmation = 1
-
print 'start send data'
-
channel.basic_publish(exchange='gcy2',routing_key='a', body='wtffffff1')
-
channel.basic_publish(exchange='gcy2',routing_key='a', body='wtffffff2')
-
print 'end'
-
-
-
if __name__ == '__main__':
-
main()
发送者部分非常简单
账号密码用PlainCredentials类封装
连接的参数用ConnectionParameters封装, rabbitmq的vhost相当于mysql实例一个库,用来互相隔离权限范围的
BlockingConnection类就是一个
rabbitmq的连接,如果要用多连接的话,pika有一个pika-pool的库,在openstack里,server端是单连接的,只有发送端才用到了多连接。
不要看
BlockingConnection是block打头就以为是block的了,实际上blockconnection是一个分装好的上层类,实际会调用下面的select pool epoll 甚至event。一般都直接使用BlockingConnection,openstack就是用的BlockingConnection(linux上会以epoll来处理socket数据,下次详细讲openstack的rpc通信的时候会详细说明)
channel这个玩意比较蛋碎,之前我看了很久就是为了看明白为什么不直接用connection还要在connection上封一层channel,后来大致明白,其实就是为了不多建立多个connection也能做隔离
数据实际是从connection获取到后分发到channel
生成channel后, 声明一个叫gcy2的 交换机(exchange), 默认的exchange_type是direct,即单点的,
然后发送数据 到gcy2 这个exchange, 接收者的routing_key是a
发送者代码完结
接收者会稍微复杂一些
-
def main():
-
user = 'phptest'
-
passwd = 'phptest'
-
vhost = 'phptest'
-
ip = '127.0.0.1'
-
port = 5673
-
identified = PlainCredentials(user, passwd)
-
paarameters = ConnectionParameters(ip, port, vhost, identified)
-
connection = pika.BlockingConnection(paarameters)
-
channel = connection.channel()
-
print 'connect success'
-
channel.queue_declare(queue='myqeueu')
-
channel.queue_bind(queue='myqeueu', exchange='gcy2', routing_key='a')
-
get_list = []
-
def callback(ch, method, properties, body):
-
print 'get body %s ' % body,
-
get_list.append([method.delivery_tag, body])
-
print method.consumer_tag
-
#ch.basic_ack(method.delivery_tag)
-
#ch.stop_consuming()
-
channel.basic_qos(prefetch_count=5)
-
tag1 = channel.basic_consume(callback, queue='myqeueu', no_ack=True)
-
tag2 = channel.basic_consume(callback, queue='myqeueu', no_ack=False)
-
print tag1
-
print tag2
-
def get_data():
-
while 1:
-
last_queue_size = len(get_list)
-
if last_queue_size >= 5:
-
ret = get_list[:5]
-
del get_list[:5]
-
return ret
-
else:
-
connection.process_data_events()
-
if last_queue_size == len(get_list):
-
ret = get_list[:5]
-
del get_list[:5]
-
return ret
-
while 1:
-
ret = get_data()
-
if ret:
-
print 'fucked ',
-
print ret[0][1]
-
#channel.basic_ack(ret[0][0])
-
print 'end'
-
connection.process_data_events()
-
connection.close()
-
-
-
if __name__ == '__main__':
-
main()
上面没有使用网上常用的start_consuming()写法,这里的写法模仿了openstack的pika驱动的写法
上述代码的写法是有问题的,我们先一步步说明再解释错误
阅读(1428) | 评论(0) | 转发(0) |