[ADMIN_API] Merge in statsd

Also adds --number_of_servers and --server_id so that we can cycle which
server does the pings

Also fix problem with database layer being loaded too early

Change-Id: Iff1ebc70b3838bbc1559c9e0bcc730fc78b1684b
This commit is contained in:
Andrew Hutchings 2013-07-26 14:54:06 +01:00
parent d4b3b05063
commit 5824438ec7
25 changed files with 353 additions and 633 deletions

README
View File

@ -11,10 +11,6 @@ Tools
Python daemon that manages a pool of Nova instances.
* libra_statsd
Python daemon to receive load balancer statistics from Libra workers.
* libra_worker
Python daemon that will receive messages from an API server via

View File

@ -54,3 +54,72 @@ Command Line Options
The path for the SSL key file to be used for the frontend of the API
server
.. option:: --gearman_ssl_ca <PATH>
The path for the Gearman SSL Certificate Authority.
.. option:: --gearman_ssl_cert <PATH>
The path for the Gearman SSL certificate.
.. option:: --gearman_ssl_key <PATH>
The path for the Gearman SSL key.
.. option:: --gearman <HOST:PORT>
Used to specify the Gearman job server hostname and port. This option
can be used multiple times to specify multiple job servers
.. option:: --stats_driver <DRIVER LIST>
The drivers to be used for alerting. This option can be used multiple
times to specify multiple drivers.
.. option:: --stats_ping_timer <PING_INTERVAL>
How often to run a ping check of load balancers (in seconds). Default is 60.
.. option:: --stats_poll_timeout <POLL_INTERVAL>
How long to wait until we consider the initial ping check failed and
send a second ping. Default is 5 seconds.
.. option:: --stats_poll_timeout_retry <POLL_INTERVAL>
How long to wait until we consider the second and final ping check
failed. Default is 30 seconds.
.. option:: --stats_repair_timer <REPAIR_INTERVAL>
How often to run a check to see if damaged load balancers have been
repaired (in seconds). Default is 180.
.. option:: --number_of_servers <NUMBER_OF_SERVERS>
The number of Admin API servers in the system.
Used to calculate which Admin API server should run the next stats ping.
.. option:: --server_id <SERVER_ID>
The server ID of this server, used to calculate which Admin API
server should run the next stats ping (server IDs start at 0)
.. option:: --datadog_api_key <KEY>
The API key to be used for the datadog driver
.. option:: --datadog_app_key <KEY>
The Application key to be used for the datadog driver
.. option:: --datadog_message_tail <TEXT>
Some text to add at the end of an alerting message such as a list of
users to alert (using @user@email.com format), used for the datadog
driver.
.. option:: --datadog_tags <TAGS>
A list of tags to be used for the datadog driver
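The cycling scheme behind ``--number_of_servers`` and ``--server_id`` can be sketched as follows (a minimal illustration based on the option descriptions above, not the exact Libra code):

```python
from datetime import datetime

def owns_slot(minute, server_id, number_of_servers):
    """Return True if this server owns the stats ping for this minute."""
    return server_id == minute % number_of_servers

def is_my_turn(server_id, number_of_servers):
    # Each scheduler cycle, only the server whose ID matches the current
    # minute modulo the server count runs the ping/repair check; the
    # others skip the round and simply re-arm their timers.
    return owns_slot(datetime.now().minute, server_id, number_of_servers)
```

With three servers, for example, minute 7 belongs to server_id 1 because 7 % 3 == 1.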

View File

@ -1,24 +0,0 @@
Description
===========
Purpose
-------
The Libra Statsd is a monitoring system for the health of load balancers. It
can query many load balancers in parallel and supports a pluggable architecture
for different methods of reporting.
Design
------
Statsd currently only does an advanced "ping" style monitoring. By default it
will get a list of ONLINE load balancers from the API server and will send a
gearman message to the worker of each one. The worker tests its own HAProxy
instance and will report success or failure. If there is a failure, or the
gearman message times out, the result is sent to the alerting backends. A
further scheduled run, every three minutes, re-tests the failed devices to
see if they have been repaired. If they have, this triggers a 'repaired'
notice.
Alerting is done using a plugin system which can have multiple plugins enabled
at the same time.
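The ping/alert/repair cycle described above can be sketched with ``threading.Timer`` (a simplified illustration; the function names and driver interface here are placeholders, not the actual Libra API):

```python
import threading

def run_ping_round(devices, send_ping, drivers):
    """Ping each ONLINE device once; alert every driver about failures.

    Returns the list of device names that failed the ping.
    """
    failed = [name for name in devices if not send_ping(name)]
    for name in failed:
        for driver in drivers:
            driver.send_alert('ping check failed', name)
    return failed

def schedule_every(interval, func, *args):
    """Re-arm a threading.Timer after each round, like statsd's scheduler."""
    def runner():
        func(*args)
        threading.Timer(interval, runner).start()
    return runner

# e.g. schedule_every(60, run_ping_round, devices, send_ping, drivers)()
# would start the recurring ping check, and a second schedule_every(180, ...)
# would start the repair re-test.
```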

View File

@ -1,87 +0,0 @@
Statsd Configuration
====================
These options are specific to statsd in addition to the
:doc:`common options </config>`.
Configuration File
------------------
The ``[statsd]`` section is specific to the libra_statsd utility. Below is
an example:
.. code-block:: ini
[statsd]
Command Line Options
--------------------
.. program:: libra_statsd
.. option:: --api_server <HOST:PORT>
The colon-separated hostname/IP and port for use with the HP REST API
driver. Can be specified multiple times for multiple servers. This
option is also used for the hp_rest alerting driver.
.. option:: --gearman_ssl_ca <PATH>
The path for the Gearman SSL Certificate Authority.
.. option:: --gearman_ssl_cert <PATH>
The path for the Gearman SSL certificate.
.. option:: --gearman_ssl_key <PATH>
The path for the Gearman SSL key.
.. option:: --server <HOST:PORT>
Used to specify the Gearman job server hostname and port. This option
can be used multiple times to specify multiple job servers
.. option:: --driver <DRIVER LIST>
The drivers to be used for alerting. This option can be used multiple
times to specify multiple drivers.
.. option:: --ping_interval <PING_INTERVAL>
How often to run a ping check of load balancers (in seconds), default 60
.. option:: --poll_interval <POLL_INTERVAL>
How long to wait until we consider the initial ping check failed and
send a second ping. Default is 5 seconds.
.. option:: --poll_interval_retry <POLL_INTERVAL>
How long to wait until we consider the second and final ping check
failed. Default is 30 seconds.
.. option:: --repair_interval <REPAIR_INTERVAL>
How often to run a check to see if damaged load balancers have been
repaired (in seconds). Default is 180.
.. option:: --datadog_api_key <KEY>
The API key to be used for the datadog driver
.. option:: --datadog_app_key <KEY>
The Application key to be used for the datadog driver
.. option:: --datadog_message_tail <TEXT>
Some text to add at the end of an alerting message such as a list of
users to alert (using @user@email.com format), used for the datadog
driver.
.. option:: --datadog_tags <TAGS>
A list of tags to be used for the datadog driver

View File

@ -1,56 +0,0 @@
Statsd Drivers
==============
Introduction
------------
Statsd has a small driver API to be used for alerting. Multiple drivers can
be loaded at the same time to alert in multiple places.
Design
------
The base class called ``AlertDriver`` is used to create new drivers. These
will be supplied ``self.logger`` to use for logging and ``self.args`` which
contains the arguments supplied to statsd. Drivers using this need to
supply two functions:
.. py:class:: AlertDriver
.. py:method:: send_alert(message, device_id)
:param message: A message with details of the failure
:param device_id: The ID of the device that has failed
.. py:method:: send_repair(message, device_id)
:param message: A message with details of the recovered load balancer
:param device_id: The ID of the device that has been recovered
.. py:data:: known_drivers
This is the dictionary that maps values for the
:option:`--driver <libra_statsd.py --driver>` option
to a class implementing the driver :py:class:`~AlertDriver` API
for the statsd server. After implementing a new driver class, you simply add
a new entry to this dictionary to make it a selectable option.
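As a sketch of that workflow, a new driver might look like this (the ``AlertDriver`` base shown here is reconstructed from the description above; the constructor signature and the ``SyslogDriver`` example are assumptions, not the actual Libra code):

```python
class AlertDriver(object):
    """Base class per the description above: drivers are supplied a
    logger and the parsed statsd arguments, and implement two callbacks."""
    def __init__(self, logger, args):
        self.logger = logger
        self.args = args

    def send_alert(self, message, device_id):
        raise NotImplementedError

    def send_repair(self, message, device_id):
        raise NotImplementedError


class SyslogDriver(AlertDriver):
    # Hypothetical example driver: write alerts/repairs to statsd's log.
    def send_alert(self, message, device_id):
        self.logger.error('ALERT device %s: %s', device_id, message)

    def send_repair(self, message, device_id):
        self.logger.info('REPAIR device %s: %s', device_id, message)

# Registering it in known_drivers makes it selectable via --driver:
# known_drivers['syslog'] = 'mypackage.drivers.SyslogDriver'
```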
Dummy Driver
------------
This driver is used for simple testing/debugging. It echoes the message details
into statsd's log file.
Datadog Driver
--------------
The Datadog driver uses the Datadog API to send alerts into the Datadog event
stream. Alerts are sent as 'ERROR' and repairs as 'SUCCESS'.
HP REST Driver
--------------
This sends messages to the HP REST API server to mark nodes as ERROR/ONLINE.

View File

@ -1,9 +0,0 @@
Libra Statsd Monitoring Daemon
==============================
.. toctree::
:maxdepth: 2
about
config
drivers

View File

@ -21,8 +21,11 @@ import pwd
import pecan
import sys
import os
from libra.admin_api.stats.scheduler import Stats
from libra.admin_api import config as api_config
from libra.admin_api import model
from libra.admin_api.stats.drivers.base import known_drivers
from libra.openstack.common import importutils
from libra.common.options import Options, setup_logging
from eventlet import wsgi
@ -41,6 +44,12 @@ def setup_app(pecan_config, args):
pecan_config = get_pecan_config()
config = dict(pecan_config)
config['database'] = args.db_sections
config['gearman'] = {
'server': args.gearman,
'ssl_key': args.gearman_ssl_key,
'ssl_cert': args.gearman_ssl_cert,
'ssl_ca': args.gearman_ssl_ca
}
config['conffile'] = args.config
if args.debug:
config['wsme'] = {'debug': True}
@ -93,9 +102,89 @@ def main():
'--ssl_keyfile',
help='Path to an SSL key file'
)
options.parser.add_argument(
'--gearman', action='append', metavar='HOST:PORT', default=[],
help='Gearman job servers'
)
options.parser.add_argument(
'--gearman_ssl_ca', metavar='FILE',
help='Gearman SSL certificate authority'
)
options.parser.add_argument(
'--gearman_ssl_cert', metavar='FILE',
help='Gearman SSL certificate'
)
options.parser.add_argument(
'--gearman_ssl_key', metavar='FILE',
help='Gearman SSL key'
)
options.parser.add_argument(
'--stats_driver',
choices=known_drivers.keys(), default='dummy',
help='type of stats device to use'
)
options.parser.add_argument(
'--stats_ping_timer', type=int, default=60,
help='how often to ping load balancers (in seconds)'
)
options.parser.add_argument(
'--stats_poll_timeout', type=int, default=5,
help='gearman timeout value for initial ping request (in seconds)'
)
options.parser.add_argument(
'--stats_poll_timeout_retry', type=int, default=30,
help='gearman timeout value for retry ping request (in seconds)'
)
options.parser.add_argument(
'--stats_repair_timer', type=int, default=180,
help='how often to check if a load balancer has been repaired (in '
'seconds)'
)
options.parser.add_argument(
'--number_of_servers', type=int, default=1,
help='number of Admin API servers, used to calculate which Admin API '
'server should stats ping next'
)
options.parser.add_argument(
'--server_id', type=int, default=0,
help='server ID of this server, used to calculate which Admin API '
'server should stats ping next (start at 0)'
)
# Datadog plugin options
options.parser.add_argument(
'--datadog_api_key', help='API key for datadog alerting'
)
options.parser.add_argument(
'--datadog_app_key', help='Application key for datadog alerting'
)
options.parser.add_argument(
'--datadog_message_tail',
help='Text to add at the end of a Datadog alert'
)
options.parser.add_argument(
'--datadog_tags',
help='A space separated list of tags for Datadog alerts'
)
options.parser.add_argument(
'--datadog_env', default='unknown',
help='Server environment'
)
args = options.run()
drivers = []
if not args.gearman:
# NOTE(shrews): Can't set a default in argparse method because the
# value is appended to the specified default.
args.gearman.append('localhost:4730')
elif not isinstance(args.gearman, list):
# NOTE(shrews): The Options object cannot intelligently handle
# creating a list from an option that may have multiple values.
# We convert it to the expected type here.
svr_list = args.gearman.split()
args.gearman = svr_list
required_args = ['db_sections', 'ssl_certfile', 'ssl_keyfile']
missing_args = 0
@ -132,6 +221,13 @@ def main():
logger = setup_logging('', args)
logger.info('Starting on {0}:{1}'.format(args.host, args.port))
api = setup_app(pc, args)
if not isinstance(args.stats_driver, list):
args.stats_driver = args.stats_driver.split()
for driver in args.stats_driver:
drivers.append(importutils.import_class(
known_drivers[driver]
))
Stats(logger, args, drivers)
sys.stderr = LogStdout(logger)
# TODO: set ca_certs and cert_reqs=CERT_REQUIRED
ssl_sock = eventlet.wrap_ssl(

View File

@ -24,38 +24,6 @@ from pecan import conf
import logging
config = ConfigParser.SafeConfigParser()
config.read([conf.conffile])
engines = []
for section in conf.database:
db_conf = config._sections[section]
conn_string = '''mysql://%s:%s@%s:%d/%s''' % (
db_conf['username'],
db_conf['password'],
db_conf['host'],
db_conf.get('port', 3306),
db_conf['schema']
)
if 'ssl_key' in db_conf:
ssl_args = {'ssl': {
'cert': db_conf['ssl_cert'],
'key': db_conf['ssl_key'],
'ca': db_conf['ssl_ca']
}}
engine = create_engine(
conn_string, isolation_level="READ COMMITTED", pool_size=20,
connect_args=ssl_args, pool_recycle=3600
)
else:
engine = create_engine(
conn_string, isolation_level="READ COMMITTED", pool_size=20,
pool_recycle=3600
)
engines.append(engine)
DeclarativeBase = declarative_base()
metadata = DeclarativeBase.metadata
@ -145,21 +113,57 @@ class RoutingSession(Session):
with deadlocks in Galera, see http://tinyurl.com/9h6qlly
switch engines every 60 seconds of idle time """
engines = []
last_engine = None
last_engine_time = 0
def get_bind(self, mapper=None, clause=None):
if not RoutingSession.engines:
self._build_engines()
if (
RoutingSession.last_engine
and time.time() < RoutingSession.last_engine_time + 60
):
RoutingSession.last_engine_time = time.time()
return RoutingSession.last_engine
engine = random.choice(engines)
engine = random.choice(RoutingSession.engines)
RoutingSession.last_engine = engine
RoutingSession.last_engine_time = time.time()
return engine
def _build_engines(self):
config = ConfigParser.SafeConfigParser()
config.read([conf.conffile])
for section in conf.database:
db_conf = config._sections[section]
conn_string = '''mysql://%s:%s@%s:%d/%s''' % (
db_conf['username'],
db_conf['password'],
db_conf['host'],
db_conf.get('port', 3306),
db_conf['schema']
)
if 'ssl_key' in db_conf:
ssl_args = {'ssl': {
'cert': db_conf['ssl_cert'],
'key': db_conf['ssl_key'],
'ca': db_conf['ssl_ca']
}}
engine = create_engine(
conn_string, isolation_level="READ COMMITTED",
pool_size=20, connect_args=ssl_args, pool_recycle=3600
)
else:
engine = create_engine(
conn_string, isolation_level="READ COMMITTED",
pool_size=20, pool_recycle=3600
)
RoutingSession.engines.append(engine)
class db_session(object):
def __init__(self):

View File

@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations
known_drivers = {
'dummy': 'libra.statsd.drivers.dummy.driver.DummyDriver',
'datadog': 'libra.statsd.drivers.datadog.driver.DatadogDriver',
'hp_rest': 'libra.statsd.drivers.hp_rest.driver.HPRestDriver'
'dummy': 'libra.admin_api.stats.drivers.dummy.driver.DummyDriver',
'datadog': 'libra.admin_api.stats.drivers.datadog.driver.DatadogDriver',
'database': 'libra.admin_api.stats.drivers.database.driver.DbDriver'
}

View File

@ -0,0 +1,48 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
from libra.admin_api.model.lbaas import Device, LoadBalancer, db_session
from libra.admin_api.model.lbaas import loadbalancers_devices
from libra.admin_api.stats.drivers.base import AlertDriver
class DbDriver(AlertDriver):
def send_alert(self, message, device_id):
self.update_status(message, device_id, 'ERROR')
def send_repair(self, message, device_id):
self.update_status(message, device_id, 'ONLINE')
def update_status(self, message, device_id, status):
with db_session() as session:
device = session.query(Device).\
filter(Device.id == device_id).first()
device.status = status
lb_status = 'ACTIVE' if status == 'ONLINE' else status
lbs = session.query(
loadbalancers_devices.c.loadbalancer).\
filter(loadbalancers_devices.c.device == device_id).\
all()
for lb in lbs:
session.query(LoadBalancer).\
filter(LoadBalancer.id == lb[0]).\
update({"status": lb_status, "errmsg": message},
synchronize_session='fetch')
session.flush()
session.commit()

View File

@ -15,16 +15,16 @@
import threading
import signal
import sys
from libra.statsd.admin_api import AdminAPI
from libra.statsd.statsd_gearman import GearJobs
from datetime import datetime
from libra.admin_api.model.lbaas import LoadBalancer, Device, db_session
from libra.admin_api.stats.stats_gearman import GearJobs
class NodeNotFound(Exception):
pass
class Sched(object):
class Stats(object):
def __init__(self, logger, args, drivers):
self.logger = logger
self.args = args
@ -34,8 +34,8 @@ class Sched(object):
signal.signal(signal.SIGINT, self.exit_handler)
signal.signal(signal.SIGTERM, self.exit_handler)
logger.info("Selected stats drivers: {0}".format(args.stats_driver))
def start(self):
self.ping_lbs()
self.repair_lbs()
@ -58,6 +58,12 @@ class Sched(object):
sys.exit(1)
def repair_lbs(self):
# Work out if it is our turn to run
minute = datetime.now().minute
if self.args.server_id != minute % self.args.number_of_servers:
self.logger.info('Not our turn to run repair check, sleeping')
self.start_repair_sched()
return
tested = 0
repaired = 0
try:
@ -70,6 +76,12 @@ class Sched(object):
self.start_repair_sched()
def ping_lbs(self):
# Work out if it is our turn to run
minute = datetime.now().minute
if self.args.server_id != minute % self.args.number_of_servers:
self.logger.info('Not our turn to run ping check, sleeping')
self.start_ping_sched()
return
pings = 0
failed = 0
try:
@ -86,24 +98,22 @@ class Sched(object):
failed = 0
node_list = []
self.logger.info('Running ping check')
api = AdminAPI(self.args.api_server, self.logger)
if api.is_online():
lb_list = api.get_ping_list()
pings = len(lb_list)
with db_session() as session:
devices = session.query(
Device.id, Device.name
).filter(Device.status == 'ONLINE').all()
pings = len(devices)
if pings == 0:
self.logger.info('No LBs to ping')
return (0, 0)
for lb in lb_list:
node_list.append(lb['name'])
for lb in devices:
node_list.append(lb.name)
gearman = GearJobs(self.logger, self.args)
failed_nodes = gearman.send_pings(node_list)
failed = len(failed_nodes)
if failed > 0:
self._send_fails(failed_nodes, lb_list)
else:
self.logger.error('No working API server found')
return (0, 0)
self._send_fails(failed_nodes, session)
session.commit()
return pings, failed
def _exec_repair(self):
@ -111,33 +121,32 @@ class Sched(object):
repaired = 0
node_list = []
self.logger.info('Running repair check')
api = AdminAPI(self.args.api_server, self.logger)
if api.is_online():
lb_list = api.get_repair_list()
tested = len(lb_list)
with db_session() as session:
devices = session.query(
Device.id, Device.name
).filter(Device.status == 'ERROR').all()
tested = len(devices)
if tested == 0:
self.logger.info('No LBs need repair')
return (0, 0)
for lb in lb_list:
node_list.append(lb['name'])
for lb in devices:
node_list.append(lb.name)
gearman = GearJobs(self.logger, self.args)
repaired_nodes = gearman.send_repair(node_list)
repaired = len(repaired_nodes)
if repaired > 0:
self._send_repair(repaired_nodes, lb_list)
else:
self.logger.error('No working API server found')
return (0, 0)
self._send_repair(repaired_nodes, session)
session.commit()
return tested, repaired
def _send_fails(self, failed_nodes, node_list):
def _send_fails(self, failed_nodes, session):
for node in failed_nodes:
data = self._get_node(node, node_list)
if not len(data['loadBalancers']):
data = self._get_node(node, session)
if not data:
self.logger.error(
'Device {0} has no Loadbalancer attached'.
format(data['id'])
format(data.id)
)
continue
message = (
@ -145,8 +154,8 @@ class Sched(object):
'ID: {0}\n'
'IP: {1}\n'
'tenant: {2}\n'.format(
data['id'], data['floatingIpAddr'],
data['loadBalancers'][0]['hpcs_tenantid']
data.id, data.floatingIpAddr,
data.tenantid
)
)
for driver in self.drivers:
@ -156,18 +165,18 @@ class Sched(object):
node, instance.__class__.__name__
)
)
instance.send_alert(message, data['id'])
instance.send_alert(message, data.id)
def _send_repair(self, repaired_nodes, node_list):
def _send_repair(self, repaired_nodes, session):
for node in repaired_nodes:
data = self._get_node(node, node_list)
data = self._get_node(node, session)
message = (
'Load balancer repaired\n'
'ID: {0}\n'
'IP: {1}\n'
'tenant: {2}\n'.format(
data['id'], data['floatingIpAddr'],
data['loadBalancers'][0]['hpcs_tenantid']
data.id, data.floatingIpAddr,
data.tenantid
)
)
for driver in self.drivers:
@ -177,25 +186,26 @@ class Sched(object):
node, instance.__class__.__name__
)
)
instance.send_repair(message, data['id'])
instance.send_repair(message, data.id)
def _get_node(self, node, node_list):
for n in node_list:
if n['name'] == node:
return n
def _get_node(self, node, session):
lb = session.query(
LoadBalancer.tenantid, Device.floatingIpAddr, Device.id
).join(LoadBalancer.devices).\
filter(Device.name == node).first()
raise NodeNotFound
return lb
def start_ping_sched(self):
self.logger.info('LB ping check timer sleeping for {secs} seconds'
.format(secs=self.args.ping_interval))
self.ping_timer = threading.Timer(self.args.ping_interval,
.format(secs=self.args.stats_ping_timer))
self.ping_timer = threading.Timer(self.args.stats_ping_timer,
self.ping_lbs, ())
self.ping_timer.start()
def start_repair_sched(self):
self.logger.info('LB repair check timer sleeping for {secs} seconds'
.format(secs=self.args.repair_interval))
self.repair_timer = threading.Timer(self.args.repair_interval,
.format(secs=self.args.stats_repair_timer))
self.repair_timer = threading.Timer(self.args.stats_repair_timer,
self.repair_lbs, ())
self.repair_timer.start()

View File

@ -19,14 +19,14 @@ from libra.common.json_gearman import JSONGearmanClient
class GearJobs(object):
def __init__(self, logger, args):
self.logger = logger
self.poll_timeout = args.poll_timeout
self.poll_timeout_retry = args.poll_timeout_retry
self.poll_timeout = args.stats_poll_timeout
self.poll_timeout_retry = args.stats_poll_timeout_retry
if all([args.gearman_ssl_ca, args.gearman_ssl_cert,
args.gearman_ssl_key]):
# Use SSL connections to each Gearman job server.
ssl_server_list = []
for server in args.server:
for server in args.gearman:
host, port = server.split(':')
ssl_server_list.append({'host': host,
'port': int(port),
@ -35,7 +35,7 @@ class GearJobs(object):
'ca_certs': args.gearman_ssl_ca})
self.gm_client = JSONGearmanClient(ssl_server_list)
else:
self.gm_client = JSONGearmanClient(args.server)
self.gm_client = JSONGearmanClient(args.gearman)
def send_pings(self, node_list):
# TODO: lots of duplicated code that needs cleanup

View File

@ -22,6 +22,7 @@ import pecan
import sys
import os
import wsme_overrides
from libra.api.library.expunge import ExpungeScheduler
from libra.api import config as api_config
from libra.api import model
from libra.api import acl
@ -214,8 +215,6 @@ def main():
logger = setup_logging('', args)
logger.info('Starting on {0}:{1}'.format(args.host, args.port))
api = setup_app(pc, args)
# Include this here so that the DB model doesn't cry
from libra.api.library.expunge import ExpungeScheduler
ExpungeScheduler(logger)
sys.stderr = LogStdout(logger)
ssl_sock = eventlet.wrap_ssl(

View File

@ -24,38 +24,6 @@ from pecan import conf
import logging
config = ConfigParser.SafeConfigParser()
config.read([conf.conffile])
engines = []
for section in conf.database:
db_conf = config._sections[section]
conn_string = '''mysql://%s:%s@%s:%d/%s''' % (
db_conf['username'],
db_conf['password'],
db_conf['host'],
db_conf.get('port', 3306),
db_conf['schema']
)
if 'ssl_key' in db_conf:
ssl_args = {'ssl': {
'cert': db_conf['ssl_cert'],
'key': db_conf['ssl_key'],
'ca': db_conf['ssl_ca']
}}
engine = create_engine(
conn_string, isolation_level="READ COMMITTED", pool_size=20,
connect_args=ssl_args, pool_recycle=3600
)
else:
engine = create_engine(
conn_string, isolation_level="READ COMMITTED", pool_size=20,
pool_recycle=3600
)
engines.append(engine)
DeclarativeBase = declarative_base()
metadata = DeclarativeBase.metadata
@ -145,21 +113,57 @@ class RoutingSession(Session):
with deadlocks in Galera, see http://tinyurl.com/9h6qlly
switch engines every 60 seconds of idle time """
engines = []
last_engine = None
last_engine_time = 0
def get_bind(self, mapper=None, clause=None):
if not RoutingSession.engines:
self._build_engines()
if (
RoutingSession.last_engine
and time.time() < RoutingSession.last_engine_time + 60
):
RoutingSession.last_engine_time = time.time()
return RoutingSession.last_engine
engine = random.choice(engines)
engine = random.choice(RoutingSession.engines)
RoutingSession.last_engine = engine
RoutingSession.last_engine_time = time.time()
return engine
def _build_engines(self):
config = ConfigParser.SafeConfigParser()
config.read([conf.conffile])
for section in conf.database:
db_conf = config._sections[section]
conn_string = '''mysql://%s:%s@%s:%d/%s''' % (
db_conf['username'],
db_conf['password'],
db_conf['host'],
db_conf.get('port', 3306),
db_conf['schema']
)
if 'ssl_key' in db_conf:
ssl_args = {'ssl': {
'cert': db_conf['ssl_cert'],
'key': db_conf['ssl_key'],
'ca': db_conf['ssl_ca']
}}
engine = create_engine(
conn_string, isolation_level="READ COMMITTED",
pool_size=20, connect_args=ssl_args, pool_recycle=3600
)
else:
engine = create_engine(
conn_string, isolation_level="READ COMMITTED",
pool_size=20, pool_recycle=3600
)
RoutingSession.engines.append(engine)
class db_session(object):
def __init__(self):

View File

@ -1,145 +0,0 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import requests
import random
import sys
import json
class APIError(Exception):
pass
class AdminAPI(object):
def __init__(self, addresses, logger):
self.logger = logger
self.headers = {'Content-type': 'application/json'}
random.shuffle(addresses)
for address in addresses:
self.url = 'https://{0}/v1'.format(address)
self.logger.info('Trying {url}'.format(url=self.url))
status, data = self._get('{url}/devices/usage'
.format(url=self.url))
if status:
self.logger.info('API Server is online')
self.online = True
return
# if we get this far all API servers are down
self.online = False
def get_ping_list(self):
marker = 0
limit = 20
lb_list = []
while True:
success, nodes = self._get_node_list(limit, marker)
if not success:
raise APIError
# if we hit an empty device list we have hit end of list
if not len(nodes['devices']):
break
for device in nodes['devices']:
if device['status'] == 'ONLINE':
lb_list.append(device)
marker = marker + limit
return lb_list
def get_repair_list(self):
marker = 0
limit = 20
lb_list = []
while True:
success, nodes = self._get_node_list(limit, marker)
if not success:
raise APIError
# if we hit an empty device list we have hit end of list
if not len(nodes['devices']):
break
for device in nodes['devices']:
if device['status'] == 'ERROR' and \
len(device['loadBalancers']) > 0:
lb_list.append(device)
marker = marker + limit
return lb_list
def fail_device(self, device_id):
body = {
"status": "ERROR",
"statusDescription": "Load balancer failed ping test"
}
self._put(
'{url}/devices/{device_id}'.format(
url=self.url, device_id=device_id
), body
)
def repair_device(self, device_id):
body = {
"status": "ONLINE",
"statusDescription": "Load balancer repaired"
}
self._put(
'{url}/devices/{device_id}'.format(
url=self.url, device_id=device_id
), body
)
def get_device(self, device_id):
return self._get(
'{url}/devices/{device_id}'.format(
url=self.url, device_id=device_id
)
)
def _get_node_list(self, limit, marker):
return self._get(
'{url}/devices?marker={marker}&limit={limit}'
.format(url=self.url, marker=marker, limit=limit)
)
def _put(self, url, data):
try:
r = requests.put(
url, data=json.dumps(data), verify=False, headers=self.headers
)
except requests.exceptions.RequestException:
self.logger.exception('Exception communicating to server')
return False, None
if r.status_code != 200:
self.logger.error('Server returned error {code}'
.format(code=r.status_code))
return False, r.json()
return True, r.json()
def _get(self, url):
try:
r = requests.get(url, verify=False)
except requests.exceptions.RequestException:
self.logger.error('Exception communicating to server: {exc}'
.format(exc=sys.exc_info()[0]))
return False, None
if r.status_code != 200:
self.logger.error('Server returned error {code}'
.format(code=r.status_code))
return False, r.json()
return True, r.json()
def is_online(self):
return self.online

View File

@ -1,27 +0,0 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
from libra.statsd.drivers.base import AlertDriver
from libra.statsd.admin_api import AdminAPI
class HPRestDriver(AlertDriver):
def send_alert(self, message, device_id):
api = AdminAPI(self.args.api_server, self.logger)
if api.is_online():
api.fail_device(device_id)
def send_repair(self, message, device_id):
api = AdminAPI(self.args.api_server, self.logger)
if api.is_online():
api.repair_device(device_id)

View File

@ -1,157 +0,0 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import daemon
import daemon.pidfile
import daemon.runner
import grp
import pwd
import time
from libra.common.options import Options, setup_logging
from libra.openstack.common import importutils
from libra.statsd.drivers.base import known_drivers
from libra.statsd.scheduler import Sched
def start(args, drivers):
""" Start the main server processing. """
logger = setup_logging('libra_statsd', args)
logger.info("Job server list: %s" % args.server)
logger.info("Selected drivers: {0}".format(args.driver))
scheduler = Sched(logger, args, drivers)
scheduler.start()
while True:
time.sleep(1)
def main():
""" Main Python entry point for the statistics daemon. """
drivers = []
options = Options('statsd', 'Statistics Daemon')
options.parser.add_argument(
'--driver', dest='driver',
choices=known_drivers.keys(), default='dummy',
help='type of device to use'
)
options.parser.add_argument(
'--gearman_ssl_ca', dest='gearman_ssl_ca', metavar='FILE',
help='Gearman SSL certificate authority'
)
options.parser.add_argument(
'--gearman_ssl_cert', dest='gearman_ssl_cert', metavar='FILE',
help='Gearman SSL certificate'
)
options.parser.add_argument(
'--gearman_ssl_key', dest='gearman_ssl_key', metavar='FILE',
help='Gearman SSL key'
)
options.parser.add_argument(
'--server', dest='server', action='append', metavar='HOST:PORT',
default=[],
help='add a Gearman job server to the connection list'
)
options.parser.add_argument(
'--ping_interval', type=int, default=60,
help='how often to ping load balancers (in seconds)'
)
options.parser.add_argument(
'--poll_timeout', type=int, default=5,
help='timeout value for initial ping request (in seconds)'
)
options.parser.add_argument(
'--poll_timeout_retry', type=int, default=30,
help='timeout value for the retry ping request (in seconds)'
)
options.parser.add_argument(
'--repair_interval', type=int, default=180,
help='how often to check if a load balancer has been repaired (in '
'seconds)'
)
options.parser.add_argument(
'--api_server', action='append', metavar='HOST:PORT', default=[],
help='a list of API servers to connect to'
)
# Datadog plugin options
options.parser.add_argument(
'--datadog_api_key', help='API key for datadog alerting'
)
options.parser.add_argument(
'--datadog_app_key', help='Application key for datadog alerting'
)
options.parser.add_argument(
'--datadog_message_tail',
help='Text to add at the end of a Datadog alert'
)
options.parser.add_argument(
'--datadog_tags',
help='A space separated list of tags for Datadog alerts'
)
options.parser.add_argument(
'--datadog_env', default='unknown',
help='Server environment'
)
args = options.run()
if not args.server:
# NOTE(shrews): Can't set a default in argparse method because the
# value is appended to the specified default.
args.server.append('localhost:4730')
elif not isinstance(args.server, list):
# NOTE(shrews): The Options object cannot intelligently handle
# creating a list from an option that may have multiple values.
# We convert it to the expected type here.
svr_list = args.server.split()
args.server = svr_list
if not args.api_server:
# NOTE(shrews): Can't set a default in argparse method because the
# value is appended to the specified default.
args.api_server.append('localhost:8889')
elif not isinstance(args.api_server, list):
# NOTE(shrews): The Options object cannot intelligently handle
# creating a list from an option that may have multiple values.
# We convert it to the expected type here.
svr_list = args.api_server.split()
args.api_server = svr_list
if not isinstance(args.driver, list):
args.driver = args.driver.split()
for driver in args.driver:
drivers.append(importutils.import_class(
known_drivers[driver]
))
if args.nodaemon:
start(args, drivers)
else:
pidfile = daemon.pidfile.TimeoutPIDLockFile(args.pid, 10)
if daemon.runner.is_pidfile_stale(pidfile):
pidfile.break_lock()
context = daemon.DaemonContext(
umask=0o022,
pidfile=pidfile
)
if args.user:
context.uid = pwd.getpwnam(args.user).pw_uid
if args.group:
context.gid = grp.getgrnam(args.group).gr_gid
context.open()
start(args, drivers)

View File

@ -31,6 +31,5 @@ setup-hooks =
console_scripts =
libra_worker = libra.worker.main:main
libra_pool_mgm = libra.mgm.mgm:main
libra_statsd = libra.statsd.main:main
libra_api = libra.api.app:main
libra_admin_api = libra.admin_api.app:main