Add load balancer repair checking to libra_statsd.

* Package an upstart job and init hook for the libra_statsd daemon and add a
  sample [statsd] configuration section.
* AdminAPI: add get_repair_list() (devices whose status is ERROR) and
  repair_device() (PUT status ONLINE).
* Alert drivers: add send_repair() next to send_alert().
* GearJobs.send_repair(): submit a STATS job per node and return the tasks
  that neither timed out nor answered FAIL.
* Scheduler: run a repair pass in addition to the ping pass; drop the RLock.
* main: add the --repair_interval option (default 180 seconds).

diff --git a/debian/libra_statsd.upstart b/debian/libra_statsd.upstart
new file mode 100644
index 00000000..d6c42cf9
--- /dev/null
+++ b/debian/libra_statsd.upstart
@@ -0,0 +1,12 @@
+description "statsd"
+
+start on (net-device-up
+          and local-filesystems
+          and runlevel [2345])
+stop on runlevel [!2345]
+
+script
+    libra_statsd -c /etc/libra.cfg
+
+end script
+
diff --git a/debian/rules b/debian/rules
index 6eac57fc..d6af4074 100755
--- a/debian/rules
+++ b/debian/rules
@@ -9,3 +9,4 @@ WITH_PYTHON2 = $(shell test -f /usr/bin/dh_python2 && echo "--with python2")
 override_dh_installinit:
 	dh_installinit --name=libra_worker
 	dh_installinit --name=libra_pool_mgm --no-start
+	dh_installinit --name=libra_statsd --no-start
diff --git a/etc/sample_libra.cfg b/etc/sample_libra.cfg
index 0371e8df..fdd665c0 100644
--- a/etc/sample_libra.cfg
+++ b/etc/sample_libra.cfg
@@ -56,3 +56,15 @@ check_interval = 5
 failed_interval = 15
 node_basename = 'libra'
 az = 1
+
+[statsd]
+api_server=127.0.0.1:8889
+server=127.0.0.1:4730
+logfile=/tmp/statsd.log
+pid=/tmp/statsd.pid
+driver=dummy datadog hp_rest
+datadog_api_key=0987654321
+datadog_app_key=1234567890
+datadog_message_tail="@user@domain.com"
+datadog_tags=service:lbaas
+ping_interval = 60
diff --git a/libra/statsd/admin_api.py b/libra/statsd/admin_api.py
index 30fda4d5..082232a0 100644
--- a/libra/statsd/admin_api.py
+++ b/libra/statsd/admin_api.py
@@ -40,8 +40,6 @@ class AdminAPI(object):
         self.online = False
 
     def get_ping_list(self):
-        # TODO: we need an error list to ping (maybe separate sched?) with
-        # all clear
         marker = 0
         limit = 20
         lb_list = []
@@ -59,6 +57,24 @@ class AdminAPI(object):
             marker = marker + limit
         return lb_list
 
+    def get_repair_list(self):
+        marker = 0
+        limit = 20
+        lb_list = []
+        while True:
+            success, nodes = self._get_node_list(limit, marker)
+            if not success:
+                raise APIError
+            # if we hit an empty device list we have hit end of list
+            if not len(nodes['devices']):
+                break
+
+            for device in nodes['devices']:
+                if device['status'] == 'ERROR':
+                    lb_list.append(device)
+            marker = marker + limit
+        return lb_list
+
     def fail_device(self, device_id):
         body = {
             "status": "ERROR",
@@ -70,6 +86,17 @@ class AdminAPI(object):
             ), body
         )
 
+    def repair_device(self, device_id):
+        body = {
+            "status": "ONLINE",
+            "statusDescription": "Load balancer repaired"
+        }
+        self._put(
+            '{url}/devices/{device_id}'.format(
+                url=self.url, device_id=device_id
+            ), body
+        )
+
     def _get_node_list(self, limit, marker):
         return self._get(
             '{url}/devices?marker={marker}&limit={limit}'
diff --git a/libra/statsd/drivers/base.py b/libra/statsd/drivers/base.py
index 483c19ba..1073357d 100644
--- a/libra/statsd/drivers/base.py
+++ b/libra/statsd/drivers/base.py
@@ -25,3 +25,6 @@ class AlertDriver(object):
 
     def send_alert(self, message, device_id):
         raise NotImplementedError()
+
+    def send_repair(self, message, device_id):
+        raise NotImplementedError()
diff --git a/libra/statsd/drivers/datadog/driver.py b/libra/statsd/drivers/datadog/driver.py
index 1963baea..70a7f22a 100644
--- a/libra/statsd/drivers/datadog/driver.py
+++ b/libra/statsd/drivers/datadog/driver.py
@@ -16,9 +16,12 @@ from dogapi import dog_http_api as api
 
 
 class DatadogDriver(AlertDriver):
+    def __init__(self, logger, args):
+        api.api_key = args.datadog_api_key
+        api.application_key = args.datadog_app_key
+        super(DatadogDriver, self).__init__(logger, args)
+
     def send_alert(self, message, device_id):
-        api.api_key = self.args.datadog_api_key
-        api.application_key = self.args.datadog_app_key
         title = 'Load balancer failure'
         text = 'Load balancer failed with message {0} {1}'.format(
             message, self.args.datadog_message_tail
@@ -28,3 +31,14 @@ class DatadogDriver(AlertDriver):
             title, text, tags=tags, alert_type='error'
         )
         self.logger.info('Datadog alert response: {0}'.format(resp))
+
+    def send_repair(self, message, device_id):
+        title = 'Load balancer recovered'
+        text = 'Load balancer recovered with message {0} {1}'.format(
+            message, self.args.datadog_message_tail
+        )
+        tags = self.args.datadog_tags.split()
+        resp = api.event_with_response(
+            title, text, tags=tags, alert_type='success'
+        )
+        self.logger.info('Datadog alert response: {0}'.format(resp))
diff --git a/libra/statsd/drivers/dummy/driver.py b/libra/statsd/drivers/dummy/driver.py
index 7c4d0826..ebafe746 100644
--- a/libra/statsd/drivers/dummy/driver.py
+++ b/libra/statsd/drivers/dummy/driver.py
@@ -16,4 +16,7 @@ from libra.statsd.drivers.base import AlertDriver
 
 class DummyDriver(AlertDriver):
     def send_alert(self, message, device_id):
-        self.logger.info('Dummy alert send of {0}'.format(message))
+        self.logger.info('Dummy alert of: {0}'.format(message))
+
+    def send_repair(self, message, device_id):
+        self.logger.info('Dummy repair of: {0}'.format(message))
diff --git a/libra/statsd/drivers/hp_rest/driver.py b/libra/statsd/drivers/hp_rest/driver.py
index d8200a98..43973ba5 100644
--- a/libra/statsd/drivers/hp_rest/driver.py
+++ b/libra/statsd/drivers/hp_rest/driver.py
@@ -20,3 +20,8 @@ class HPRestDriver(AlertDriver):
         api = AdminAPI(self.args.api_server, self.logger)
         if api.is_online():
             api.fail_device(device_id)
+
+    def send_repair(self, message, device_id):
+        api = AdminAPI(self.args.api_server, self.logger)
+        if api.is_online():
+            api.repair_device(device_id)
diff --git a/libra/statsd/gearman.py b/libra/statsd/gearman.py
index f8209cd5..2586ac69 100644
--- a/libra/statsd/gearman.py
+++ b/libra/statsd/gearman.py
@@ -45,3 +45,29 @@ class GearJobs(object):
                 continue
 
         return failed_list
+
+    def send_repair(self, node_list):
+        list_of_jobs = []
+        repaired_list = []
+        job_data = {"hpcs_action": "STATS"}
+        for node in node_list:
+            list_of_jobs.append(dict(task=str(node), data=job_data))
+        submitted_pings = self.gm_client.submit_multiple_jobs(
+            list_of_jobs, background=False, wait_until_complete=True,
+            poll_timeout=5.0
+        )
+        for ping in submitted_pings:
+            if ping.state == 'UNKNOWN':
+                # TODO: Gearman server failed, ignoring for now
+                self.logger.error('Gearman Job server fail')
+                continue
+            elif ping.timed_out:
+                # Ping timeout
+                continue
+            elif ping.result['hpcs_response'] == 'FAIL':
+                # Error returned by Gearman
+                continue
+            else:
+                repaired_list.append(ping.job.task)
+
+        return repaired_list
diff --git a/libra/statsd/main.py b/libra/statsd/main.py
index 336e5726..defcf7dc 100644
--- a/libra/statsd/main.py
+++ b/libra/statsd/main.py
@@ -55,6 +55,11 @@ def main():
         '--ping_interval', type=int, default=60,
         help='how often to ping load balancers (in seconds)'
     )
+    options.parser.add_argument(
+        '--repair_interval', type=int, default=180,
+        help='how often to check if a load balancer has been repaired (in '
+        'seconds)'
+    )
     options.parser.add_argument(
         '--api_server', action='append', metavar='HOST:PORT', default=[],
         help='a list of API servers to connect to'
diff --git a/libra/statsd/scheduler.py b/libra/statsd/scheduler.py
index 9a051ea4..adba0833 100644
--- a/libra/statsd/scheduler.py
+++ b/libra/statsd/scheduler.py
@@ -29,14 +29,15 @@ class Sched(object):
         self.logger = logger
         self.args = args
         self.drivers = drivers
-        self.rlock = threading.RLock()
         self.ping_timer = None
+        self.repair_timer = None
 
         signal.signal(signal.SIGINT, self.exit_handler)
         signal.signal(signal.SIGTERM, self.exit_handler)
 
     def start(self):
         self.ping_lbs()
+        self.repair_lbs()
 
     def exit_handler(self, signum, frame):
         signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -46,6 +47,8 @@ class Sched(object):
     def shutdown(self, error):
         if self.ping_timer:
             self.ping_timer.cancel()
+        if self.repair_timer:
+            self.repair_timer.cancel()
 
         if not error:
             self.logger.info('Safely shutting down')
@@ -54,14 +57,25 @@ class Sched(object):
             self.logger.info('Shutting down due to error')
             sys.exit(1)
 
+    def repair_lbs(self):
+        tested = 0
+        repaired = 0
+        try:
+            tested, repaired = self._exec_repair()
+        except Exception:
+            self.logger.exception('Uncaught exception during LB repair')
+        # Need to restart timer after every repair cycle
+        self.logger.info('{tested} loadbalancers tested, {repaired} repaired'
+                         .format(tested=tested, repaired=repaired))
+        self.start_repair_sched()
+
     def ping_lbs(self):
         pings = 0
         failed = 0
-        with self.rlock:
-            try:
-                pings, failed = self._exec_ping()
-            except Exception:
-                self.logger.exception('Uncaught exception during LB ping')
+        try:
+            pings, failed = self._exec_ping()
+        except Exception:
+            self.logger.exception('Uncaught exception during LB ping')
         # Need to restart timer after every ping cycle
         self.logger.info('{pings} loadbalancers pinged, {failed} failed'
                          .format(pings=pings, failed=failed))
@@ -89,6 +103,28 @@ class Sched(object):
 
         return pings, failed
 
+    def _exec_repair(self):
+        tested = 0
+        repaired = 0
+        node_list = []
+        self.logger.info('Running repair check')
+        api = AdminAPI(self.args.api_server, self.logger)
+        if api.is_online():
+            lb_list = api.get_repair_list()
+            tested = len(lb_list)
+            for lb in lb_list:
+                node_list.append(lb['name'])
+            gearman = GearJobs(self.logger, self.args)
+            repaired_nodes = gearman.send_repair(node_list)
+            repaired = len(repaired_nodes)
+            if repaired > 0:
+                self._send_repair(repaired_nodes, lb_list)
+        else:
+            self.logger.error('No working API server found')
+            return (0, 0)
+
+        return tested, repaired
+
     def _send_fails(self, failed_nodes, node_list):
         for node in failed_nodes:
             data = self._get_node(node, node_list)
@@ -110,6 +146,27 @@ class Sched(object):
                 )
                 instance.send_alert(message, data['id'])
 
+    def _send_repair(self, repaired_nodes, node_list):
+        for node in repaired_nodes:
+            data = self._get_node(node, node_list)
+            message = (
+                'Load balancer repaired\n'
+                'ID: {0}\n'
+                'IP: {1}\n'
+                'tenant: {2}\n'.format(
+                    data['id'], data['floatingIpAddr'],
+                    data['loadBalancers'][0]['hpcs_tenantid']
+                )
+            )
+            for driver in self.drivers:
+                instance = driver(self.logger, self.args)
+                self.logger.info(
+                    'Sending repair of {0} to {1}'.format(
+                        node, instance.__class__.__name__
+                    )
+                )
+                instance.send_repair(message, data['id'])
+
     def _get_node(self, node, node_list):
         for n in node_list:
             if n['name'] == node: