近期有项目需要采集服务器的管理信息。由于之前的项目中使用过python-diamond这个应用,为了实现类似的功能,我简单地分析了该应用的代码,也借此对Python在运维中的使用有了更深入的了解。有关python-diamond的详细介绍可参看其项目主页:https://github.com/python-diamond/Diamond 。
python-diamond的基本功能是从安装了该软件的服务器上采集数据并进行处理,默认的处理方式是将数据发送到Graphite图表服务器。该软件的设计思路是将采集与处理分离:数据采集通过调用各个组件提供的采集脚本(Collector)完成,数据处理则通过调用不同的Handler对象完成,而各个Handler同样以脚本的形式提供。
该软件的基本框架如下图所示:
其中Server管理器主要负责管理和控制各个子进程,包括各个采集进程以及一个数据处理(Handlers)进程。早期版本的diamond采用单进程方式,而最新版本采用了多进程架构。个人推测,这是因为支持的组件(hadoop、ceph、glusterfs、redis等)越来越多,为了减少各组件之间的相互影响而改用了多进程架构。每个采集进程负责一个组件的信息采集。
在python-diamond中,diamond进程的启动实际上就是一个Python程序的启动。该程序主要负责管理pid文件并创建Server类的实例,随后将后续处理交给Server对象控制。
Server类主要负责子进程的加载和管理,并定期检测子进程的状态:若某个采集子进程已经退出,则重新加载对应的子进程。检测的基本方法是将配置中已启用(enabled)的采集器名称与当前存活的子进程名称进行比对。这部分的实现如下:
-
...
# NOTE(review): the `...` above marks code the article omitted (presumably
# the module imports).
# Path setup: append the absolute path of the directory one level above
# this file to the module search path.
sys.path.append(
    os.path.abspath(os.path.join(os.path.dirname(__file__), "../"))
)
-
-
class Server(object):
-
"""
-
Server class loads and starts Handlers and Collectors
-
"""
-
-
def __init__(self, configfile):
-
# Initialize Logging
-
self.log = logging.getLogger('diamond')
-
# Initialize Members
-
self.configfile = configfile
-
self.config = None
-
self.handlers = []
-
self.handler_queue = []
-
self.modules = {}
-
self.metric_queue = None #对应的队列
-
-
-
def run(self):
-
"""
-
Load handler and collector classes and then start collectors
-
"""
-
-
#######################################################################
-
# Config
-
#######################################################################
-
self.config = load_config(self.configfile) #加载对应的配置文件
-
-
collectors = load_collectors(self.config['server']['collectors_path']) #获取对应的收集对象
-
metric_queue_size = int(self.config['server'].get('metric_queue_size',
-
16384))
-
self.metric_queue = self.manager.Queue(maxsize=metric_queue_size) #创建进程可用的队列
-
self.log.debug('metric_queue_size: %d', metric_queue_size)
-
-
if 'handlers_path' in self.config['server']:
-
handlers_path = self.config['server']['handlers_path']
-
-
# Make an list if not one
-
if isinstance(handlers_path, basestring):
-
handlers_path = handlers_path.split(',')
-
handlers_path = map(str.strip, handlers_path)
-
self.config['server']['handlers_path'] = handlers_path
-
-
load_include_path(handlers_path)
-
-
if 'handlers' not in self.config['server']:
-
self.log.critical('handlers missing from server section in config')
-
sys.exit(1)
-
-
handlers = self.config['server'].get('handlers')
-
if isinstance(handlers, basestring):
-
handlers = [handlers]
-
-
# Prevent the Queue Handler from being a normal handler
-
if 'diamond.handler.queue.QueueHandler' in handlers:
-
handlers.remove('diamond.handler.queue.QueueHandler')
-
-
self.handlers = load_handlers(self.config, handlers) #加载对应的处理器
-
-
QueueHandler = load_dynamic_class(
-
'diamond.handler.queue.QueueHandler',
-
Handler
-
) #获取一个处理器队列
-
-
self.handler_queue = QueueHandler(
-
config=self.config, queue=self.metric_queue, log=self.log)
-
-
process = multiprocessing.Process( #只产生一个进程
-
name="Handlers",
-
target=handler_process, # 参数为handler_process,调用handler处理对应的发送数据
-
args=(self.handlers, self.metric_queue, self.log), #对应的参数如下
-
)
-
-
process.daemon = True #设置为daemon函数
-
process.start() #启动对应的进程
-
-
if hasattr(signal, 'SIGHUP'):
-
signal.signal(signal.SIGHUP, signal_to_exception) #安装对应的异常处理函数
-
-
#######################################################################
-
-
while True:
-
try:
-
active_children = multiprocessing.active_children() #获取所有正常运行的子进程
-
running_processes = []
-
for process in active_children:
-
running_processes.append(process.name) #保存子进程的名字
-
running_processes = set(running_processes)
-
-
##############################################################
-
# Collectors
-
##############################################################
-
-
running_collectors = [] #获取默认启动的Collecter
-
for collector, config in self.config['collectors'].iteritems():
-
if config.get('enabled', False) is not True:
-
continue
-
running_collectors.append(collector) #添加启动的Collecter
-
running_collectors = set(running_collectors)
-
-
# Collectors that are running but shouldn't be
-
for process_name in running_processes - running_collectors: #关闭不再处理的Collecter进程
-
if 'Collector' not in process_name:
-
continue
-
for process in active_children:
-
if process.name == process_name:
-
process.terminate() #终止对应的进程
-
-
collector_classes = dict(
-
(cls.__name__.split('.')[-1], cls) #键值和对应的cls
-
for cls in collectors.values()
-
)
-
-
load_delay = self.config['server'].get('collectors_load_delay',
-
1.0)
-
for process_name in running_collectors - running_processes: #当前还没有运行的子进程
-
# To handle running multiple collectors concurrently, we
-
# split on white space and use the first word as the
-
# collector name to spin
-
collector_name = process_name.split()[0]
-
-
if 'Collector' not in collector_name:
-
continue
-
-
if collector_name not in collector_classes:
-
self.log.error('Can not find collector %s',
-
collector_name)
-
continue
-
-
collector = initialize_collector(
-
collector_classes[collector_name],
-
name=process_name,
-
configfile=self.configfile,
-
handlers=[self.handler_queue]) #初始化对应的Collecter
-
-
if collector is None:
-
self.log.error('Failed to load collector %s',
-
process_name)
-
continue
-
-
# Splay the loads
-
time.sleep(float(load_delay))
-
-
process = multiprocessing.Process(
-
name=process_name,
-
target=collector_process, #对应的Collecter的处理函数
-
args=(collector, self.metric_queue, self.log)
-
) #创建对应的子进程,即对应到对应的Collecter-name
-
process.daemon = True #设置为deamon的形式
-
process.start() #加载对应的进程
-
-
##############################################################
-
-
time.sleep(1)
-
-
except SIGHUPException: #如果出现了SIGHUPException则进程重新加载配置的操作,然后重新定义信号
-
# ignore further SIGHUPs for now
-
original_sighup_handler = signal.getsignal(signal.SIGHUP)
-
signal.signal(signal.SIGHUP, signal.SIG_IGN)
-
-
self.log.info('Reloading state due to HUP')
-
self.config = load_config(self.configfile)
-
collectors = load_collectors(
-
self.config['server']['collectors_path']
管理进程主要负责子进程的管理,而采集子进程的逻辑则非常简单:按照配置的时间间隔定期调用组件提供的采集脚本进行相应处理,如下所示:
-
def collector_process(collector, metric_queue, log): #Collecter的处理函数
-
"""
-
"""
-
proc = multiprocessing.current_process()
-
if setproctitle:
-
setproctitle('%s - %s' % (getproctitle(), proc.name))
-
-
signal.signal(signal.SIGALRM, signal_to_exception)
-
signal.signal(signal.SIGHUP, signal_to_exception)
-
signal.signal(signal.SIGUSR2, signal_to_exception)
-
-
interval = float(collector.config['interval'])
-
if interval <= 0:
-
log.critical('interval of %s is not valid!', interval)
-
sys.exit(1)
-
next_window = math.floor(time.time() / interval) * interval
-
stagger_offset = random.uniform(0, interval - 1)
-
-
max_time = int(max(interval - stagger_offset, 1))
-
log.debug('Max collection time: %s seconds', max_time)
-
-
sys.stdout = open(os.devnull, 'w')
-
sys.stderr = open(os.devnull, 'w')
-
-
while(True):
-
try:
-
time_to_sleep = (next_window + stagger_offset) - time.time()
-
if time_to_sleep > 0:
-
time.sleep(time_to_sleep)
-
elif time_to_sleep < 0:
-
# clock has jumped, lets skip missed intervals
-
next_window = time.time()
-
-
next_window += interval
-
-
# Ensure collector run times fit into the collection window
-
signal.alarm(max_time)
-
-
#
-
collector._run() #调用采集对象的函数
-
-
# Disable the alarm
-
signal.alarm(0)
-
-
except SIGALRMException:
-
log.error('Took too long to run! Killed!')
-
-
# Adjust the stagger_offset to allow for more time to run the
-
# collector
-
stagger_offset = stagger_offset * 0.9
-
-
max_time = int(max(interval - stagger_offset, 1))
-
log.debug('Max collection time: %s seconds', max_time)
-
-
except SIGHUPException:
-
# Reload the config if requested
-
# We must first disable the alarm as we don't want it to interrupt
-
# us and end up with half a loaded config
-
signal.alarm(0) #重新设置对应的定时器
-
-
log.info('Reloading config reload due to HUP')
-
collector.load_config() #重新加载配置文件
-
log.info('Config reloaded')
-
-
except Exception:
-
log.exception('Collector
而Handlers进程的处理函数如下所示,它不断从队列中取出指标,并交给各个Handler处理:
-
def handler_process(handlers, metric_queue, log):
    """
    Body of the "Handlers" child process: dispatch queued metrics forever.

    Each item taken from metric_queue (blocking, no timeout) is handed to
    every handler in order via handler._process(item). A None item is
    treated as a flush request and calls handler._flush() on every handler
    instead. The loop never exits on its own; an exception raised by a
    handler propagates out of this function.
    """
    me = multiprocessing.current_process()
    if setproctitle:
        # Suffix the inherited process title with this process's name.
        setproctitle('%s - %s' % (getproctitle(), me.name))

    log.debug('Starting process %s', me.name)

    while True:
        item = metric_queue.get(block=True, timeout=None)
        # Whether to process or flush depends only on the item, so decide
        # once per item rather than once per handler.
        if item is None:
            for handler in handlers:
                handler._flush()
            continue
        for handler in handlers:
            # _process() is the base-class wrapper around each handler's
            # own process() implementation.
            handler._process(item)
接下来是采集器基类Collector的实现:
-
class Collector(object):
    """
    The Collector class is a base class for all metric collectors.

    Subclasses override collect(). Values are emitted through publish(),
    which wraps them in a Metric and hands it to every handler in
    self.handlers. _run() performs one collection pass and then flushes
    every handler.
    """

    def __init__(self, config=None, handlers=None, name=None, configfile=None):
        """
        Create a new instance of the Collector class.

        config, configfile -- forwarded as self.load_config(configfile, config)
        handlers -- objects with _process(metric) and _flush() methods; a new
                    empty list is used when omitted
        name     -- collector name; defaults to the class name
        """
        # Initialize Logger
        self.log = logging.getLogger('diamond')

        # Initialize Members
        if name is None:
            name = self.__class__.__name__
        self.name = name

        # Build a fresh list per instance: the former ``handlers=[]`` default
        # was a single list object shared by every collector created without
        # explicit handlers.
        if handlers is None:
            handlers = []
        self.handlers = handlers
        self.last_values = {}

        self.configfile = None
        self.load_config(configfile, config)

    # NOTE(review): the article elides part of the class here. The methods
    # below rely on load_config(), get_metric_path() and get_hostname(),
    # which must be defined in the full class.
    ...

    def collect(self):
        """
        Default collector method.

        Subclasses must override this to gather their data and call
        publish() for each value; the base version always raises.
        """
        raise NotImplementedError()

    def publish(self, name, value, raw_value=None, precision=0,
                metric_type='GAUGE', instance=None):
        """
        Publish a metric with the given name.

        The name is filtered first: when config['metrics_whitelist'] is set,
        only names its .match() accepts are published (the blacklist is then
        not consulted); otherwise names accepted by
        config['metrics_blacklist'].match() are dropped. The metric's TTL is
        config['interval'] * config['ttl_multiplier'].

        Raises DiamondException (after logging path and value) if the Metric
        cannot be constructed.
        """
        # Check whitelist/blacklist
        if self.config['metrics_whitelist']:
            if not self.config['metrics_whitelist'].match(name):
                return
        elif self.config['metrics_blacklist']:
            if self.config['metrics_blacklist'].match(name):
                return

        # Get metric Path
        path = self.get_metric_path(name, instance=instance)

        # Get metric TTL
        ttl = float(self.config['interval']) * float(
            self.config['ttl_multiplier'])

        # Create Metric
        try:
            metric = Metric(path, value, raw_value=raw_value, timestamp=None,
                            precision=precision, host=self.get_hostname(),
                            metric_type=metric_type, ttl=ttl)
        except DiamondException:
            self.log.error(('Error when creating new Metric: path=%r, '
                            'value=%r'), path, value)
            raise

        # Publish Metric
        self.publish_metric(metric)

    def publish_metric(self, metric):
        """
        Publish a Metric object by passing it to each handler's _process().
        """
        for handler in self.handlers:
            handler._process(metric)

    def _run(self):
        """
        Run one collection pass.

        Calls collect() and logs how long it took in milliseconds. If
        config['measure_collector_time'] is present and truthy, that
        duration is also published as 'collector_time_ms'. Every handler's
        _flush() is called afterwards, even when collect() raises; the
        exception itself is not caught here.
        """
        try:
            start_time = time.time()

            # Collect Data
            self.collect()

            end_time = time.time()
            collector_time = int((end_time - start_time) * 1000)

            self.log.debug('Collection took %s ms', collector_time)

            if ('measure_collector_time' in self.config and
                    self.config['measure_collector_time']):
                self.publish('collector_time_ms', collector_time)
        finally:
            # After collector run, invoke a flush
            # method on each handler.
            for handler in self.handlers:
                handler._flush()
从上述代码基本可知,只需要在组件脚本中继承Collector类,并实现对应的collect()方法即可。下面以ceph的采集器(CephCollector)为例进行分析:
-
class CephCollector(diamond.collector.Collector):
    """Publish perf counters read from the local ceph admin sockets.

    For every socket matching ``<socket_path>/<socket_prefix>*.<socket_ext>``
    the collector runs ``<ceph_binary> --admin-daemon <socket> perf dump``,
    parses the JSON output and publishes the values returned by
    flatten_dictionary as gauges, using ``ceph.<socket base name>`` (with
    socket_prefix removed) as the counter prefix.
    """

    def get_default_config_help(self):
        """Return help text for each option, extending the base class's."""
        config_help = super(CephCollector, self).get_default_config_help()
        config_help.update({
            'socket_path': 'The location of the ceph monitoring sockets.'
                           ' Defaults to "/var/run/ceph"',
            'socket_prefix': 'The first part of all socket names.'
                             ' Defaults to "ceph-"',
            'socket_ext': 'Extension for socket filenames.'
                          ' Defaults to "asok"',
            'ceph_binary': 'Path to "ceph" executable. '
                           'Defaults to /usr/bin/ceph.',
        })
        return config_help

    def get_default_config(self):
        """
        Returns the default collector settings
        """
        config = super(CephCollector, self).get_default_config()
        config.update({
            'socket_path': '/var/run/ceph',
            'socket_prefix': 'ceph-',
            'socket_ext': 'asok',
            'ceph_binary': '/usr/bin/ceph',
        })
        return config

    def _get_socket_paths(self):
        """Return a sequence of paths to sockets for communicating
        with ceph daemons.

        These are the glob matches of
        ``<socket_path>/<socket_prefix>*.<socket_ext>``; the list is empty
        when nothing matches.
        """
        socket_pattern = os.path.join(self.config['socket_path'],
                                      (self.config['socket_prefix'] +
                                       '*.' + self.config['socket_ext']))
        return glob.glob(socket_pattern)

    def _get_counter_prefix_from_socket_name(self, name):
        """Given the name of a UDS socket, return the prefix
        for counters coming from that source.

        The directory and the last extension are stripped, then
        socket_prefix if present; e.g. with the default settings
        ``/var/run/ceph/ceph-osd.0.asok`` -> ``ceph.osd.0``.
        """
        base = os.path.splitext(os.path.basename(name))[0]
        if base.startswith(self.config['socket_prefix']):
            base = base[len(self.config['socket_prefix']):]
        return 'ceph.' + base

    def _get_stats_from_socket(self, name):
        """Return the parsed JSON data returned when ceph is told to
        dump the stats from the named socket.

        In the event of an error, the exception is logged, and
        an empty result set is returned.
        """
        try:
            json_blob = subprocess.check_output(
                [self.config['ceph_binary'],
                 '--admin-daemon',
                 name,
                 'perf',
                 'dump',
                 ])
        except (subprocess.CalledProcessError, OSError):
            # CalledProcessError: ceph exited non-zero. OSError: the ceph
            # binary is missing or not executable -- covered by the same
            # "log and return empty" contract instead of aborting collect().
            self.log.exception('Could not get stats from %s', name)
            return {}

        try:
            json_data = json.loads(json_blob)
        except ValueError:
            # json.loads reports malformed input with ValueError
            # (json.JSONDecodeError is a subclass on Python 3).
            self.log.exception('Could not parse stats from %s', name)
            return {}

        return json_data

    def _publish_stats(self, counter_prefix, stats):
        """Given a stats dictionary from _get_stats_from_socket,
        publish the individual values.

        flatten_dictionary(stats, prefix=counter_prefix) is expected to yield
        (name, value) pairs; each pair is published as a gauge.
        """
        for stat_name, stat_value in flatten_dictionary(
            stats,
            prefix=counter_prefix,
        ):
            self.publish_gauge(stat_name, stat_value)

    def collect(self):
        """
        Collect stats from every matching ceph admin socket.
        """
        for path in self._get_socket_paths():
            self.log.debug('checking %s', path)
            counter_prefix = self._get_counter_prefix_from_socket_name(path)
            stats = self._get_stats_from_socket(path)
            self._publish_stats(counter_prefix, stats)
从上述代码可知,CephCollector正是继承了Collector类,并实现了collect()接口。
Handler类的实现较多,目前支持多种输出格式,它们同样是通过继承Handler基类实现的,这里不再进一步分析。
阅读(4930) | 评论(0) | 转发(0) |