分类: Python/Ruby

2016-04-06 23:18:46






  1. ...
  2. # 路劲加载
  3. sys.path.append(
  4.     os.path.abspath(
  5.         os.path.join(
  6.             os.path.dirname(__file__), "../")))

  7. class Server(object):
  8.     """
  9.     Server class loads and starts Handlers and Collectors
  10.     """

  11.     def __init__(self, configfile):
  12.         # Initialize Logging
  13.         self.log = logging.getLogger('diamond')
  14.         # Initialize Members
  15.         self.configfile = configfile
  16.         self.config = None
  17.         self.handlers = []
  18.         self.handler_queue = []
  19.         self.modules = {}
  20.         self.metric_queue = None # queue shared by the collector processes and the handler process (created in run())

  21.     def run(self):
  22.         """
  23.         Load handler and collector classes and then start collectors
  24.         """

  25.         #######################################################################
  26.         # Config
  27.         #######################################################################
  28.         self.config = load_config(self.configfile) # load the configuration file

  29.         collectors = load_collectors(self.config['server']['collectors_path']) # load collector classes from collectors_path
  30.         metric_queue_size = int(self.config['server'].get('metric_queue_size',
  31.                                                           16384))
  32.         self.metric_queue = self.manager.Queue(maxsize=metric_queue_size) # NOTE(review): self.manager is not set in the visible __init__; presumably a multiprocessing.Manager() created elsewhere -- confirm
  33.         self.log.debug('metric_queue_size: %d', metric_queue_size)

  34.         if 'handlers_path' in self.config['server']:
  35.             handlers_path = self.config['server']['handlers_path']

  36.             # Make a list if it is not one already (comma-separated string)
  37.             if isinstance(handlers_path, basestring):
  38.                 handlers_path = handlers_path.split(',')
  39.                 handlers_path = map(str.strip, handlers_path)
  40.                 self.config['server']['handlers_path'] = handlers_path

  41.             load_include_path(handlers_path)

  42.         if 'handlers' not in self.config['server']:
  43.             self.log.critical('handlers missing from server section in config')
  44.             sys.exit(1)

  45.         handlers = self.config['server'].get('handlers')
  46.         if isinstance(handlers, basestring):
  47.             handlers = [handlers]

  48.         # Prevent the Queue Handler from being a normal handler
  49.         if 'diamond.handler.queue.QueueHandler' in handlers:
  50.             handlers.remove('diamond.handler.queue.QueueHandler')

  51.         self.handlers = load_handlers(self.config, handlers) # instantiate the configured handlers

  52.         QueueHandler = load_dynamic_class(
  53.             'diamond.handler.queue.QueueHandler',
  54.             Handler
  55.         ) # resolve the QueueHandler class; collectors publish into metric_queue through it

  56.         self.handler_queue = QueueHandler(
  57.             config=self.config, queue=self.metric_queue, log=self.log)

  58.         process = multiprocessing.Process( # a single process runs all handlers
  59.             name="Handlers",
  60.             target=handler_process, # loop that reads metric_queue and feeds every handler
  61.             args=(self.handlers, self.metric_queue, self.log), # handlers, shared queue, logger
  62.         )

  63.         process.daemon = True # daemonic: terminated when this parent process exits
  64.         process.start() # start the handler process

  65.         if hasattr(signal, 'SIGHUP'):
  66.             signal.signal(signal.SIGHUP, signal_to_exception) # presumably raises SIGHUPException, caught in the loop below

  67.         #######################################################################

  68.         while True:
  69.             try:
  70.                 active_children = multiprocessing.active_children() # currently alive child processes
  71.                 running_processes = []
  72.                 for process in active_children:
  73.                     running_processes.append( # NOTE(review): argument missing from this excerpt; presumably process.name)
  74.                 running_processes = set(running_processes)

  75.                 ##############################################################
  76.                 # Collectors
  77.                 ##############################################################

  78.                 running_collectors = [] # names of collectors that should be running
  79.                 for collector, config in self.config['collectors'].iteritems():
  80.                     if config.get('enabled', False) is not True: # only the boolean True enables a collector
  81.                         continue
  82.                     running_collectors.append(collector) # collector is enabled
  83.                 running_collectors = set(running_collectors)

  84.                 # Collectors that are running but shouldn't be
  85.                 for process_name in running_processes - running_collectors: # stop collector processes that are no longer enabled
  86.                     if 'Collector' not in process_name:
  87.                         continue
  88.                     for process in active_children:
  89.                         if == process_name: # NOTE(review): left operand missing from this excerpt; presumably process.name
  90.                             process.terminate() # terminate the stale collector process

  91.                 collector_classes = dict(
  92.                     (cls.__name__.split('.')[-1], cls) # short class name -> class
  93.                     for cls in collectors.values()
  94.                 )

  95.                 load_delay = self.config['server'].get('collectors_load_delay',
  96.                                                        1.0)
  97.                 for process_name in running_collectors - running_processes: # enabled collectors with no running process yet
  98.                     # To handle running multiple collectors concurrently, we
  99.                     # split on white space and use the first word as the
  100.                     # collector name to spin
  101.                     collector_name = process_name.split()[0]

  102.                     if 'Collector' not in collector_name:
  103.                         continue

  104.                     if collector_name not in collector_classes:
  105.                         self.log.error('Can not find collector %s',
  106.                                        collector_name)
  107.                         continue

  108.                     collector = initialize_collector(
  109.                         collector_classes[collector_name],
  110.                         name=process_name,
  111.                         configfile=self.configfile,
  112.                         handlers=[self.handler_queue]) # collectors publish only through the queue handler

  113.                     if collector is None:
  114.                         self.log.error('Failed to load collector %s',
  115.                                        process_name)
  116.                         continue

  117.                     # Splay the loads
  118.                     time.sleep(float(load_delay))

  119.                     process = multiprocessing.Process(
  120.                         name=process_name,
  121.                         target=collector_process, # per-collector scheduling loop
  122.                         args=(collector, self.metric_queue, self.log)
  123.                     ) # one child process per collector, named after it
  124.                     process.daemon = True # daemonic, like the handler process
  125.                     process.start() # start the collector process

  126.                 ##############################################################

  127.                 time.sleep(1)

  128.             except SIGHUPException: # SIGHUP received: reload the config and collector classes
  129.                 # ignore further SIGHUPs for now
  130.                 original_sighup_handler = signal.getsignal(signal.SIGHUP)
  131.                 signal.signal(signal.SIGHUP, signal.SIG_IGN)

  132.       'Reloading state due to HUP')  # NOTE(review): call prefix missing from this excerpt; presumably self.log.info(
  133.                 self.config = load_config(self.configfile)
  134.                 collectors = load_collectors(
  135.                     self.config['server']['collectors_path']  # NOTE(review): excerpt truncated here; closing paren and rest of the handler are missing



  1. def collector_process(collector, metric_queue, log): # scheduling loop for one collector
  2.     """Run collector._run() repeatedly, once per collection interval.
  3.     """
  4.     proc = multiprocessing.current_process()
  5.     if setproctitle:
  6.         setproctitle('%s - %s' % (getproctitle(),  # NOTE(review): remaining argument missing from this excerpt; presumably proc.name))

  7.     signal.signal(signal.SIGALRM, signal_to_exception) # presumably turns these signals into the SIG*Exception types caught below
  8.     signal.signal(signal.SIGHUP, signal_to_exception)
  9.     signal.signal(signal.SIGUSR2, signal_to_exception)

  10.     interval = float(collector.config['interval'])
  11.     if interval <= 0:
  12.         log.critical('interval of %s is not valid!', interval)
  13.         sys.exit(1)
  14.     next_window = math.floor(time.time() / interval) * interval
  15.     stagger_offset = random.uniform(0, interval - 1) # NOTE(review): upper bound is negative when interval < 1

  16.     max_time = int(max(interval - stagger_offset, 1))
  17.     log.debug('Max collection time: %s seconds', max_time)

  18.     sys.stdout = open(os.devnull, 'w') # presumably silences stray prints from collector code
  19.     sys.stderr = open(os.devnull, 'w')

  20.     while(True):
  21.         try:
  22.             time_to_sleep = (next_window + stagger_offset) - time.time()
  23.             if time_to_sleep > 0:
  24.                 time.sleep(time_to_sleep)
  25.             elif time_to_sleep < 0:
  26.                 # clock has jumped, let's skip missed intervals
  27.                 next_window = time.time()

  28.             next_window += interval

  29.             # Ensure collector run times fit into the collection window
  30.             signal.alarm(max_time)

  31.             # one collection pass; SIGALRM fires if it exceeds max_time seconds
  32.             collector._run() # run the collector once

  33.             # Disable the alarm
  34.             signal.alarm(0)

  35.         except SIGALRMException:
  36.             log.error('Took too long to run! Killed!')

  37.             # Adjust the stagger_offset to allow for more time to run the
  38.             # collector
  39.             stagger_offset = stagger_offset * 0.9

  40.             max_time = int(max(interval - stagger_offset, 1))
  41.             log.debug('Max collection time: %s seconds', max_time)

  42.         except SIGHUPException:
  43.             # Reload the config if requested
  44.             # We must first disable the alarm as we don't want it to interrupt
  45.             # us and end up with half a loaded config
  46.             signal.alarm(0) # cancel any pending alarm

  47.   'Reloading config reload due to HUP')  # NOTE(review): call prefix missing from this excerpt; presumably log.info(
  48.             collector.load_config() # reload the collector's configuration
  49.   'Config reloaded')  # NOTE(review): call prefix missing from this excerpt; presumably log.info(

  50.         except Exception:
  51.             log.exception('Collector


  1. def handler_process(handlers, metric_queue, log): #处理函数的处理函数
  2.     proc = multiprocessing.current_process()
  3.     if setproctitle:
  4.         setproctitle('%s - %s' % (getproctitle(),

  5.     log.debug('Starting process %s',

  6.     while(True):
  7.         metric = metric_queue.get(block=True, timeout=None) #从队列中取出对应的指标
  8.         for handler in handlers:
  9.             if metric is not None:
  10.                 handler._process(metric) #调用具体handler的_process(), 该函数调用了子类的process
  11.             else:
  12.                 handler._flush()



  1. class Collector(object):
  2.     """
  3.     The Collector class is a base class for all metric collectors.
  4.     """

  5.     def __init__(self, config=None, handlers=[], name=None, configfile=None):
  6.         """
  7.         Create a new instance of the Collector class
  8.         """
  9.         # Initialize Logger
  10.         self.log = logging.getLogger('diamond')
  11.         # Initialize Members
  12.         if name is None:
  13.    = self.__class__.__name__
  14.         else:
  15.    = name

  16.         self.handlers = handlers
  17.         self.last_values = {}

  18.         self.configfile = None
  19.         self.load_config(configfile, config)

  20.     ...

  21.     def collect(self): #待组件实现的函数
  22.         """
  23.         Default collector method
  24.         """
  25.         raise NotImplementedError()

  26.     def publish(self, name, value, raw_value=None, precision=0,
  27.                 metric_type='GAUGE', instance=None): #数据的发布操作
  28.         """
  29.         Publish a metric with the given name
  30.         """
  31.         # Check whitelist/blacklist
  32.         if self.config['metrics_whitelist']:
  33.             if not self.config['metrics_whitelist'].match(name):
  34.                 return
  35.         elif self.config['metrics_blacklist']:
  36.             if self.config['metrics_blacklist'].match(name):
  37.                 return

  38.         # Get metric Path
  39.         path = self.get_metric_path(name, instance=instance)

  40.         # Get metric TTL
  41.         ttl = float(self.config['interval']) * float(
  42.             self.config['ttl_multiplier'])

  43.         # Create Metric
  44.         try:
  45.             metric = Metric(path, value, raw_value=raw_value, timestamp=None,
  46.                             precision=precision, host=self.get_hostname(),
  47.                             metric_type=metric_type, ttl=ttl)
  48.         except DiamondException:
  49.             self.log.error(('Error when creating new Metric: path=%r, '
  50.                             'value=%r'), path, value)
  51.             raise

  52.         # Publish Metric
  53.         self.publish_metric(metric)

  54.     def publish_metric(self, metric):
  55.         """
  56.         Publish a Metric object
  57.         """
  58.         # Process Metric
  59.         for handler in self.handlers:
  60.             handler._process(metric) #具体的handler的处理函数

  61.     def _run(self):
  62.         """
  63.         Run the collector unless it's already running
  64.         """
  65.         try:
  66.             start_time = time.time()

  67.             # Collect Data
  68.             self.collect() #调用组件的实现函数

  69.             end_time = time.time()
  70.             collector_time = int((end_time - start_time) * 1000)

  71.             self.log.debug('Collection took %s ms', collector_time)

  72.             if 'measure_collector_time' in self.config:
  73.                 if self.config['measure_collector_time']:
  74.                     metric_name = 'collector_time_ms'
  75.                     metric_value = collector_time
  76.                     self.publish(metric_name, metric_value)
  77.         finally:
  78.             # After collector run, invoke a flush
  79.             # method on each handler.
  80.             for handler in self.handlers:
  81.                 handler._flush()


  1. class CephCollector(diamond.collector.Collector):

  2.     def get_default_config_help(self):
  3.         config_help = super(CephCollector, self).get_default_config_help()
  4.         config_help.update({
  5.             'socket_path': 'The location of the ceph monitoring sockets.'
  6.                            ' Defaults to "/var/run/ceph"',
  7.             'socket_prefix': 'The first part of all socket names.'
  8.                              ' Defaults to "ceph-"',
  9.             'socket_ext': 'Extension for socket filenames.'
  10.                           ' Defaults to "asok"',
  11.             'ceph_binary': 'Path to "ceph" executable. '
  12.                            'Defaults to /usr/bin/ceph.',
  13.         })
  14.         return config_help

  15.     def get_default_config(self):
  16.         """
  17.         Returns the default collector settings
  18.         """
  19.         config = super(CephCollector, self).get_default_config()
  20.         config.update({
  21.             'socket_path': '/var/run/ceph',
  22.             'socket_prefix': 'ceph-',
  23.             'socket_ext': 'asok',
  24.             'ceph_binary': '/usr/bin/ceph',
  25.         })
  26.         return config

  27.     def _get_socket_paths(self):
  28.         """Return a sequence of paths to sockets for communicating
  29.         with ceph daemons.
  30.         """
  31.         socket_pattern = os.path.join(self.config['socket_path'],
  32.                                       (self.config['socket_prefix'] +
  33.                                        '*.' + self.config['socket_ext']))
  34.         return glob.glob(socket_pattern)

  35.     def _get_counter_prefix_from_socket_name(self, name):
  36.         """Given the name of a UDS socket, return the prefix
  37.         for counters coming from that source.
  38.         """
  39.         base = os.path.splitext(os.path.basename(name))[0]
  40.         if base.startswith(self.config['socket_prefix']):
  41.             base = base[len(self.config['socket_prefix']):]
  42.         return 'ceph.' + base

  43.     def _get_stats_from_socket(self, name):
  44.         """Return the parsed JSON data returned when ceph is told to
  45.         dump the stats from the named socket.

  46.         In the event of an error error, the exception is logged, and
  47.         an empty result set is returned.
  48.         """
  49.         try:
  50.             json_blob = subprocess.check_output(
  51.                 [self.config['ceph_binary'],
  52.                  '--admin-daemon',
  53.                  name,
  54.                  'perf',
  55.                  'dump',
  56.                  ])
  57.         except subprocess.CalledProcessError, err:
  58.   'Could not get stats from %s: %s',
  59.                           name, err)
  60.             self.log.exception('Could not get stats from %s' % name)
  61.             return {}

  62.         try:
  63.             json_data = json.loads(json_blob)
  64.         except Exception, err:
  65.   'Could not parse stats from %s: %s',
  66.                           name, err)
  67.             self.log.exception('Could not parse stats from %s' % name)
  68.             return {}

  69.         return json_data

  70.     def _publish_stats(self, counter_prefix, stats):
  71.         """Given a stats dictionary from _get_stats_from_socket,
  72.         publish the individual values.
  73.         """
  74.         for stat_name, stat_value in flatten_dictionary(
  75.             stats,
  76.             prefix=counter_prefix,
  77.         ):
  78.             self.publish_gauge(stat_name, stat_value)

  79.     def collect(self):
  80.         """
  81.         Collect stats
  82.         """
  83.         for path in self._get_socket_paths():
  84.             self.log.debug('checking %s', path)
  85.             counter_prefix = self._get_counter_prefix_from_socket_name(path)
  86.             stats = self._get_stats_from_socket(path)
  87.             self._publish_stats(counter_prefix, stats)
  88.         return


