Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1900597
  • 博文数量: 152
  • 博客积分: 3730
  • 博客等级: 上尉
  • 技术积分: 3710
  • 用 户 组: 普通用户
  • 注册时间: 2011-06-02 14:36
个人简介

减肥,运动,学习,进步...

文章分类

全部博文(152)

文章存档

2016年(14)

2015年(17)

2014年(16)

2013年(4)

2012年(66)

2011年(35)

分类: Python/Ruby

2016-04-06 23:18:46

近期有项目需要进行了服务器的管理信息采集,由于在之前的项目中使用了python-diamon这个应用,为了实现类似的功能,简单的对该应用进行了代码的分析,对Python在运维中的使用又有了一次更深入的了解。有关python-diamond的详细介绍可参看:。

python-diamond的基本功能是从安装当前软件的服务器中收集一些数据,然后将数据进行处理,目前默认的方式是将数据发送到graphite图表服务器。该软件的实现思路是将采集和处理分开,其中采集数据通过调用各个组件提供的脚本进行处理,而数据处理部分也是通过调用不同的Handler对象实现,Handler对象的实现也是根据提供的脚本实现。
该软件的基本框架如下图所示:
其中Server管理器主要负责管理各个进程的控制,其中管理的进程包括采集的进程,在早期的版本中diamond采用单进程的方式,而在最新的版本中采用了多进程的架构,个人推测是支持的组件(hadoop、ceph、glusterfs、redis等)越来越丰富,为了减少各个组件的影响,采用了多进程的架构。每个进程负责一个组件的信息采集。

在Python-diamond,其中diamond进程的启动实际上就是一个python程序的启动,在该程序中主要是管理pid文件、创建Server类的对象实例。然后将处理交给Server对象控制。

而Server类的实现中主要负责进程的加载和管理,定期的检测子进程的状态。若子进程退出,则再次加载对应的子进程。检测的基本方法是根据加载的采集对象来判断。这部分的实现如下:

点击(此处)折叠或打开

  1. ...
  2. # 路劲加载
  3. sys.path.append(
  4.     os.path.abspath(
  5.         os.path.join(
  6.             os.path.dirname(__file__), "../")))

  7. class Server(object):
  8.     """
  9.     Server class loads and starts Handlers and Collectors
  10.     """

  11.     def __init__(self, configfile):
  12.         # Initialize Logging
  13.         self.log = logging.getLogger('diamond')
  14.         # Initialize Members
  15.         self.configfile = configfile
  16.         self.config = None
  17.         self.handlers = []
  18.         self.handler_queue = []
  19.         self.modules = {}
  20.         self.metric_queue = None #对应的队列


  21.     def run(self):
  22.         """
  23.         Load handler and collector classes and then start collectors
  24.         """

  25.         #######################################################################
  26.         # Config
  27.         #######################################################################
  28.         self.config = load_config(self.configfile) #加载对应的配置文件

  29.         collectors = load_collectors(self.config['server']['collectors_path']) #获取对应的收集对象
  30.         metric_queue_size = int(self.config['server'].get('metric_queue_size',
  31.                                                           16384))
  32.         self.metric_queue = self.manager.Queue(maxsize=metric_queue_size) #创建进程可用的队列
  33.         self.log.debug('metric_queue_size: %d', metric_queue_size)

  34.         if 'handlers_path' in self.config['server']:
  35.             handlers_path = self.config['server']['handlers_path']

  36.             # Make an list if not one
  37.             if isinstance(handlers_path, basestring):
  38.                 handlers_path = handlers_path.split(',')
  39.                 handlers_path = map(str.strip, handlers_path)
  40.                 self.config['server']['handlers_path'] = handlers_path

  41.             load_include_path(handlers_path)

  42.         if 'handlers' not in self.config['server']:
  43.             self.log.critical('handlers missing from server section in config')
  44.             sys.exit(1)

  45.         handlers = self.config['server'].get('handlers')
  46.         if isinstance(handlers, basestring):
  47.             handlers = [handlers]

  48.         # Prevent the Queue Handler from being a normal handler
  49.         if 'diamond.handler.queue.QueueHandler' in handlers:
  50.             handlers.remove('diamond.handler.queue.QueueHandler')

  51.         self.handlers = load_handlers(self.config, handlers) #加载对应的处理器

  52.         QueueHandler = load_dynamic_class(
  53.             'diamond.handler.queue.QueueHandler',
  54.             Handler
  55.         ) #获取一个处理器队列

  56.         self.handler_queue = QueueHandler(
  57.             config=self.config, queue=self.metric_queue, log=self.log)

  58.         process = multiprocessing.Process( #只产生一个进程
  59.             name="Handlers",
  60.             target=handler_process, # 参数为handler_process,调用handler处理对应的发送数据
  61.             args=(self.handlers, self.metric_queue, self.log), #对应的参数如下
  62.         )

  63.         process.daemon = True #设置为daemon函数
  64.         process.start() #启动对应的进程

  65.         if hasattr(signal, 'SIGHUP'):
  66.             signal.signal(signal.SIGHUP, signal_to_exception) #安装对应的异常处理函数

  67.         #######################################################################

  68.         while True:
  69.             try:
  70.                 active_children = multiprocessing.active_children() #获取所有正常运行的子进程
  71.                 running_processes = []
  72.                 for process in active_children:
  73.                     running_processes.append(process.name) #保存子进程的名字
  74.                 running_processes = set(running_processes)

  75.                 ##############################################################
  76.                 # Collectors
  77.                 ##############################################################

  78.                 running_collectors = [] #获取默认启动的Collecter
  79.                 for collector, config in self.config['collectors'].iteritems():
  80.                     if config.get('enabled', False) is not True:
  81.                         continue
  82.                     running_collectors.append(collector) #添加启动的Collecter
  83.                 running_collectors = set(running_collectors)

  84.                 # Collectors that are running but shouldn't be
  85.                 for process_name in running_processes - running_collectors: #关闭不再处理的Collecter进程
  86.                     if 'Collector' not in process_name:
  87.                         continue
  88.                     for process in active_children:
  89.                         if process.name == process_name:
  90.                             process.terminate() #终止对应的进程

  91.                 collector_classes = dict(
  92.                     (cls.__name__.split('.')[-1], cls) #键值和对应的cls
  93.                     for cls in collectors.values()
  94.                 )

  95.                 load_delay = self.config['server'].get('collectors_load_delay',
  96.                                                        1.0)
  97.                 for process_name in running_collectors - running_processes: #当前还没有运行的子进程
  98.                     # To handle running multiple collectors concurrently, we
  99.                     # split on white space and use the first word as the
  100.                     # collector name to spin
  101.                     collector_name = process_name.split()[0]

  102.                     if 'Collector' not in collector_name:
  103.                         continue

  104.                     if collector_name not in collector_classes:
  105.                         self.log.error('Can not find collector %s',
  106.                                        collector_name)
  107.                         continue

  108.                     collector = initialize_collector(
  109.                         collector_classes[collector_name],
  110.                         name=process_name,
  111.                         configfile=self.configfile,
  112.                         handlers=[self.handler_queue]) #初始化对应的Collecter

  113.                     if collector is None:
  114.                         self.log.error('Failed to load collector %s',
  115.                                        process_name)
  116.                         continue

  117.                     # Splay the loads
  118.                     time.sleep(float(load_delay))

  119.                     process = multiprocessing.Process(
  120.                         name=process_name,
  121.                         target=collector_process, #对应的Collecter的处理函数
  122.                         args=(collector, self.metric_queue, self.log)
  123.                     ) #创建对应的子进程,即对应到对应的Collecter-name
  124.                     process.daemon = True #设置为deamon的形式
  125.                     process.start() #加载对应的进程

  126.                 ##############################################################

  127.                 time.sleep(1)

  128.             except SIGHUPException: #如果出现了SIGHUPException则进程重新加载配置的操作,然后重新定义信号
  129.                 # ignore further SIGHUPs for now
  130.                 original_sighup_handler = signal.getsignal(signal.SIGHUP)
  131.                 signal.signal(signal.SIGHUP, signal.SIG_IGN)

  132.                 self.log.info('Reloading state due to HUP')
  133.                 self.config = load_config(self.configfile)
  134.                 collectors = load_collectors(
  135.                     self.config['server']['collectors_path']

在管理进程中主要是进行进程的管理。而在子进程的操作非常的简单,主要是调用组件提供的脚本,进行对应的处理,如下所示:

点击(此处)折叠或打开

  1. def collector_process(collector, metric_queue, log): #Collecter的处理函数
  2.     """
  3.     """
  4.     proc = multiprocessing.current_process()
  5.     if setproctitle:
  6.         setproctitle('%s - %s' % (getproctitle(), proc.name))

  7.     signal.signal(signal.SIGALRM, signal_to_exception)
  8.     signal.signal(signal.SIGHUP, signal_to_exception)
  9.     signal.signal(signal.SIGUSR2, signal_to_exception)

  10.     interval = float(collector.config['interval'])
  11.     if interval <= 0:
  12.         log.critical('interval of %s is not valid!', interval)
  13.         sys.exit(1)
  14.     next_window = math.floor(time.time() / interval) * interval
  15.     stagger_offset = random.uniform(0, interval - 1)

  16.     max_time = int(max(interval - stagger_offset, 1))
  17.     log.debug('Max collection time: %s seconds', max_time)

  18.     sys.stdout = open(os.devnull, 'w')
  19.     sys.stderr = open(os.devnull, 'w')

  20.     while(True):
  21.         try:
  22.             time_to_sleep = (next_window + stagger_offset) - time.time()
  23.             if time_to_sleep > 0:
  24.                 time.sleep(time_to_sleep)
  25.             elif time_to_sleep < 0:
  26.                 # clock has jumped, lets skip missed intervals
  27.                 next_window = time.time()

  28.             next_window += interval

  29.             # Ensure collector run times fit into the collection window
  30.             signal.alarm(max_time)

  31.             #
  32.             collector._run() #调用采集对象的函数

  33.             # Disable the alarm
  34.             signal.alarm(0)

  35.         except SIGALRMException:
  36.             log.error('Took too long to run! Killed!')

  37.             # Adjust the stagger_offset to allow for more time to run the
  38.             # collector
  39.             stagger_offset = stagger_offset * 0.9

  40.             max_time = int(max(interval - stagger_offset, 1))
  41.             log.debug('Max collection time: %s seconds', max_time)

  42.         except SIGHUPException:
  43.             # Reload the config if requested
  44.             # We must first disable the alarm as we don't want it to interrupt
  45.             # us and end up with half a loaded config
  46.             signal.alarm(0) #重新设置对应的定时器

  47.             log.info('Reloading config reload due to HUP')
  48.             collector.load_config() #重新加载配置文件
  49.             log.info('Config reloaded')

  50.         except Exception:
  51.             log.exception('Collector
而handler的处理函数如下所示:

点击(此处)折叠或打开

  1. def handler_process(handlers, metric_queue, log): #处理函数的处理函数
  2.     proc = multiprocessing.current_process()
  3.     if setproctitle:
  4.         setproctitle('%s - %s' % (getproctitle(), proc.name))

  5.     log.debug('Starting process %s', proc.name)

  6.     while(True):
  7.         metric = metric_queue.get(block=True, timeout=None) #从队列中取出对应的指标
  8.         for handler in handlers:
  9.             if metric is not None:
  10.                 handler._process(metric) #调用具体handler的_process(), 该函数调用了子类的process
  11.             else:
  12.                 handler._flush()

接下来就是对应的采集类实现:

点击(此处)折叠或打开

  1. class Collector(object):
  2.     """
  3.     The Collector class is a base class for all metric collectors.
  4.     """

  5.     def __init__(self, config=None, handlers=[], name=None, configfile=None):
  6.         """
  7.         Create a new instance of the Collector class
  8.         """
  9.         # Initialize Logger
  10.         self.log = logging.getLogger('diamond')
  11.         # Initialize Members
  12.         if name is None:
  13.             self.name = self.__class__.__name__
  14.         else:
  15.             self.name = name

  16.         self.handlers = handlers
  17.         self.last_values = {}

  18.         self.configfile = None
  19.         self.load_config(configfile, config)

  20.     ...

  21.     def collect(self): #待组件实现的函数
  22.         """
  23.         Default collector method
  24.         """
  25.         raise NotImplementedError()

  26.     def publish(self, name, value, raw_value=None, precision=0,
  27.                 metric_type='GAUGE', instance=None): #数据的发布操作
  28.         """
  29.         Publish a metric with the given name
  30.         """
  31.         # Check whitelist/blacklist
  32.         if self.config['metrics_whitelist']:
  33.             if not self.config['metrics_whitelist'].match(name):
  34.                 return
  35.         elif self.config['metrics_blacklist']:
  36.             if self.config['metrics_blacklist'].match(name):
  37.                 return

  38.         # Get metric Path
  39.         path = self.get_metric_path(name, instance=instance)

  40.         # Get metric TTL
  41.         ttl = float(self.config['interval']) * float(
  42.             self.config['ttl_multiplier'])

  43.         # Create Metric
  44.         try:
  45.             metric = Metric(path, value, raw_value=raw_value, timestamp=None,
  46.                             precision=precision, host=self.get_hostname(),
  47.                             metric_type=metric_type, ttl=ttl)
  48.         except DiamondException:
  49.             self.log.error(('Error when creating new Metric: path=%r, '
  50.                             'value=%r'), path, value)
  51.             raise

  52.         # Publish Metric
  53.         self.publish_metric(metric)

  54.     def publish_metric(self, metric):
  55.         """
  56.         Publish a Metric object
  57.         """
  58.         # Process Metric
  59.         for handler in self.handlers:
  60.             handler._process(metric) #具体的handler的处理函数

  61.     def _run(self):
  62.         """
  63.         Run the collector unless it's already running
  64.         """
  65.         try:
  66.             start_time = time.time()

  67.             # Collect Data
  68.             self.collect() #调用组件的实现函数

  69.             end_time = time.time()
  70.             collector_time = int((end_time - start_time) * 1000)

  71.             self.log.debug('Collection took %s ms', collector_time)

  72.             if 'measure_collector_time' in self.config:
  73.                 if self.config['measure_collector_time']:
  74.                     metric_name = 'collector_time_ms'
  75.                     metric_value = collector_time
  76.                     self.publish(metric_name, metric_value)
  77.         finally:
  78.             # After collector run, invoke a flush
  79.             # method on each handler.
  80.             for handler in self.handlers:
  81.                 handler._flush()
从上述的代码基本可知,只需要在组件脚本中集成Collector类,然后实现对应的collect()函数即可。具体可以ceph的collect为例进行分析:

点击(此处)折叠或打开

  1. class CephCollector(diamond.collector.Collector):

  2.     def get_default_config_help(self):
  3.         config_help = super(CephCollector, self).get_default_config_help()
  4.         config_help.update({
  5.             'socket_path': 'The location of the ceph monitoring sockets.'
  6.                            ' Defaults to "/var/run/ceph"',
  7.             'socket_prefix': 'The first part of all socket names.'
  8.                              ' Defaults to "ceph-"',
  9.             'socket_ext': 'Extension for socket filenames.'
  10.                           ' Defaults to "asok"',
  11.             'ceph_binary': 'Path to "ceph" executable. '
  12.                            'Defaults to /usr/bin/ceph.',
  13.         })
  14.         return config_help

  15.     def get_default_config(self):
  16.         """
  17.         Returns the default collector settings
  18.         """
  19.         config = super(CephCollector, self).get_default_config()
  20.         config.update({
  21.             'socket_path': '/var/run/ceph',
  22.             'socket_prefix': 'ceph-',
  23.             'socket_ext': 'asok',
  24.             'ceph_binary': '/usr/bin/ceph',
  25.         })
  26.         return config

  27.     def _get_socket_paths(self):
  28.         """Return a sequence of paths to sockets for communicating
  29.         with ceph daemons.
  30.         """
  31.         socket_pattern = os.path.join(self.config['socket_path'],
  32.                                       (self.config['socket_prefix'] +
  33.                                        '*.' + self.config['socket_ext']))
  34.         return glob.glob(socket_pattern)

  35.     def _get_counter_prefix_from_socket_name(self, name):
  36.         """Given the name of a UDS socket, return the prefix
  37.         for counters coming from that source.
  38.         """
  39.         base = os.path.splitext(os.path.basename(name))[0]
  40.         if base.startswith(self.config['socket_prefix']):
  41.             base = base[len(self.config['socket_prefix']):]
  42.         return 'ceph.' + base

  43.     def _get_stats_from_socket(self, name):
  44.         """Return the parsed JSON data returned when ceph is told to
  45.         dump the stats from the named socket.

  46.         In the event of an error error, the exception is logged, and
  47.         an empty result set is returned.
  48.         """
  49.         try:
  50.             json_blob = subprocess.check_output(
  51.                 [self.config['ceph_binary'],
  52.                  '--admin-daemon',
  53.                  name,
  54.                  'perf',
  55.                  'dump',
  56.                  ])
  57.         except subprocess.CalledProcessError, err:
  58.             self.log.info('Could not get stats from %s: %s',
  59.                           name, err)
  60.             self.log.exception('Could not get stats from %s' % name)
  61.             return {}

  62.         try:
  63.             json_data = json.loads(json_blob)
  64.         except Exception, err:
  65.             self.log.info('Could not parse stats from %s: %s',
  66.                           name, err)
  67.             self.log.exception('Could not parse stats from %s' % name)
  68.             return {}

  69.         return json_data

  70.     def _publish_stats(self, counter_prefix, stats):
  71.         """Given a stats dictionary from _get_stats_from_socket,
  72.         publish the individual values.
  73.         """
  74.         for stat_name, stat_value in flatten_dictionary(
  75.             stats,
  76.             prefix=counter_prefix,
  77.         ):
  78.             self.publish_gauge(stat_name, stat_value)

  79.     def collect(self):
  80.         """
  81.         Collect stats
  82.         """
  83.         for path in self._get_socket_paths():
  84.             self.log.debug('checking %s', path)
  85.             counter_prefix = self._get_counter_prefix_from_socket_name(path)
  86.             stats = self._get_stats_from_socket(path)
  87.             self._publish_stats(counter_prefix, stats)
  88.         return
从上述代码可知,从collector类继承,然后实现collect()接口。

有关handler类的实现比较多,目前支持多种格式,这里就不去进行进一步的分析,也是通过Handler基类进行继承的方式实现。


阅读(4930) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~