# -*- coding: iso-8859-15 -*-
# Copyright (c) 2014 The New York Times Company
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module controls the interactions with collectd
"""
import collectd
import re
from collectd_rabbitmq import rabbit
from collectd_rabbitmq import utils
CONFIG = None
AUTH = None
CONN = None
PLUGIN = None


def init():
    """
    Creates the collectd plugin object.
    """
    global PLUGIN  # pylint: disable=W0603
    PLUGIN = CollectdPlugin()


def read():
    """
    Reads and dispatches data.
    """
    collectd.info("Reading data from rabbit and dispatching")
    if not PLUGIN:
        collectd.warning('Plugin not ready')
        return
    PLUGIN.read()


class CollectdPlugin(object):
    """
    Controls interaction between RabbitMQ stats and collectd.
    """

    message_stats = ['ack', 'publish', 'publish_in', 'publish_out', 'confirm',
                     'deliver', 'deliver_noack', 'get', 'get_noack',
                     'deliver_get', 'redeliver', 'return']
    message_details = ['avg', 'avg_rate', 'rate', 'sample']
    node_stats = ['disk_free', 'disk_free_limit', 'fd_total',
                  'fd_used', 'mem_limit', 'mem_used',
                  'proc_total', 'proc_used', 'processors', 'run_queue',
                  'sockets_total', 'sockets_used']

    def __init__(self):
        self.rabbit = rabbit.RabbitMQStats(CONFIG)

    def read(self):
        """
        Dispatches values to collectd.
        """
        self.dispatch_nodes()
        for vhost_name in self.rabbit.vhost_names:
            self.dispatch_exchanges(vhost_name)
            self.dispatch_queues(vhost_name)

    @staticmethod
    def generate_vhost_name(name):
        """
        Generates a "normalized" vhost name with slashes replaced.
        """
        if not name or name == '/':
            name = 'default'
        else:
            name = re.sub(r'^/', 'slash_', name)
            name = re.sub(r'/$', '_slash', name)
            name = re.sub(r'/', '_slash_', name)
        return 'rabbitmq_%s' % name
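
    # Illustrative examples derived from the substitutions above (the
    # slash markers are the plugin's own naming convention):
    #   generate_vhost_name('/')        -> 'rabbitmq_default'
    #   generate_vhost_name('/vhost')   -> 'rabbitmq_slash_vhost'
    #   generate_vhost_name('my/host')  -> 'rabbitmq_my_slash_host'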

    def dispatch_message_stats(self, data, vhost, plugin, plugin_instance):
        """
        Sends message stats to collectd.
        """
        if not data:
            collectd.debug("No data for %s in vhost %s" % (plugin, vhost))
            return

        if 'message_stats' not in data:
            return

        vhost = self.generate_vhost_name(vhost)
        for name in self.message_stats:
            collectd.debug("Dispatching stat %s for %s in %s" %
                           (name, plugin_instance, vhost))
            value = data['message_stats'].get(name, 0)
            self.dispatch_values(value, vhost, plugin, plugin_instance, name)

            details = data['message_stats'].get("%s_details" % name, None)
            if not details:
                continue
            for detail in self.message_details:
                self.dispatch_values(
                    details.get(detail, 0), vhost, plugin, plugin_instance,
                    "%s_details" % name, detail)

    def dispatch_nodes(self):
        """
        Dispatches node stats.
        """
        stats = self.rabbit.get_nodes()
        for node in stats:
            name = node['name'].split('@')[1]
            collectd.debug("Getting stats for %s node" % name)
            for stat_name in self.node_stats:
                value = node.get(stat_name, 0)
                self.dispatch_values(value, name, 'rabbitmq', None, stat_name)

                details = node.get("%s_details" % stat_name, None)
                if not details:
                    continue
                for detail in self.message_details:
                    value = details.get(detail, 0)
                    self.dispatch_values(value, name, 'rabbitmq', None,
                                         "%s_details" % stat_name, detail)

    def dispatch_exchanges(self, vhost_name):
        """
        Dispatches exchange data for vhost_name.
        """
        collectd.debug("Dispatching exchange data for {0}".format(vhost_name))
        stats = self.rabbit.get_exchange_stats(vhost_name=vhost_name)
        for exchange_name, value in stats.iteritems():
            self.dispatch_message_stats(value, vhost_name, 'exchanges',
                                        exchange_name)

    def dispatch_queues(self, vhost_name):
        """
        Dispatches queue data for vhost_name.
        """
        collectd.debug("Dispatching queue data for {0}".format(vhost_name))
        stats = self.rabbit.get_queue_stats(vhost_name=vhost_name)
        for queue_name, value in stats.iteritems():
            self.dispatch_message_stats(value, vhost_name, 'queues',
                                        queue_name)

    # pylint: disable=R0913
    @staticmethod
    def dispatch_values(values, host, plugin, plugin_instance,
                        metric_type, type_instance=None):
        """
        Dispatches metrics to collectd.

        :param values: (tuple or list) The values to dispatch. A single
            value is coerced into a list.
        :param host: (str) The name of the vhost or node.
        :param plugin: (str) The name of the plugin, e.g. queues/exchanges.
        :param plugin_instance: (str) The queue/exchange name.
        :param metric_type: (str) The name of the metric.
        :param type_instance: (str) Optional detail name, e.g. 'rate'.
        """
        path = "{0}.{1}.{2}.{3}.{4}".format(host, plugin,
                                            plugin_instance,
                                            metric_type, type_instance)
        collectd.debug("Dispatching %s values: %s" % (path, values))

        metric = collectd.Values()
        metric.host = host
        metric.plugin = plugin
        if plugin_instance:
            metric.plugin_instance = plugin_instance
        metric.type = metric_type
        if type_instance:
            metric.type_instance = type_instance
        if utils.is_sequence(values):
            metric.values = values
        else:
            metric.values = [values]
        # Tiny hack to work around a bug in the write_http plugin in
        # collectd versions < 5.5.
        # See https://github.com/phobos182/collectd-elasticsearch/issues/15
        # for details.
        metric.meta = {'0': True}
        metric.dispatch()
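

# The ``configure`` callback registered below is not defined in this section.
# A minimal sketch follows, assuming that ``utils`` provides ``Auth``,
# ``ConnectionInfo`` and ``Config`` helpers; those names, their signatures,
# and the option names read from collectd.conf are illustrative assumptions,
# not the library's confirmed API.
def configure(config_values):
    """
    Converts a collectd configuration block into plugin configuration.

    Sketch only: helper classes and option names are assumptions.
    """
    global CONFIG, AUTH, CONN  # pylint: disable=W0603
    collectd.info('Configuring RabbitMQ collectd plugin')
    options = dict((child.key.lower(), child.values[0])
                   for child in config_values.children)
    # Assumed option names: Username, Password, Host, Port.
    AUTH = utils.Auth(options.get('username', 'guest'),
                      options.get('password', 'guest'))
    CONN = utils.ConnectionInfo(options.get('host', 'localhost'),
                                options.get('port', 15672))
    CONFIG = utils.Config(AUTH, CONN)
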
# Register callbacks
collectd.register_config(configure)
collectd.register_init(init)
collectd.register_read(read)
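
# Example of loading this module through collectd's python plugin; the
# <Module> option names mirror the assumed names in the sketch above and
# are illustrative, not confirmed configuration keys:
#
#   <LoadPlugin python>
#       Globals true
#   </LoadPlugin>
#
#   <Plugin python>
#       Import "collectd_rabbitmq.collectd_plugin"
#       <Module "collectd_rabbitmq.collectd_plugin">
#           Username "guest"
#           Password "guest"
#           Host "localhost"
#           Port "15672"
#       </Module>
#   </Plugin>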