From 7f01bc3daf0de3a789e0e43537c4b73da9e5bdcf Mon Sep 17 00:00:00 2001 From: David Shrewsbury Date: Fri, 21 Sep 2012 12:43:48 -0400 Subject: [PATCH] Initial architecture changes for using dynamic drivers and for a HAProxy driver. --- libra/worker/drivers/__init__.py | 13 +++++ libra/worker/drivers/base.py | 37 ++++++++++++++ libra/worker/drivers/haproxy/__init__.py | 13 +++++ libra/worker/drivers/haproxy/driver.py | 44 +++++++++++++++++ libra/worker/worker.py | 61 +++++++++++++++++++----- tests/test_lbaas_worker.py | 25 +++++++--- 6 files changed, 174 insertions(+), 19 deletions(-) create mode 100644 libra/worker/drivers/__init__.py create mode 100644 libra/worker/drivers/base.py create mode 100644 libra/worker/drivers/haproxy/__init__.py create mode 100644 libra/worker/drivers/haproxy/driver.py diff --git a/libra/worker/drivers/__init__.py b/libra/worker/drivers/__init__.py new file mode 100644 index 00000000..582348cb --- /dev/null +++ b/libra/worker/drivers/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/libra/worker/drivers/base.py b/libra/worker/drivers/base.py new file mode 100644 index 00000000..6bc71785 --- /dev/null +++ b/libra/worker/drivers/base.py @@ -0,0 +1,37 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations + + +class LoadBalancerDriver(object): + """ + Load balancer device driver base class. + + This defines the API for interacting with various load balancing + appliances. Drivers for these appliances should inherit from this + class and implement the relevant API methods that it can support. + + Generally, an appliance driver should queue up any changes made + via these API calls until the activate() method is called. + """ + + def bind(self, address, port): + """ Set proxy listening interface and port. """ + raise NotImplementedError() + + def add_server(self, host, port): + """ Add a server for which we will proxy. """ + raise NotImplementedError() + + def activate(self): + """ Activate any changes made. """ + raise NotImplementedError() diff --git a/libra/worker/drivers/haproxy/__init__.py b/libra/worker/drivers/haproxy/__init__.py new file mode 100644 index 00000000..582348cb --- /dev/null +++ b/libra/worker/drivers/haproxy/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/libra/worker/drivers/haproxy/driver.py b/libra/worker/drivers/haproxy/driver.py new file mode 100644 index 00000000..abe08717 --- /dev/null +++ b/libra/worker/drivers/haproxy/driver.py @@ -0,0 +1,44 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from libra.worker.drivers.base import LoadBalancerDriver + + +class HAProxyDriver(LoadBalancerDriver): + + def __init__(self): + self._config_file = '/etc/haproxy/haproxy.cfg' + self._servers = [] + self.bind('0.0.0.0', 80) + + def _write_config(self): + pass + + def _restart(self): + pass + + #################### + # Driver API Methods + #################### + + def bind(self, address, port): + self._bind_address = address + self._bind_port = port + + def add_server(self, host, port): + self._servers.append((host, port)) + + def activate(self): + self._write_config() + self._restart() diff --git a/libra/worker/worker.py b/libra/worker/worker.py index 76c2b3cb..4efdcc91 100644 --- a/libra/worker/worker.py +++ b/libra/worker/worker.py @@ -16,7 +16,6 @@ import daemon import gearman.errors import json import lockfile -import logging import socket from time import sleep @@ -28,17 +27,23 @@ from libra.common.options import Options, setup_logging def lbaas_task(worker, job): """ Main Gearman worker task. """ + NODE_OK = "ENABLED" + NODE_ERR = "DISABLED" + + logger = worker.logger + driver = worker.driver + # Turn string into JSON object data = json.loads(job.data) lb_name = data['name'] - logging.info("LB name: %s" % lb_name) + logger.debug("LB name: %s" % lb_name) if 'nodes' not in data: return BadRequest("Missing 'nodes' element").to_json() for lb_node in data['nodes']: - port, address, status = None, None, None + port, address = None, None if 'port' in lb_node: port = lb_node['port'] @@ -50,24 +55,45 @@ def lbaas_task(worker, job): else: return BadRequest("Missing 'address' element.").to_json() - if 'status' in lb_node: - status = lb_node['status'] + logger.debug("Server node: %s:%s" % (address, port)) - logging.info("LB node: %s:%s - %s" % (address, port, status)) - lb_node['status'] = 'ACTIVE' + try: + driver.add_server(address, port) + except NotImplementedError: + logger.info("Selected driver could not add server.") + lb_node['condition'] = NODE_ERR + except Exception as e: + logger.critical("Failure trying adding server: %s, %s" % + (e.__class__, e)) + lb_node['condition'] = NODE_ERR + else: + lb_node['condition'] = NODE_OK - # Return the same JSON object, but with status fields set. + try: + driver.activate() + except NotImplementedError: + logger.info("Selected driver could not activate changes.") + for lb_node in data['nodes']: + lb_node['condition'] = NODE_ERR + + # Return the same JSON object, but with condition fields set. return data +class CustomJSONGearmanWorker(JSONGearmanWorker): + logger = None + driver = None + + class Server(object): """ Encapsulates server activity so we can run it in either daemon or non-daemon mode. """ - def __init__(self, logger, servers, reconnect_sleep): - self.logger = logger + def __init__(self, servers, reconnect_sleep): + self.logger = None + self.driver = None self.servers = servers self.reconnect_sleep = reconnect_sleep @@ -76,7 +102,7 @@ class Server(object): task_name = "lbaas-%s" % my_ip self.logger.debug("Registering task %s" % task_name) - worker = JSONGearmanWorker(self.servers) + worker = CustomJSONGearmanWorker(self.servers) worker.set_client_id(my_ip) worker.register_task(task_name, lbaas_task) @@ -103,13 +129,22 @@ def main(): options = Options('worker', 'Worker Daemon') options.parser.add_argument( - '-s', dest='reconnect_sleep', type=int, metavar="TIME", + '-s', dest='reconnect_sleep', type=int, metavar='TIME', default=60, help='seconds to sleep between job server reconnects' ) + options.parser.add_argument( + '--driver', dest='driver', default='haproxy.driver.HAProxyDriver', + help='Class name of device driver to use' + ) args = options.run() logger = setup_logging('libra_worker', args) - server = Server(logger, ['localhost:4730'], args.reconnect_sleep) + from libra.worker.drivers.haproxy.driver import HAProxyDriver + driver = HAProxyDriver() + + server = Server(['localhost:4730'], args.reconnect_sleep) + server.logger = logger + server.driver = driver if args.nodaemon: server.main() diff --git a/tests/test_lbaas_worker.py b/tests/test_lbaas_worker.py index f2f0ff3a..3e695ad6 100644 --- a/tests/test_lbaas_worker.py +++ b/tests/test_lbaas_worker.py @@ -1,6 +1,13 @@ import json +import logging import unittest +import mock from libra.worker.worker import lbaas_task +from libra.worker.drivers.base import LoadBalancerDriver + + +class FakeDriver(LoadBalancerDriver): + pass class FakeJob(object): @@ -11,6 +18,12 @@ class FakeJob(object): self.data = json.dumps(data) +class FakeWorker(object): + def __init__(self): + self.logger = logging.getLogger('lbaas_worker_test') + self.driver = FakeDriver() + + class TestLBaaSTask(unittest.TestCase): def setUp(self): pass @@ -21,7 +34,7 @@ class TestLBaaSTask(unittest.TestCase): def testLBaaSTask(self): """ Test the lbaas_task() function """ - worker = None + worker = FakeWorker() data = { "name": "a-new-loadbalancer", "nodes": [ @@ -43,15 +56,15 @@ class TestLBaaSTask(unittest.TestCase): self.assertEqual(len(r["nodes"]), 2) self.assertEqual(r["nodes"][0]["address"], data["nodes"][0]["address"]) self.assertEqual(r["nodes"][0]["port"], data["nodes"][0]["port"]) - self.assertIn("status", r["nodes"][0]) + self.assertIn("condition", r["nodes"][0]) self.assertEqual(r["nodes"][1]["address"], data["nodes"][1]["address"]) self.assertEqual(r["nodes"][1]["port"], data["nodes"][1]["port"]) - self.assertIn("status", r["nodes"][1]) + self.assertIn("condition", r["nodes"][1]) def testMissingNodes(self): """ Test invalid messages: missing nodes """ - worker = None + worker = FakeWorker() data = { "name": "a-new-loadbalancer" } @@ -63,7 +76,7 @@ class TestLBaaSTask(unittest.TestCase): def testMissingPort(self): """ Test invalid messages: missing port """ - worker = None + worker = FakeWorker() data = { "name": "a-new-loadbalancer", "nodes": [ @@ -80,7 +93,7 @@ class TestLBaaSTask(unittest.TestCase): def testMissingAddress(self): """ Test invalid messages: missing address """ - worker = None + worker = FakeWorker() data = { "name": "a-new-loadbalancer", "nodes": [