Initial architecture changes for using dynamic drivers and for a HAProxy driver.

This commit is contained in:
David Shrewsbury 2012-09-21 12:43:48 -04:00
parent d1f6c62ddd
commit 7f01bc3daf
6 changed files with 174 additions and 19 deletions

View File

@ -0,0 +1,13 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,37 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
class LoadBalancerDriver(object):
"""
Load balancer device driver base class.
This defines the API for interacting with various load balancing
appliances. Drivers for these appliances should inherit from this
class and implement the relevant API methods that it can support.
Generally, an appliance driver should queue up any changes made
via these API calls until the activate() method is called.
"""
def bind(self, address, port):
""" Set proxy listening interface and port. """
raise NotImplementedError()
def add_server(self, host, port):
""" Add a server for which we will proxy. """
raise NotImplementedError()
def activate(self):
""" Activate any changes made. """
raise NotImplementedError()

View File

@ -0,0 +1,13 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,44 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from libra.worker.drivers.base import LoadBalancerDriver
class HAProxyDriver(LoadBalancerDriver):
def __init__(self):
self._config_file = '/etc/haproxy/haproxy.cfg'
self._servers = []
self.bind('0.0.0.0', 80)
def _write_config(self):
pass
def _restart(self):
pass
####################
# Driver API Methods
####################
def bind(self, address, port):
self._bind_address = address
self._bind_port = port
def add_server(self, host, port):
self._servers.append((host, port))
def activate(self):
self._write_config()
self._restart()

View File

@ -16,7 +16,6 @@ import daemon
import gearman.errors
import json
import lockfile
import logging
import socket
from time import sleep
@ -28,17 +27,23 @@ from libra.common.options import Options, setup_logging
def lbaas_task(worker, job):
""" Main Gearman worker task. """
NODE_OK = "ENABLED"
NODE_ERR = "DISABLED"
logger = worker.logger
driver = worker.driver
# Turn string into JSON object
data = json.loads(job.data)
lb_name = data['name']
logging.info("LB name: %s" % lb_name)
logger.debug("LB name: %s" % lb_name)
if 'nodes' not in data:
return BadRequest("Missing 'nodes' element").to_json()
for lb_node in data['nodes']:
port, address, status = None, None, None
port, address = None, None
if 'port' in lb_node:
port = lb_node['port']
@ -50,24 +55,45 @@ def lbaas_task(worker, job):
else:
return BadRequest("Missing 'address' element.").to_json()
if 'status' in lb_node:
status = lb_node['status']
logger.debug("Server node: %s:%s" % (address, port))
logging.info("LB node: %s:%s - %s" % (address, port, status))
lb_node['status'] = 'ACTIVE'
try:
driver.add_server(address, port)
except NotImplementedError:
logger.info("Selected driver could not add server.")
lb_node['condition'] = NODE_ERR
except Exception as e:
logger.critical("Failure trying adding server: %s, %s" %
(e.__class__, e))
lb_node['condition'] = NODE_ERR
else:
lb_node['condition'] = NODE_OK
# Return the same JSON object, but with status fields set.
try:
driver.activate()
except NotImplementedError:
logger.info("Selected driver could not activate changes.")
for lb_node in data['nodes']:
lb_node['condition'] = NODE_ERR
# Return the same JSON object, but with condition fields set.
return data
class CustomJSONGearmanWorker(JSONGearmanWorker):
logger = None
driver = None
class Server(object):
"""
Encapsulates server activity so we can run it in either daemon or
non-daemon mode.
"""
def __init__(self, logger, servers, reconnect_sleep):
self.logger = logger
def __init__(self, servers, reconnect_sleep):
self.logger = None
self.driver = None
self.servers = servers
self.reconnect_sleep = reconnect_sleep
@ -76,7 +102,7 @@ class Server(object):
task_name = "lbaas-%s" % my_ip
self.logger.debug("Registering task %s" % task_name)
worker = JSONGearmanWorker(self.servers)
worker = CustomJSONGearmanWorker(self.servers)
worker.set_client_id(my_ip)
worker.register_task(task_name, lbaas_task)
@ -103,13 +129,22 @@ def main():
options = Options('worker', 'Worker Daemon')
options.parser.add_argument(
'-s', dest='reconnect_sleep', type=int, metavar="TIME",
'-s', dest='reconnect_sleep', type=int, metavar='TIME',
default=60, help='seconds to sleep between job server reconnects'
)
options.parser.add_argument(
'--driver', dest='driver', default='haproxy.driver.HAProxyDriver',
help='Class name of device driver to use'
)
args = options.run()
logger = setup_logging('libra_worker', args)
server = Server(logger, ['localhost:4730'], args.reconnect_sleep)
from libra.worker.drivers.haproxy.driver import HAProxyDriver
driver = HAProxyDriver()
server = Server(['localhost:4730'], args.reconnect_sleep)
server.logger = logger
server.driver = driver
if args.nodaemon:
server.main()

View File

@ -1,6 +1,13 @@
import json
import logging
import unittest
import mock
from libra.worker.worker import lbaas_task
from libra.worker.drivers.base import LoadBalancerDriver
class FakeDriver(LoadBalancerDriver):
pass
class FakeJob(object):
@ -11,6 +18,12 @@ class FakeJob(object):
self.data = json.dumps(data)
class FakeWorker(object):
def __init__(self):
self.logger = logging.getLogger('lbaas_worker_test')
self.driver = FakeDriver()
class TestLBaaSTask(unittest.TestCase):
def setUp(self):
pass
@ -21,7 +34,7 @@ class TestLBaaSTask(unittest.TestCase):
def testLBaaSTask(self):
""" Test the lbaas_task() function """
worker = None
worker = FakeWorker()
data = {
"name": "a-new-loadbalancer",
"nodes": [
@ -43,15 +56,15 @@ class TestLBaaSTask(unittest.TestCase):
self.assertEqual(len(r["nodes"]), 2)
self.assertEqual(r["nodes"][0]["address"], data["nodes"][0]["address"])
self.assertEqual(r["nodes"][0]["port"], data["nodes"][0]["port"])
self.assertIn("status", r["nodes"][0])
self.assertIn("condition", r["nodes"][0])
self.assertEqual(r["nodes"][1]["address"], data["nodes"][1]["address"])
self.assertEqual(r["nodes"][1]["port"], data["nodes"][1]["port"])
self.assertIn("status", r["nodes"][1])
self.assertIn("condition", r["nodes"][1])
def testMissingNodes(self):
""" Test invalid messages: missing nodes """
worker = None
worker = FakeWorker()
data = {
"name": "a-new-loadbalancer"
}
@ -63,7 +76,7 @@ class TestLBaaSTask(unittest.TestCase):
def testMissingPort(self):
""" Test invalid messages: missing port """
worker = None
worker = FakeWorker()
data = {
"name": "a-new-loadbalancer",
"nodes": [
@ -80,7 +93,7 @@ class TestLBaaSTask(unittest.TestCase):
def testMissingAddress(self):
""" Test invalid messages: missing address """
worker = None
worker = FakeWorker()
data = {
"name": "a-new-loadbalancer",
"nodes": [