Source code for collectd_rabbitmq.rabbit
# -*- coding: iso-8859-15 -*-
# Copyright (c) 2014 The New York Times Company
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
python plugin for collectd to obtain rabbitmq stats
"""
import collectd
import json
import urllib
import urllib2
[docs]class RabbitMQStats(object):
"""
Class to interface with the RabbitMQ API.
"""
def __init__(self, config):
self.config = config
self.api = "{0}/api".format(self.config.connection.url)
@staticmethod
[docs] def get_names(items):
"""
Return URL encoded names.
"""
collectd.debug("Getting names for %s" % items)
names = list()
for item in items:
name = item.get('name', None)
if name:
name = urllib.quote(name, '')
names.append(name)
return names
[docs] def get_info(self, *args):
"""
return JSON object from URL.
"""
url = "{0}/{1}".format(self.api, '/'.join(args))
collectd.debug("Getting info for %s" % url)
auth_handler = urllib2.HTTPBasicAuthHandler()
auth_handler.add_password(realm=self.config.auth.realm,
uri=self.api,
user=self.config.auth.username,
passwd=self.config.auth.password)
opener = urllib2.build_opener(auth_handler)
urllib2.install_opener(opener)
try:
info = urllib2.urlopen(url)
except urllib2.HTTPError as http_error:
collectd.error("HTTP Error: %s" % http_error)
return None
except urllib2.URLError as url_error:
collectd.error("URL Error: %s" % url_error)
return None
except ValueError as value_error:
collectd.error("Value Error: %s" % value_error)
return None
try:
return_value = json.load(info)
except ValueError as err:
collectd.error("ValueError parsing JSON from %s: %s" % (url, err))
return_value = None
except TypeError as err:
collectd.error("TypeError parsing JSON from %s: %s" % (url, err))
return_value = None
return return_value
[docs] def get_nodes(self):
"""
Return a list of nodes.
"""
return self.get_info("nodes") or list()
nodes = property(get_nodes)
# Vhosts
[docs] def get_vhosts(self):
"""
Returns a list of vhosts.
"""
collectd.debug("Getting a list of vhosts")
return self.get_info("vhosts") or list()
[docs] def get_vhost_names(self):
"""
Returns a list of vhost names.
"""
collectd.debug("Getting vhost names")
all_vhosts = self.get_vhosts()
return self.get_names(all_vhosts) or list()
vhost_names = property(get_vhost_names)
# Exchanges
[docs] def get_exchanges(self, vhost_name=None):
"""
Returns raw exchange data.
"""
collectd.debug("Getting exchanges for %s" % vhost_name)
return self.get_info("exchanges", vhost_name)
[docs] def get_exchange_names(self, vhost_name=None):
"""
Returns a list of all exchange names.
"""
collectd.debug("Getting exchange names for %s" % vhost_name)
all_exchanges = self.get_exchanges(vhost_name)
return self.get_names(all_exchanges)
[docs] def get_exchange_stats(self, exchange_name=None, vhost_name=None):
"""
Returns a dictionary of stats for exchange_name.
"""
collectd.debug("Getting exchange stats for %s in %s" %
(exchange_name, vhost_name))
return self.get_stats('exchange', exchange_name, vhost_name)
# Queues
[docs] def get_queues(self, vhost_name=None):
"""
Returns raw queue data.
"""
collectd.debug("Getting queues for %s" % vhost_name)
return self.get_info("queues", vhost_name)
[docs] def get_queue_names(self, vhost_name=None):
"""
Returns a list of all queue names.
"""
collectd.debug("Getting queue names for %s" % vhost_name)
all_queues = self.get_queues(vhost_name)
return self.get_names(all_queues)
[docs] def get_queue_stats(self, queue_name=None, vhost_name=None):
"""
Returns a dictionary of stats for queue_name.
"""
return self.get_stats('queue', queue_name, vhost_name)
[docs] def get_stats(self, stat_type, stat_name, vhost_name):
"""
Returns a dictionary of stats.
"""
collectd.debug("Getting stats for %s %s%s in %s" %
(stat_name or 'all',
stat_type,
's' if not stat_name else '',
vhost_name))
if stat_type not in('exchange', 'queue'):
raise ValueError("Unsupported stat type {0}".format(stat_type))
stat_name_func = getattr(self, 'get_{0}_names'.format(stat_type))
if not vhost_name:
vhosts = self.get_vhost_names()
else:
vhosts = [vhost_name]
stats = dict()
for vhost in vhosts:
if not stat_name:
names = stat_name_func(vhost)
else:
names = [stat_name]
for name in names:
if not self.config.is_ignored(stat_type, name):
stats[name] = self.get_info("{0}s".format(stat_type),
vhost,
name)
return stats