2016年(1)
分类: Python/Ruby
2016-03-23 13:58:30
原文地址:ceph管理平台Calamari的扩展开发 作者:gongping11
其中红框部分为Calamari代码实现的部分,非红框部分为非Calamari实现的开源框架。
在Ceph server node安装的组件有Diamond和Salt-minion。Diamond负责收集监控数据,它支持非常多的数据类型和metrics;每一个类型的数据都是上图中的一个collector,它除了收集Ceph本身的状态信息,它还可以收集关键的资源使用情况和性能数据,包括CPU,内存,网络,I / O负载和磁盘指标。Collector都是使用本地的命令行来收集数据,然后报告给Graphite。
Graphite不仅是一个企业级的监控工具, 还可以实时绘图。carbon-cache是Python实现的高度可扩展的事件驱动的I/O架构的后端进程,它可以有效地跟大量的客户端通信并且以较低的开销处理大量的业务量。
Whisper跟RRDtool类似,提供数据库开发库给应用程序来操纵和检索存储在特殊格式的文件数据(时间数据点数据),Whisper最基本的操作是创建作出新的Whisper文件,更新写入新的数据点到一个文件中,并获取检索的数据点
Graphite_web是用户接口,用来生成图片,用户可以直接通过URL的方式访问这些生成的图片。
Calamari 使用了Saltstack让Calamari Server和Ceph server node通信。Saltstack是一个开源的自动化运维管理工具,与Chef和Puppet功能类似。Salt-master发送指令给指定的Salt-minion来完成对Cpeh Cluster的管理工作;Salt-minion 在Ceph server node安装后都会从master同步并安装一个ceph.py文件,里面包含Ceph操作的API,它会调用librados或命令行来最终和Ceph Cluster通信。
calamari_rest提供Calamari REST API,详细的接口请大家参照官方文档。Ceph的REST API是一种低层次的接口,其中每个URL直接映射到等效的CEPH CLI;Calamari REST API提供了一个更高层次的接口,API的使用者可以习惯的使用GET/POST/PATCH方法来操作对象,而无需知道底层的Ceph的命令;它们之间的主要区别在于,Ceph的REST API的使用者需要非常了解Ceph本身,而Calamari 的REST API更贴近对Ceph资源的描述,所以更加适合给上层的应用程序调用。
cthulhu可以理解是Calamari Server的Service层,对上为API提供接口,对下调用Salt-master。
calamari_clients是一套用户界面,Calamari Server在安装的过程中会首先创建opt/calamari/webapp目录,并且把webapp/calamari下的manager.py(django 配置)文件考进去, calamari_web的所有内容到要放到opt/calamari/webapp下面来提供UI的访问页面。
calamari-web包下面的文件提供所有web相关的配置,calamari_rest和calamari_clients都要用到。在Calamari的基础之上进行新的功能开发,主要分为如下的几个模块,这部分包括Rest-API部分,Cthulhu、salt客户端的扩展。关于扩展新功能的基本步骤如下:
>> 扩展URL模块,确定对应的响应接口参数、对应ViewSet中的响应接口。
>> 完成ViewSet中部分接口的实现,这部分主要涉及与cthulhu的交互,如何获取数据信息,有些情况下还需要获取serializer中对象的序列化操作。
>> 完成后台rpc.py中对应类型的扩展,这部分主要是针对部分的post操作。
>> 完成cluster_monitor.py的扩展,对于提供操作的部分功能需要支持create、update、delete等操作,必须提供对应的RequestFactory。而在cluster_monitor.py中需要将对应的RequestFactory添加代码中。
>> 完成对应RequestFactory类的编写,这部分主要是完成命令操作的封装。并构建对应的请求操作。
>> salt-minion的扩展,这部分主要是针对ceph.py文件的扩展,当然也可以提供新的xxx.py文件。
接下来以PG的控制和操作为例进行说明。
目前Calmamari采用Rest-API形式,采用Django的Rest-Framework框架支持,这部分在rest-api代码目录中。Django采用Url和代码逻辑分离的实现方式,因此URL可以单独的扩展。
在rest-api/calamari-rest/urls/v2.py中添加如下的有关PG的URL:
url(r'^cluster/(?P
url(r'^cluster/(?P
calamari_rest.views.v2.PgViewSet.as_view({'post': 'apply'}),
name='cluster-pool-pg-control'),
以上定义了两个URL,分别是:
api/v2/cluster/xxxx/pool/x/pg
api/v2/cluster/xxxx/pool/x/pg/xx/command/xxx
以上两个URL分别指定了PgViewSet中的接口,url的get方法对应了list接口。post接口对应的apply接口。这两个接口就是PgViewSet中必须实现的。
在扩展URL之后,接下来就是进行对应响应接口的扩展,这部分的扩展主要是针对在URL中指定的接口类进行实现。在之前的PG指定了两个不同的接口,分别是获取和操作命令,对应的代码路径为/rest-api/calamari-rest/view/v2.py,具体的代码如下:
class PgViewSet(RPCViewSet):
serializer_class = PgSerializer
def list(self, request, fsid, pool_id):
poolName = self.client.get(fsid, POOL, int(pool_id))['pool_name']
pg_summary = self.client.get_sync_object(fsid, PgSummary.str)
pg_pools = pg_summary['pg_pools']['by_pool'][int(pool_id)]
for pg in pg_pools:
pg['pool'] = poolName
return Response(PgSerializer(pg_pools, many=True).data)
def apply(self, request, fsid, pool_id, pg_id, command):
return Response(self.client.apply(fsid, PG, pg_id, command), status=202)
从如上的实现可知,代码实现了两个接口,分别是list和apply接口,即对应与之前的get、post操作。以上两个操作都会与后台cthulhu进行交互。分别是获取参数和提交请求。返回内容也有一定的差异。
同时在list接口中进行了序列化设置,即PgSerializer,该实现在rest-api/calamari-rest/serializer/v2.py中。
通常在Rest-Api中会进行数据的序列化,这部分并不是一定要进行的,通常在需要更改的操作中是有必要的。如下是Pg的序列化操作:
class PgSerializer(serializers.Serializer):
class Meta:
fields = ('id', 'pool', 'state', 'up', 'acting', 'up_primary', 'acting_primary')
id = serializers.CharField(source='pgid')
pool = serializers.CharField(help_text='pool name')
state = serializers.CharField(source='state', help_text='pg state')
up = serializers.Field(help_text='pg Up set')
acting = serializers.Field(help_text='pg acting set')
up_primary = serializers.IntegerField(help_text='pg up primary')
acting_primary = serializers.IntegerField(help_text='pg acting primary')
这部分并不是必须的。有些模块可能不存在这部分的操作。在之前的三个步骤中基本上就实现了Rest-API部分的扩展,其中主要的ViewSet的扩展。有关ViewSet实际上实现了cthulhu与rest-api的交互方法。
在ViewSet的扩展中实际上采用了rpc与后台交互,因此在cthulhu的实现部分主要是处理对应的rpc请求。
rpc.py中实现了所有请求的操作,但是新扩展的操作也是需要支持扩展的,以pg为例继续说明:
def apply(self, fs_id, object_type, object_id, command):
"""
Apply commands that do not modify an object in a cluster.
"""
cluster = self._fs_resolve(fs_id)
if object_type == OSD:
# Run a resolve to throw exception if it's unknown
self._osd_resolve(cluster, object_id)
return cluster.request_apply(OSD, object_id, command)
elif object_type == PG:
return cluster.request_apply(PG, object_id, command)
else:
raise NotImplementedError(object_type)
而Pg的列表是通过PgSummary获取。这部分在之前的实现中已存在,之前的代码实现如下:
def get_sync_object(self, fs_id, object_type, path=None):
"""
Get one of the objects that ClusterMonitor keeps a copy of from the mon, such
as the cluster maps.
:param fs_id: The fsid of a cluster
:param object_type: String, one of SYNC_OBJECT_TYPES
:param path: List, optional, a path within the object to return instead of the whole thing
:return: the requested data, or None if it was not found (including if any element of ``path``
was not found)
"""
if path:
obj = self._fs_resolve(fs_id).get_sync_object(SYNC_OBJECT_STR_TYPE[object_type])
try:
for part in path:
if isinstance(obj, dict):
obj = obj[part]
else:
obj = getattr(obj, part)
except (AttributeError, KeyError) as e:
log.exception("Exception %s traversing %s: obj=%s" % (e, path, obj))
raise NotFound(object_type, path)
return obj
else:
return self._fs_resolve(fs_id).get_sync_object_data(SYNC_OBJECT_STR_TYPE[object_type])
有关请求的操作都会进行集群的控制,这部分可以通过cluster_monitor进行实现,以pg为例进行说明。
def __init__(self, fsid, cluster_name, notifier, persister, servers, eventer, requests):
super(ClusterMonitor, self).__init__()
self.fsid = fsid
self.name = cluster_name
self.update_time = datetime.datetime.utcnow().replace(tzinfo=utc)
self._notifier = notifier
self._persister = persister
self._servers = servers
self._eventer = eventer
self._requests = requests
# Which mon we are currently using for running requests,
# identified by minion ID
self._favorite_mon = None
self._last_heartbeat = {}
self._complete = gevent.event.Event()
self.done = gevent.event.Event()
self._sync_objects = SyncObjects(self.name)
self._request_factories = {
CRUSH_MAP: CrushRequestFactory,
CRUSH_NODE: CrushNodeRequestFactory,
OSD: OsdRequestFactory,
POOL: PoolRequestFactory,
CACHETIER: CacheTierRequestFactory,
PG: PgRequestFactory,
ERASURE_PROFILE: ErasureProfileRequestFactory,
ASYNC_COMMAND: AsyncComRequestFactory
}
self._plugin_monitor = PluginMonitor(servers)
self._ready = gevent.event.Event()
这部分主要是将对应的请求与对应的请求工厂类进行绑定,这样才能产生出合适的请求。
该工厂类主要是针对不同的需求,实现具体的接口类,不同的对象有不同的请求类,以Pg为例说明:
from cthulhu.manager.request_factory import RequestFactory
from cthulhu.manager.user_request import RadosRequest
from calamari_common.types import PG_IMPLEMENTED_COMMANDS, PgSummary
class PgRequestFactory(RequestFactory):
def scrub(self, pg_id):
return RadosRequest(
"Initiating scrub on {cluster_name}-pg{id}".format(cluster_name=self._cluster_monitor.name, id=pg_id),
self._cluster_monitor.fsid,
self._cluster_monitor.name,
[('pg scrub', {'pgid': pg_id})])
def deep_scrub(self, pg_id):
return RadosRequest(
"Initiating deep-scrub on {cluster_name}-osd.{id}".format(cluster_name=self._cluster_monitor.name, id=pg_id),
self._cluster_monitor.fsid,
self._cluster_monitor.name,
[('pg deep-scrub', {'pgid': pg_id})])
def repair(self, pg_id):
return RadosRequest(
"Initiating repair on {cluster_name}-osd.{id}".format(cluster_name=self._cluster_monitor.name, id=pg_id),
self._cluster_monitor.fsid,
self._cluster_monitor.name,
[('pg repair', {'pgid': pg_id})])
def get_valid_commands(self, pg_id):
ret_val = {}
file('/tmp/pgsummary.txt', 'a+').write(PgSummary.str + '\n')
pg_summary = self._cluster_monitor.get_sync_object(PgSummary)
pg_pools = pg_summary['pg_pools']['by_pool']
pool_id = int(pg_id.split('.')[0])
pool = pg_pools[pool_id]
for pg in pool:
if pg['pgid'] == pg_id:
ret_val[pg_id] = {'valid_commands': PG_IMPLEMENTED_COMMANDS}
else:
ret_val[pg_id] = {'valid_commands': []}
return ret_val
该类中实现了三个不同的命令的实现,该命令主要是进行对应的封装,这部分关键字需要根据ceph源码中的参数进行选择,因此在编码时需要参照ceph源码中对应命令的json参数名。