diff --git a/quark/db/api.py b/quark/db/api.py index e35e12a..6748057 100644 --- a/quark/db/api.py +++ b/quark/db/api.py @@ -245,6 +245,37 @@ def port_create(context, **port_dict): return port +def port_disassociate_ip(context, ports, address): + assocs_to_remove = [assoc for assoc in address.associations + if assoc.port in ports] + for assoc in assocs_to_remove: + context.session.delete(assoc) + # NOTE(thomasem): Need to update in-session model for caller. + address.associations.remove(assoc) + context.session.add(address) + return address + + +def port_associate_ip(context, ports, address, enable_port=None): + for port in ports: + assoc = models.PortIpAssociation() + assoc.port_id = port.id + assoc.ip_address_id = address.id + assoc.enabled = port.id in enable_port if enable_port else False + address.associations.append(assoc) + context.session.add(address) + return address + + +def update_port_associations_for_ip(context, ports, address): + assoc_ports = set(address.ports) + new_ports = set(ports) + new_address = port_associate_ip(context, new_ports - assoc_ports, + address) + return port_disassociate_ip(context, + assoc_ports - new_ports, new_address) + + def port_update(context, port, **kwargs): if "addresses" in kwargs: port["ip_addresses"] = kwargs.pop("addresses") @@ -284,23 +315,6 @@ def ip_address_find(context, lock_mode=False, **filters): if lock_mode: query = query.with_lockmode("update") - ip_shared = filters.pop("shared", None) - if ip_shared is not None: - cnt = sql_func.count(models.port_ip_association_table.c.port_id) - stmt = context.session.query(models.IPAddress, - cnt.label("ports_count")) - stmt = stmt.outerjoin(models.port_ip_association_table) - stmt = stmt.group_by(models.IPAddress.id).subquery() - - query = query.outerjoin(stmt, stmt.c.id == models.IPAddress.id) - - # !@# HACK(amir): replace once attributes are configured in ip address - # extension correctly - if "True" in ip_shared: - query = query.filter(stmt.c.ports_count > 1) - else: - query = query.filter(stmt.c.ports_count <= 1) - model_filters = _model_query(context, models.IPAddress, filters) if "do_not_use" in filters: query = query.filter(models.Subnet.do_not_use == filters["do_not_use"]) @@ -308,6 +322,15 @@ def ip_address_find(context, lock_mode=False, **filters): if filters.get("device_id"): model_filters.append(models.IPAddress.ports.any( models.Port.device_id.in_(filters["device_id"]))) + + if filters.get("port_id"): + model_filters.append(models.IPAddress.ports.any( + models.Port.id == filters['port_id'])) + + if filters.get("address_type"): + model_filters.append( + models.IPAddress.address_type == filters['address_type']) + return query.filter(*model_filters) diff --git a/quark/db/ip_types.py b/quark/db/ip_types.py new file mode 100644 index 0000000..e473ace --- /dev/null +++ b/quark/db/ip_types.py @@ -0,0 +1,3 @@ +SHARED = 'shared' +FIXED = 'fixed' +FLOATING = 'floating' diff --git a/quark/db/models.py b/quark/db/models.py index ead84d6..471691e 100644 --- a/quark/db/models.py +++ b/quark/db/models.py @@ -25,6 +25,7 @@ from sqlalchemy.ext import hybrid from sqlalchemy import orm from quark.db import custom_types +from quark.db import ip_types # NOTE(mdietz): This is the only way to actually create the quotas table, # regardless if we need it. This is how it's done upstream. # NOTE(jhammond): If it isn't obvious quota_driver is unused and that's ok. @@ -108,6 +109,10 @@ class IsHazTags(object): return orm.relationship("TagAssociation", backref=backref) +class PortIpAssociation(object): + pass + + port_ip_association_table = sa.Table( "quark_port_ip_address_associations", BASEV2.metadata, @@ -123,6 +128,9 @@ port_ip_association_table = sa.Table( **TABLE_KWARGS) +orm.mapper(PortIpAssociation, port_ip_association_table) + + class IPAddress(BASEV2, models.HasId): """More closely emulate the melange version of the IP table. @@ -133,7 +141,6 @@ class IPAddress(BASEV2, models.HasId): __table_args__ = (sa.UniqueConstraint("subnet_id", "address", name="subnet_id_address"), TABLE_KWARGS) - address_types = set(['fixed', 'shared', 'floating']) address_readable = sa.Column(sa.String(128), nullable=False) address = sa.Column(custom_types.INET(), nullable=False, index=True) subnet_id = sa.Column(sa.String(36), @@ -149,8 +156,17 @@ class IPAddress(BASEV2, models.HasId): _deallocated = sa.Column(sa.Boolean()) # Legacy data used_by_tenant_id = sa.Column(sa.String(255)) - address_type = sa.Column(sa.Enum(*address_types, - name="quark_ip_address_types")) + + address_type = sa.Column(sa.Enum(ip_types.FIXED, ip_types.FLOATING, + ip_types.SHARED, + name="quark_ip_address_types")) + associations = orm.relationship(PortIpAssociation, backref="ip_address", + lazy='subquery') + + def enabled_for_port(self, port): + for assoc in self["associations"]: + if assoc.port_id == port["id"]: + return assoc.enabled @hybrid.hybrid_property def deallocated(self): @@ -311,6 +327,7 @@ class Port(BASEV2, models.HasTenant, models.HasId): device_id = sa.Column(sa.String(255), nullable=False, index=True) device_owner = sa.Column(sa.String(255)) bridge = sa.Column(sa.String(255)) + associations = orm.relationship(PortIpAssociation, backref="port") @declarative.declared_attr def ip_addresses(cls): diff --git a/quark/ipam.py b/quark/ipam.py index 96f99f8..e9fec7e 100644 --- a/quark/ipam.py +++ b/quark/ipam.py @@ -30,6 +30,7 @@ from oslo.config import cfg from oslo.db import exception as db_exception from quark.db import api as db_api +from quark.db import ip_types from quark.db import models from quark import exceptions as q_exc from quark import utils @@ -297,8 +298,9 @@ class QuarkIpam(object): deallocated_at=None, used_by_tenant_id=context.tenant_id, allocated_at=timeutils.utcnow(), + port_id=port_id, address_type=kwargs.get('address_type', - 'fixed')) + ip_types.FIXED)) return [updated_address] else: # Make sure we never find it again @@ -347,7 +349,8 @@ class QuarkIpam(object): context, address=next_ip, subnet_id=subnet["id"], deallocated=0, version=subnet["ip_version"], network_id=net_id, - address_type=kwargs.get('type', 'fixed')) + port_id=port_id, + address_type=kwargs.get('address_type', ip_types.FIXED)) address["deallocated"] = 0 except Exception: # NOTE(mdietz): Our version of sqlalchemy incorrectly raises None @@ -429,7 +432,9 @@ class QuarkIpam(object): context, address, deallocated=False, deallocated_at=None, used_by_tenant_id=context.tenant_id, - allocated_at=timeutils.utcnow()) + allocated_at=timeutils.utcnow(), + address_type=kwargs.get('address_type', + ip_types.FIXED)) # This triggers when the IP is allocated to another tenant, # either because we missed it due to our filters above, or @@ -440,7 +445,8 @@ class QuarkIpam(object): context, address=ip_address, subnet_id=subnet["id"], version=subnet["ip_version"], network_id=net_id, - address_type=kwargs.get('address_type', 'fixed')) + address_type=kwargs.get('address_type', + ip_types.FIXED)) except db_exception.DBDuplicateEntry: LOG.info("{0} exists but was already " "allocated".format(str(ip_address))) diff --git a/quark/plugin.py b/quark/plugin.py index 622960f..11487b0 100644 --- a/quark/plugin.py +++ b/quark/plugin.py @@ -16,6 +16,7 @@ """ v2 Neutron Plug-in API Quark Implementation """ + from neutron.extensions import securitygroup as sg_ext from neutron import neutron_plugin_base_v2 from neutron.openstack.common import log as logging diff --git a/quark/plugin_modules/ip_addresses.py b/quark/plugin_modules/ip_addresses.py index 7a43296..cc9b4d0 100644 --- a/quark/plugin_modules/ip_addresses.py +++ b/quark/plugin_modules/ip_addresses.py @@ -14,19 +14,23 @@ # under the License. from neutron.common import exceptions -from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from oslo.config import cfg import webob from quark.db import api as db_api +from quark.db import ip_types from quark import exceptions as quark_exceptions +from quark import ipam from quark import plugin_views as v - CONF = cfg.CONF LOG = logging.getLogger(__name__) -ipam_driver = (importutils.import_class(CONF.QUARK.ipam_driver))() + + +def _get_ipam_driver_for_network(context, net_id): + return ipam.IPAM_REGISTRY.get_strategy(db_api.network_find( + context, id=net_id, scope=db_api.ONE)['ipam_strategy']) def get_ip_addresses(context, **filters): @@ -45,22 +49,55 @@ def get_ip_address(context, id): return v._make_ip_dict(addr) -def create_ip_address(context, ip_address): - LOG.info("create_ip_address for tenant %s" % context.tenant_id) +def validate_ports_on_network_and_same_segment(ports, network_id): + first_segment = None + for port in ports: + addresses = port.get("ip_addresses", []) + for address in addresses: + if address["network_id"] != network_id: + raise exceptions.BadRequest(resource="ip_addresses", + msg="Must have ports connected to" + " the requested network") + segment_id = address.subnet.get("segment_id") + first_segment = first_segment or segment_id + if segment_id != first_segment: + raise exceptions.BadRequest(resource="ip_addresses", + msg="Segment id's do not match.") - port = None - ip_dict = ip_address["ip_address"] - port_ids = ip_dict.get('port_ids') + +def _shared_ip_request(ip_address): + port_ids = ip_address.get('ip_address', {}).get('port_ids', []) + return len(port_ids) > 1 + + +def _can_be_shared(address_model): + # Don't share IP if any of the assocs is enabled + return not any(a.enabled for a in address_model.associations) + + +def create_ip_address(context, body): + LOG.info("create_ip_address for tenant %s" % context.tenant_id) + address_type = (ip_types.SHARED if _shared_ip_request(body) + else ip_types.FIXED) + ip_dict = body.get("ip_address") + port_ids = ip_dict.get('port_ids', []) network_id = ip_dict.get('network_id') device_ids = ip_dict.get('device_ids') ip_version = ip_dict.get('version') ip_address = ip_dict.get('ip_address') + # If no version is passed, you would get what the network provides, + # which could be both v4 and v6 addresses. Rather than allow for such + # an ambiguous outcome, we'll raise instead + if not ip_version: + raise exceptions.BadRequest(resource="ip_addresses", + msg="version is required.") + if not network_id: + raise exceptions.BadRequest(resource="ip_addresses", + msg="network_id is required.") + ipam_driver = _get_ipam_driver_for_network(context, network_id) + new_addresses = [] ports = [] - if device_ids and not network_id: - raise exceptions.BadRequest( - resource="ip_addresses", - msg="network_id is required if device_ids are supplied.") with context.session.begin(): if network_id and device_ids: for device_id in device_ids: @@ -70,6 +107,7 @@ def create_ip_address(context, ip_address): ports.append(port) elif port_ids: for port_id in port_ids: + port = db_api.port_find(context, id=port_id, tenant_id=context.tenant_id, scope=db_api.ONE) @@ -79,18 +117,25 @@ def create_ip_address(context, ip_address): raise exceptions.PortNotFound(port_id=port_ids, net_id=network_id) - address = ipam_driver.allocate_ip_address( - context, - port['network_id'], - port['id'], - CONF.QUARK.ipam_reuse_after, - ip_version, - ip_addresses=[ip_address]) + validate_ports_on_network_and_same_segment(ports, network_id) - for port in ports: - port["ip_addresses"].append(address) - - return v._make_ip_dict(address) + # Shared Ips are only new IPs. Two use cases: if we got device_id + # or if we got port_ids. We should check the case where we got port_ids + # and device_ids. The device_id must have a port on the network, + # and any port_ids must also be on that network already. If we have + # more than one port by this step, it's considered a shared IP, + # and therefore will be marked as unconfigured (enabled=False) + # for all ports. + ipam_driver.allocate_ip_address(context, new_addresses, network_id, + None, CONF.QUARK.ipam_reuse_after, + version=ip_version, + ip_addresses=[ip_address] + if ip_address else [], + address_type=address_type) + with context.session.begin(): + new_address = db_api.port_associate_ip(context, ports, + new_addresses[0]) + return v._make_ip_dict(new_address) def _get_deallocated_override(): @@ -98,20 +143,28 @@ def _get_deallocated_override(): return '2000-01-01 00:00:00' +def _raise_if_shared_and_enabled(address_request, address_model): + if (_shared_ip_request(address_request) + and not _can_be_shared(address_model)): + raise exceptions.BadRequest( + resource="ip_addresses", + msg="This IP address is in use on another port and cannot be " + "shared") + + def update_ip_address(context, id, ip_address): LOG.info("update_ip_address %s for tenant %s" % (id, context.tenant_id)) - + ports = [] with context.session.begin(): address = db_api.ip_address_find( context, id=id, tenant_id=context.tenant_id, scope=db_api.ONE) - if not address: raise exceptions.NotFound( message="No IP address found with id=%s" % id) - reset = ip_address['ip_address'].get('reset_allocation_time', - False) + ipam_driver = _get_ipam_driver_for_network(context, address.network_id) + reset = ip_address['ip_address'].get('reset_allocation_time', False) if reset and address['deallocated'] == 1: if context.is_admin: LOG.info("IP's deallocated time being manually reset") @@ -120,27 +173,30 @@ def update_ip_address(context, id, ip_address): msg = "Modification of reset_allocation_time requires admin" raise webob.exc.HTTPForbidden(detail=msg) - old_ports = address['ports'] port_ids = ip_address['ip_address'].get('port_ids') - if port_ids is None: - return v._make_ip_dict(address) - - for port in old_ports: - port['ip_addresses'].remove(address) if port_ids: - ports = db_api.port_find( - context, tenant_id=context.tenant_id, id=port_ids, - scope=db_api.ALL) - - # NOTE: could be considered inefficient because we're converting - # to a list to check length. Maybe revisit + _raise_if_shared_and_enabled(ip_address, address) + ports = db_api.port_find(context, tenant_id=context.tenant_id, + id=port_ids, scope=db_api.ALL) + # NOTE(name): could be considered inefficient because we're + # converting to a list to check length. Maybe revisit if len(ports) != len(port_ids): raise exceptions.NotFound( message="No ports not found with ids=%s" % port_ids) - for port in ports: - port['ip_addresses'].extend([address]) - else: - address["deallocated"] = 1 - return v._make_ip_dict(address) + validate_ports_on_network_and_same_segment(ports, + address["network_id"]) + + LOG.info("Updating IP address, %s, to only be used by the" + "following ports: %s" % (address.address_readable, + [p.id for p in ports])) + new_address = db_api.update_port_associations_for_ip(context, + ports, + address) + else: + if port_ids is not None: + ipam_driver.deallocate_ip_address( + context, address) + return v._make_ip_dict(address) + return v._make_ip_dict(new_address) diff --git a/quark/plugin_views.py b/quark/plugin_views.py index 8f7cbb3..a29a228 100644 --- a/quark/plugin_views.py +++ b/quark/plugin_views.py @@ -202,9 +202,11 @@ def _port_dict(port, fields=None): return res -def _make_port_address_dict(ip, fields=None): +def _make_port_address_dict(ip, port, fields=None): + enabled = ip.enabled_for_port(port) ip_addr = {"subnet_id": ip.get("subnet_id"), - "ip_address": ip.formatted()} + "ip_address": ip.formatted(), + "enabled": enabled} if fields and "port_subnets" in fields: ip_addr["subnet"] = _make_subnet_dict(ip["subnet"]) @@ -213,7 +215,7 @@ def _make_port_address_dict(ip, fields=None): def _make_port_dict(port, fields=None): res = _port_dict(port) - res["fixed_ips"] = [_make_port_address_dict(ip, fields) + res["fixed_ips"] = [_make_port_address_dict(ip, port, fields) for ip in port.ip_addresses] return res @@ -222,7 +224,7 @@ def _make_ports_list(query, fields=None): ports = [] for port in query: port_dict = _port_dict(port, fields) - port_dict["fixed_ips"] = [_make_port_address_dict(addr, fields) + port_dict["fixed_ips"] = [_make_port_address_dict(addr, port, fields) for addr in port.ip_addresses] ports.append(port_dict) return ports @@ -253,13 +255,12 @@ def _make_ip_dict(address): return {"id": address["id"], "network_id": net_id, "address": address.formatted(), - "port_ids": [port["id"] for port in address["ports"]], - "device_ids": [port["device_id"] or "" - for port in address["ports"]], + "port_ids": [assoc.port_id + for assoc in address["associations"]], "subnet_id": address["subnet_id"], "used_by_tenant_id": address["used_by_tenant_id"], "version": address["version"], - "shared": len(address["ports"]) > 1} + "address_type": address['address_type']} def _make_ip_policy_dict(ipp): diff --git a/quark/tests/plugin_modules/test_ip_addresses.py b/quark/tests/plugin_modules/test_ip_addresses.py index 38976fb..ae791d4 100644 --- a/quark/tests/plugin_modules/test_ip_addresses.py +++ b/quark/tests/plugin_modules/test_ip_addresses.py @@ -18,13 +18,41 @@ import contextlib import mock from mock import patch from neutron.common import exceptions +from oslo.config import cfg import webob from quark.db import models from quark import exceptions as quark_exceptions +from quark.plugin_modules import ip_addresses from quark.tests import test_quark_plugin +def _port_associate_stub(context, ports, address, **kwargs): + for port in ports: + assoc = models.PortIpAssociation() + assoc.port_id = port.id + assoc.ip_address_id = address.id + assoc.port = port + # NOTE(thomasem): This causes address['associations'] to gain this + # PortIpAssocation instance. + assoc.ip_address = address + assoc.enabled = address.address_type == "fixed" + return address + + +def _port_disassociate_stub(context, ports, address): + port_ids = [port.id for port in ports] + for idx, assoc in enumerate(address['associations']): + if assoc.port_id in port_ids: + address.associations.pop(idx) + return address + + +def _ip_deallocate_stub(context, address): + address['deallocated'] = 1 + address['address_type'] = None + + class TestIpAddresses(test_quark_plugin.TestQuarkPlugin): @contextlib.contextmanager def _stubs(self, port, addr): @@ -36,12 +64,26 @@ class TestIpAddresses(test_quark_plugin.TestQuarkPlugin): if addr: addr_model = models.IPAddress() addr_model.update(addr) + + def _alloc_ip(context, new_addr, net_id, port_m, *args, **kwargs): + new_addr.extend([addr_model]) + with contextlib.nested( mock.patch("quark.db.api.port_find"), - mock.patch("quark.ipam.QuarkIpam.allocate_ip_address") - ) as (port_find, alloc_ip): + mock.patch( + "quark.plugin_modules.ip_addresses" + "._get_ipam_driver_for_network"), + mock.patch( + "quark.plugin_modules.ip_addresses.db_api" + ".port_associate_ip"), + mock.patch( + "quark.plugin_modules.ip_addresses" + ".validate_ports_on_network_and_same_segment") + ) as (port_find, mock_ipam, mock_port_associate_ip, validate): port_find.return_value = port_model - alloc_ip.return_value = addr_model + mock_ipam_driver = mock_ipam.return_value + mock_ipam_driver.allocate_ip_address.side_effect = _alloc_ip + mock_port_associate_ip.side_effect = _port_associate_stub yield def test_create_ip_address_by_network_and_device(self): @@ -49,17 +91,15 @@ class TestIpAddresses(test_quark_plugin.TestQuarkPlugin): ip = dict(id=1, address=3232235876, address_readable="192.168.1.100", subnet_id=1, network_id=2, version=4, used_by_tenant_id=1) with self._stubs(port=port, addr=ip): - ip_address = dict(network_id=ip["network_id"], - device_ids=[4]) + ip_address = {"network_id": ip["network_id"], + "version": 4, 'device_ids': [2]} response = self.plugin.create_ip_address( self.context, dict(ip_address=ip_address)) - self.assertIsNotNone(response["id"]) self.assertEqual(response["network_id"], ip_address["network_id"]) - self.assertEqual(response["device_ids"], [""]) self.assertEqual(response["port_ids"], [port["id"]]) self.assertEqual(response["subnet_id"], ip["subnet_id"]) - self.assertFalse(response["shared"]) + self.assertEqual(response['address_type'], None) self.assertEqual(response["version"], 4) self.assertEqual(response["address"], "192.168.1.100") self.assertEqual(response["used_by_tenant_id"], 1) @@ -70,6 +110,8 @@ class TestIpAddresses(test_quark_plugin.TestQuarkPlugin): subnet_id=1, network_id=2, version=4) with self._stubs(port=port, addr=ip): ip_address = dict(port_ids=[port["id"]]) + ip_address['version'] = 4 + ip_address['network_id'] = 2 response = self.plugin.create_ip_address( self.context, dict(ip_address=ip_address)) @@ -80,7 +122,7 @@ class TestIpAddresses(test_quark_plugin.TestQuarkPlugin): def test_create_ip_address_by_device_no_network_fails(self): with self._stubs(port={}, addr=None): - ip_address = dict(device_ids=[4]) + ip_address = dict(device_ids=[4], version=4) with self.assertRaises(exceptions.BadRequest): self.plugin.create_ip_address(self.context, dict(ip_address=ip_address)) @@ -89,21 +131,265 @@ class TestIpAddresses(test_quark_plugin.TestQuarkPlugin): with self._stubs(port=None, addr=None): with self.assertRaises(exceptions.PortNotFound): ip_address = {'ip_address': {'network_id': 'fake', - 'device_id': 'fake'}} + 'device_id': 'fake', + 'version': 4}} self.plugin.create_ip_address(self.context, ip_address) def test_create_ip_address_invalid_port(self): with self._stubs(port=None, addr=None): with self.assertRaises(exceptions.PortNotFound): - ip_address = {'ip_address': {'port_id': 'fake'}} + ip_address = { + 'ip_address': { + 'port_id': 'fake', + 'version': 4, + 'network_id': 'fake' + } + } self.plugin.create_ip_address(self.context, ip_address) +@mock.patch("quark.plugin_modules.ip_addresses.v") +@mock.patch("quark.plugin_modules.ip_addresses" + ".validate_ports_on_network_and_same_segment") +@mock.patch("quark.plugin_modules.ip_addresses._get_ipam_driver_for_network") +@mock.patch("quark.plugin_modules.ip_addresses.db_api") +class TestQuarkSharedIPAddressCreate(test_quark_plugin.TestQuarkPlugin): + def _alloc_stub(self, ip_model): + def _alloc_ip(context, addr, *args, **kwargs): + addr.append(ip_model) + return _alloc_ip + + def test_create_ip_address_calls_port_associate_ip(self, mock_dbapi, + mock_ipam, *args): + port = dict(id=1, network_id=2, ip_addresses=[]) + ip = dict(id=1, address=3232235876, address_readable="192.168.1.100", + subnet_id=1, network_id=2, version=4, used_by_tenant_id=1) + port_model = models.Port() + port_model.update(port) + ip_model = models.IPAddress() + ip_model.update(ip) + + mock_dbapi.port_find.return_value = port_model + mock_ipam_driver = mock_ipam.return_value + mock_ipam_driver.allocate_ip_address.side_effect = ( + self._alloc_stub(ip_model)) + ip_address = {"network_id": ip["network_id"], + "version": 4, 'device_ids': [2], + "port_ids": [1]} + + self.plugin.create_ip_address(self.context, + dict(ip_address=ip_address)) + mock_dbapi.port_associate_ip.assert_called_once_with( + self.context, [port_model], ip_model) + + def test_create_ip_address_address_type_shared(self, mock_dbapi, mock_ipam, + *args): + cfg.CONF.set_override('ipam_reuse_after', 100, "QUARK") + ports = [dict(id=1, network_id=2, ip_addresses=[]), + dict(id=2, network_id=2, ip_addresses=[])] + ip = dict(id=1, address=3232235876, address_readable="192.168.1.100", + subnet_id=1, network_id=2, version=4, used_by_tenant_id=1) + port_models = [models.Port(**p) for p in ports] + ip_model = models.IPAddress() + ip_model.update(ip) + mock_dbapi.port_find.side_effect = port_models + mock_ipam_driver = mock_ipam.return_value + mock_ipam_driver.allocate_ip_address.side_effect = ( + self._alloc_stub(ip_model)) + + ip_address = {"network_id": ip["network_id"], + "version": 4, 'device_ids': [2], + "port_ids": [pm.id for pm in port_models]} + self.plugin.create_ip_address(self.context, + dict(ip_address=ip_address)) + # NOTE(thomasem): Having to assert that [ip_model] was passed instead + # of an empty list due to the expected behavior of this method being + # that it mutates the passed in list. So, after it's run, the list + # has already been mutated and it's a reference to that list that + # we're checking. This method ought to be changed to return the new + # IP and let the caller mutate the list, not the other way around. + mock_ipam_driver.allocate_ip_address.assert_called_once_with( + self.context, [ip_model], ip['network_id'], None, 100, + version=ip_address['version'], ip_addresses=[], + address_type="shared") + + def test_create_ip_address_address_type_fixed(self, mock_dbapi, mock_ipam, + *args): + cfg.CONF.set_override('ipam_reuse_after', 100, "QUARK") + ports = [dict(id=1, network_id=2, ip_addresses=[])] + ip = dict(id=1, address=3232235876, address_readable="192.168.1.100", + subnet_id=1, network_id=2, version=4, used_by_tenant_id=1) + port_models = [models.Port(**p) for p in ports] + ip_model = models.IPAddress() + ip_model.update(ip) + mock_dbapi.port_find.side_effect = port_models + mock_ipam_driver = mock_ipam.return_value + mock_ipam_driver.allocate_ip_address.side_effect = ( + self._alloc_stub(ip_model)) + + ip_address = {"network_id": ip["network_id"], + "version": 4, 'device_ids': [2], + "port_ids": [pm.id for pm in port_models]} + self.plugin.create_ip_address(self.context, + dict(ip_address=ip_address)) + # NOTE(thomasem): Having to assert that [ip_model] was passed instead + # of an empty list due to the expected behavior of this method being + # that it mutates the passed in list. So, after it's run, the list + # has already been mutated and it's a reference to that list that + # we're checking. This method ought to be changed to return the new + # IP and let the caller mutate the list, not the other way around. + mock_ipam_driver.allocate_ip_address.assert_called_once_with( + self.context, [ip_model], ip['network_id'], None, 100, + version=ip_address['version'], ip_addresses=[], + address_type="fixed") + + +class TestQuarkSharedIPAddressPortsValid(test_quark_plugin.TestQuarkPlugin): + def test_validate_ports_on_network_raise_segment(self): + mock_ports = [models.Port(id="1", network_id="2"), + models.Port(id="2", network_id="2")] + mock_subnets = [models.Subnet(id="1", segment_id="2"), + models.Subnet(id="2", segment_id="3")] + for i, subnet in enumerate(mock_subnets): + mock_address = models.IPAddress(id="2", network_id="2") + mock_address.subnet = subnet + mock_ports[i].ip_addresses.append(mock_address) + + with self.assertRaises(exceptions.BadRequest): + ip_addresses.validate_ports_on_network_and_same_segment( + mock_ports, "2") + + def test_validate_ports_on_network_raise_segment_multiple_ips(self): + mock_ports = [models.Port(id="1", network_id="2"), + models.Port(id="2", network_id="2")] + mock_subnets = [models.Subnet(id="1", segment_id="2"), + models.Subnet(id="2", segment_id="3")] + for i, subnet in enumerate(mock_subnets): + mock_address = models.IPAddress(id="2", network_id="2") + mock_address.subnet = subnet + for x in xrange(i + 1): + mock_ports[x].ip_addresses.append(mock_address) + + with self.assertRaises(exceptions.BadRequest): + ip_addresses.validate_ports_on_network_and_same_segment( + mock_ports, "2") + + def test_validate_ports_on_network_raise_network(self): + mock_ports = [models.Port(id="1", network_id="2"), + models.Port(id="2", network_id="3")] + mock_addresses = [models.IPAddress(id="1", network_id="2"), + models.IPAddress(id="2", network_id="3")] + + for i, ip_address in enumerate(mock_addresses): + ip_address.subnet = models.Subnet(id="1", segment_id="2") + mock_ports[i].ip_addresses.append(ip_address) + + with self.assertRaises(exceptions.BadRequest): + ip_addresses.validate_ports_on_network_and_same_segment( + mock_ports, "2") + + def test_validate_ports_on_network_valid(self): + mock_ports = [models.Port(id="1", network_id="2"), + models.Port(id="2", network_id="2")] + for p in mock_ports: + p.ip_addresses.append(models.IPAddress(id="1", network_id="2")) + p.ip_addresses[-1].subnet = models.Subnet(id="1", segment_id="1") + + r = ip_addresses.validate_ports_on_network_and_same_segment( + mock_ports, "2") + self.assertEqual(r, None) + + +class TestQuarkSharedIPAddress(test_quark_plugin.TestQuarkPlugin): + def test_shared_ip_request(self): + ip_address_mock = {"ip_address": {"port_ids": [1, 2, 3]}} + r = ip_addresses._shared_ip_request(ip_address_mock) + self.assertTrue(r) + + def test_shared_ip_request_false(self): + ip_address_mock = {"ip_address": {"port_ids": [1]}} + r = ip_addresses._shared_ip_request(ip_address_mock) + self.assertFalse(r) + + def test_can_be_shared(self): + mock_address = models.IPAddress(id="1", address=3232235876, + address_readable="192.168.1.100", + subnet_id="1", network_id="2", + version=4, used_by_tenant_id="1") + mock_assocs = [] + for x in xrange(3): + assoc = models.PortIpAssociation() + assoc.ip_address_id = mock_address.id + assoc.ip_address = mock_address + assoc.enabled = [False, False, False][x] + mock_assocs.append(assoc) + r = ip_addresses._can_be_shared(mock_address) + self.assertTrue(r) + + def test_can_be_shared_false(self): + mock_address = models.IPAddress(id="1", address=3232235876, + address_readable="192.168.1.100", + subnet_id="1", network_id="2", + version=4, used_by_tenant_id="1") + mock_assocs = [] + for x in xrange(3): + assoc = models.PortIpAssociation() + assoc.ip_address_id = mock_address.id + assoc.ip_address = mock_address + assoc.enabled = [False, True, False][x] + mock_assocs.append(assoc) + r = ip_addresses._can_be_shared(mock_address) + self.assertFalse(r) + + @mock.patch("quark.plugin_modules.ip_addresses._shared_ip_request") + @mock.patch("quark.plugin_modules.ip_addresses._can_be_shared") + def test_raise_if_shared_and_enabled(self, can_be_shared_mock, + shared_ip_request_mock): + can_be_shared_mock.return_value = False + shared_ip_request_mock.return_value = True + obj = mock.MagicMock() + with self.assertRaises(exceptions.BadRequest): + ip_addresses._raise_if_shared_and_enabled(obj, obj) + + @mock.patch("quark.plugin_modules.ip_addresses._shared_ip_request") + @mock.patch("quark.plugin_modules.ip_addresses._can_be_shared") + def test_raise_if_shared_and_enabled_noraise(self, can_be_shared_mock, + shared_ip_request_mock): + can_be_shared_mock.return_value = True + shared_ip_request_mock.return_value = True + obj = mock.MagicMock() + r = ip_addresses._raise_if_shared_and_enabled(obj, obj) + self.assertEqual(r, None) + + @mock.patch("quark.plugin_modules.ip_addresses._shared_ip_request") + @mock.patch("quark.plugin_modules.ip_addresses._can_be_shared") + def test_raise_if_shared_and_enabled_fixed_request(self, + can_be_shared_mock, + shared_ip_request_mock): + can_be_shared_mock.return_value = True + shared_ip_request_mock.return_value = False + obj = mock.MagicMock() + r = ip_addresses._raise_if_shared_and_enabled(obj, obj) + self.assertEqual(r, None) + + @mock.patch("quark.plugin_modules.ip_addresses._shared_ip_request") + @mock.patch("quark.plugin_modules.ip_addresses._can_be_shared") + def test_raise_if_shared_and_enabled_fixed_request_and_not_shareable( + self, can_be_shared_mock, shared_ip_request_mock): + can_be_shared_mock.return_value = False + shared_ip_request_mock.return_value = False + obj = mock.MagicMock() + r = ip_addresses._raise_if_shared_and_enabled(obj, obj) + self.assertEqual(r, None) + + class TestQuarkUpdateIPAddress(test_quark_plugin.TestQuarkPlugin): + @contextlib.contextmanager def _stubs(self, ports, addr, addr_ports=False): port_models = [] addr_model = None + for port in ports: port_model = models.Port() port_model.update(port) @@ -118,9 +404,21 @@ class TestQuarkUpdateIPAddress(test_quark_plugin.TestQuarkPlugin): with contextlib.nested( mock.patch("%s.port_find" % db_mod), mock.patch("%s.ip_address_find" % db_mod), - ) as (port_find, ip_find): + mock.patch("%s.port_associate_ip" % db_mod), + mock.patch("%s.port_disassociate_ip" % db_mod), + mock.patch("quark.plugin_modules.ip_addresses" + ".validate_ports_on_network_and_same_segment"), + mock.patch("quark.plugin_modules.ip_addresses" + "._get_ipam_driver_for_network") + ) as (port_find, ip_find, port_associate_ip, + port_disassociate_ip, val, mock_ipam): port_find.return_value = port_models ip_find.return_value = addr_model + port_associate_ip.side_effect = _port_associate_stub + port_disassociate_ip.side_effect = _port_disassociate_stub + mock_ipam_driver = mock_ipam.return_value + mock_ipam_driver.deallocate_ip_address.side_effect = ( + _ip_deallocate_stub) yield def test_update_ip_address_does_not_exist(self): @@ -145,7 +443,8 @@ class TestQuarkUpdateIPAddress(test_quark_plugin.TestQuarkPlugin): ip = dict(id=1, address=3232235876, address_readable="192.168.1.100", subnet_id=1, network_id=2, version=4) with self._stubs(ports=[port], addr=ip): - ip_address = {'ip_address': {'port_ids': [port['id']]}} + ip_address = {'ip_address': {'port_ids': [port['id']], + 'network_id': 2}} response = self.plugin.update_ip_address(self.context, ip['id'], ip_address) @@ -222,50 +521,33 @@ class TestQuarkUpdateIPAddress(test_quark_plugin.TestQuarkPlugin): self.assertEqual(response['port_ids'], []) -class TestQuarkGetIpAddresses(test_quark_plugin.TestQuarkPlugin): +class TestQuarkGetIpAddress(test_quark_plugin.TestQuarkPlugin): @contextlib.contextmanager def _stubs(self, ips, ports): with mock.patch("quark.db.api.ip_address_find") as ip_find: - ip_models = [] port_models = [] for port in ports: p = models.Port() p.update(port) port_models.append(p) - if isinstance(ips, list): - for ip in ips: - version = ip.pop("version") - ip_mod = models.IPAddress() - ip_mod.update(ip) - ip_mod.version = version - ip_mod.ports = port_models - ip_models.append(ip_mod) - ip_find.return_value = ip_models - else: if ips: version = ips.pop("version") ip_mod = models.IPAddress() ip_mod.update(ips) ip_mod.version = version ip_mod.ports = port_models + # Set up Port to IP associations + assoc = models.PortIpAssociation() + assoc.port = p + assoc.port_id = p.id + assoc.ip_address = ip_mod + assoc.ip_address_id = ip_mod.id + ip_mod.associations.append(assoc) ip_find.return_value = ip_mod else: ip_find.return_value = ips yield - def test_get_ip_addresses(self): - port = dict(id=100, device_id="foobar") - ip = dict(id=1, address=3232235876, address_readable="192.168.1.100", - subnet_id=1, network_id=2, version=4) - with self._stubs(ips=[ip], ports=[port]): - res = self.plugin.get_ip_addresses(self.context) - addr_res = res[0] - self.assertEqual(ip["id"], addr_res["id"]) - self.assertEqual(ip["subnet_id"], addr_res["subnet_id"]) - self.assertEqual(ip["address_readable"], addr_res["address"]) - self.assertEqual(addr_res["port_ids"][0], port["id"]) - self.assertEqual(addr_res["device_ids"][0], port["device_id"]) - def test_get_ip_address(self): port = dict(id=100) ip = dict(id=1, address=3232235876, address_readable="192.168.1.100", @@ -282,3 +564,58 @@ class TestQuarkGetIpAddresses(test_quark_plugin.TestQuarkPlugin): with self._stubs(ips=None, ports=[port]): with self.assertRaises(quark_exceptions.IpAddressNotFound): self.plugin.get_ip_address(self.context, 1) + + +class TestQuarkGetIpAddresses(test_quark_plugin.TestQuarkPlugin): + @contextlib.contextmanager + def _stubs(self, ips, ports): + with mock.patch("quark.db.api.ip_address_find") as ip_find: + ip_models = [] + port_models = [] + for port in ports: + p = models.Port() + p.update(port) + port_models.append(p) + for ip in ips: + version = ip.pop("version") + ip_mod = models.IPAddress() + ip_mod.update(ip) + ip_mod.version = version + ip_mod.ports = port_models + # Set up Port to IP associations + assoc = models.PortIpAssociation() + assoc.port = p + assoc.port_id = p.id + assoc.ip_address = ip_mod + assoc.ip_address_id = ip_mod.id + ip_mod.associations.append(assoc) + ip_models.append(ip_mod) + ip_find.return_value = ip_models + yield + + def test_get_ip_addresses(self): + port = dict(id=100, device_id="foobar") + ip = dict(id=1, address=3232235876, address_readable="192.168.1.100", + subnet_id=1, network_id=2, version=4) + with self._stubs(ips=[ip], ports=[port]): + res = self.plugin.get_ip_addresses(self.context) + addr_res = res[0] + self.assertEqual(ip["id"], addr_res["id"]) + self.assertEqual(ip["subnet_id"], addr_res["subnet_id"]) + self.assertEqual(ip["address_readable"], addr_res["address"]) + self.assertEqual(addr_res["port_ids"][0], port["id"]) + + def test_get_ip_addresses_multiple(self): + port = dict(id=100, device_id="foobar") + ips = [dict(id=1, address=3232235876, address_readable="192.168.1.100", + subnet_id=1, network_id=2, version=4), + dict(id=2, address=3232235878, address_readable="192.168.1.101", + subnet_id=1, network_id=2, version=4)] + with self._stubs(ips=ips, ports=[port]): + res = self.plugin.get_ip_addresses(self.context) + self.assertEqual(len(res), 2) + for i, addr in enumerate(sorted(res, key=lambda x: x['id'])): + self.assertEqual(ips[i]["id"], addr["id"]) + self.assertEqual(ips[i]["subnet_id"], addr["subnet_id"]) + self.assertEqual(ips[i]["address_readable"], addr["address"]) + self.assertEqual(addr["port_ids"][0], port["id"]) diff --git a/quark/tests/plugin_modules/test_ports.py b/quark/tests/plugin_modules/test_ports.py index 624c133..84c04d3 100644 --- a/quark/tests/plugin_modules/test_ports.py +++ b/quark/tests/plugin_modules/test_ports.py @@ -217,7 +217,10 @@ class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin): ) as (network_find, port_count, port_create, port_find, allocate_ip, allocate_mac): def allocate_ip_effect(context, addresses, *args, **kwargs): - addresses.extend(ip_models) + + for ip_model in ip_models: + ip_model.enabled_for_port = lambda x: True + addresses.append(ip_models) network_find.return_value = network port_count.return_value = 0 port_create.return_value = port_model @@ -238,9 +241,11 @@ class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin): "subnet_id": "subnet2", "version": 4} fixed_ips = [{"ip_address": ip1['address_readable'], - "subnet_id": ip1['subnet_id']}, + "subnet_id": ip1['subnet_id'], + "enabled": True}, {"ip_address": ip2['address_readable'], - "subnet_id": ip2['subnet_id']}] + "subnet_id": ip2['subnet_id'], + "enabled": True}] port = {"port": {'fixed_ips': fixed_ips, 'id': "11111111-2222-3333-4444-555555555555", @@ -277,9 +282,11 @@ class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin): "subnet_id": "subnet2", "version": 6} fixed_ips = [{"ip_address": ip1['address_readable'], - "subnet_id": ip1['subnet_id']}, + "subnet_id": ip1['subnet_id'], + "enabled": True}, {"ip_address": ip2['address_readable'], - "subnet_id": ip2['subnet_id']}] + "subnet_id": ip2['subnet_id'], + "enabled": True}] port = {"port": {'fixed_ips': fixed_ips, 'id': "11111111-2222-3333-4444-555555555555", @@ -316,9 +323,11 @@ class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin): "subnet_id": "subnet2", "version": 6} fixed_ips = [{"ip_address": ip1['address_readable'], - "subnet_id": ip1['subnet_id']}, + "subnet_id": ip1['subnet_id'], + "enabled": True}, {"ip_address": ip2['address_readable'], - "subnet_id": ip2['subnet_id']}] + "subnet_id": ip2['subnet_id'], + "enabled": True}] port = {"port": {'fixed_ips': fixed_ips, 'id': "11111111-2222-3333-4444-555555555555", @@ -395,9 +404,18 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin): if network: network["network_plugin"] = "BASE" network["ipam_strategy"] = "ANY" - port_model = models.Port() - port_model.update(port) - port_models = port_model + + def _create_db_port(context, **kwargs): + port_model = models.Port() + port_model.update(kwargs) + return port_model + + def _alloc_ip(context, new_ips, *args, **kwargs): + ip_mod = models.IPAddress() + ip_mod.update(addr) + ip_mod.enabled_for_port = lambda x: True + new_ips.extend([ip_mod]) + return mock.DEFAULT with contextlib.nested( mock.patch("quark.db.api.port_create"), @@ -408,9 +426,9 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin): mock.patch("neutron.quota.QuotaEngine.limit_check") ) as (port_create, net_find, alloc_ip, alloc_mac, port_count, limit_check): - port_create.return_value = port_models + port_create.side_effect = _create_db_port net_find.return_value = network - alloc_ip.return_value = addr + alloc_ip.side_effect = _alloc_ip alloc_mac.return_value = mac port_count.return_value = 0 if limit_checks: @@ -425,6 +443,7 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin): port = dict(port=dict(mac_address=mac["address"], network_id=1, tenant_id=self.context.tenant_id, device_id=2, name=port_name)) + expected = {'status': "ACTIVE", 'name': port_name, 'device_owner': None, @@ -493,10 +512,13 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin): ip = mock.MagicMock() ip.get = lambda x, *y: 1 if x == "subnet_id" else None ip.formatted = lambda: "192.168.10.45" - fixed_ips = [dict(subnet_id=1, ip_address="192.168.10.45")] + ip.enabled_for_port = lambda x: True + fixed_ips = [dict(subnet_id=1, enabled=True, + ip_address="192.168.10.45")] port = dict(port=dict(mac_address=mac["address"], network_id=1, tenant_id=self.context.tenant_id, device_id=2, fixed_ips=fixed_ips, ip_addresses=[ip])) + expected = {'status': "ACTIVE", 'device_owner': None, 'mac_address': mac["address"], diff --git a/quark/tests/test_db_api.py b/quark/tests/test_db_api.py index 18d64d3..a2b834e 100644 --- a/quark/tests/test_db_api.py +++ b/quark/tests/test_db_api.py @@ -17,6 +17,7 @@ import mock import netaddr from quark.db import api as db_api +from quark.db import models from quark.tests.functional.base import BaseFunctionalTest @@ -43,6 +44,35 @@ class TestDBAPI(BaseFunctionalTest): db_api.ip_address_find(self.context, device_id="foo") self.assertEqual(filter_mock.filter.call_count, 1) + def test_ip_address_find_address_type(self): + self.context.session.query = mock.MagicMock() + second_query_mock = self.context.session.query.return_value + filter_mock = second_query_mock.join.return_value + + db_api.ip_address_find(self.context, address_type="foo") + # NOTE(thomasem): Creates sqlalchemy.sql.elements.BinaryExpression + # when using SQLAlchemy models in expressions. + expected_filter = models.IPAddress.address_type == "foo" + self.assertEqual(len(filter_mock.filter.call_args[0]), 1) + # NOTE(thomasem): Unfortunately BinaryExpression.compare isn't + # showing to be a reliable comparison, so using the string + # representation which dumps the associated SQL for the filter. + self.assertEqual(str(expected_filter), str( + filter_mock.filter.call_args[0][0])) + + def test_ip_address_find_port_id(self): + self.context.session.query = mock.MagicMock() + second_query_mock = self.context.session.query.return_value + final_query_mock = second_query_mock.join.return_value + + db_api.ip_address_find(self.context, port_id="foo") + # NOTE(thomasem): Creates sqlalchemy.sql.elements.BinaryExpression + # when using SQLAlchemy models in expressions. + expected_filter = models.IPAddress.ports.any(models.Port.id == "foo") + self.assertEqual(len(final_query_mock.filter.call_args[0]), 1) + self.assertEqual(str(expected_filter), str( + final_query_mock.filter.call_args[0][0])) + def test_ip_address_find_ip_address_object(self): ip_address = netaddr.IPAddress("192.168.10.1") try: @@ -58,3 +88,96 @@ class TestDBAPI(BaseFunctionalTest): scope=db_api.ONE) except Exception as e: self.fail("Expected no exceptions: %s" % e) + + def test_port_associate_ip(self): + self.context.session.add = mock.Mock() + mock_ports = [models.Port(id=str(x), network_id="2", ip_addresses=[]) + for x in xrange(4)] + mock_address = models.IPAddress(id="1", address=3232235876, + address_readable="192.168.1.100", + subnet_id="1", network_id="2", + version=4, used_by_tenant_id="1") + r = db_api.port_associate_ip(self.context, mock_ports, mock_address) + self.assertEqual(len(r.associations), len(mock_ports)) + for assoc, port in zip(r.associations, mock_ports): + self.assertEqual(assoc.port_id, port.id) + self.assertEqual(assoc.ip_address_id, mock_address.id) + self.assertEqual(assoc.enabled, False) + self.context.session.add.assert_called_once_with(r) + + def test_port_associate_ip_enable_port(self): + self.context.session.add = mock.Mock() + mock_port = models.Port(id="1", network_id="2", ip_addresses=[]) + mock_address = models.IPAddress(id="1", address=3232235876, + address_readable="192.168.1.100", + subnet_id="1", network_id="2", + version=4, used_by_tenant_id="1") + r = db_api.port_associate_ip(self.context, [mock_port], mock_address, + enable_port="1") + self.assertEqual(len(r.associations), 1) + assoc = r.associations[0] + self.assertEqual(assoc.port_id, mock_port.id) + self.assertEqual(assoc.ip_address_id, mock_address.id) + self.assertEqual(assoc.enabled, True) + self.context.session.add.assert_called_once_with(r) + + def test_port_disassociate_ip(self): + self.context.session.add = mock.Mock() + self.context.session.delete = mock.Mock() + mock_ports = [models.Port(id=str(x), network_id="2", ip_addresses=[]) + for x in xrange(4)] + mock_address = models.IPAddress(id="1", address=3232235876, + address_readable="192.168.1.100", + subnet_id="1", network_id="2", + version=4, used_by_tenant_id="1") + mock_assocs = [] + for p in mock_ports: + assoc = models.PortIpAssociation() + assoc.port_id = p.id + assoc.port = p + assoc.ip_address_id = mock_address.id + assoc.ip_address = mock_address + mock_assocs.append(assoc) + + r = db_api.port_disassociate_ip(self.context, mock_ports[1:3], + mock_address) + + self.assertEqual(len(r.associations), 2) + self.assertEqual(r.associations[0], mock_assocs[0]) + self.assertEqual(r.associations[1], mock_assocs[3]) + self.context.session.add.assert_called_once_with(r) + self.context.session.delete.assert_has_calls( + [mock.call(mock_assocs[1]), mock.call(mock_assocs[2])]) + + @mock.patch("quark.db.api.port_disassociate_ip") + @mock.patch("quark.db.api.port_associate_ip") + def test_update_port_associations_for_ip(self, associate_mock, + disassociate_mock): + self.context.session.add = mock.Mock() + self.context.session.delete = mock.Mock() + mock_ports = [models.Port(id=str(x), network_id="2", ip_addresses=[]) + for x in xrange(4)] + mock_address = models.IPAddress(id="1", address=3232235876, + address_readable="192.168.1.100", + subnet_id="1", network_id="2", + version=4, used_by_tenant_id="1") + mock_address.ports = mock_ports + new_port_list = mock_ports[1:3] + new_port_list.append(models.Port(id="4", network_id="2", + ip_addresses=[])) + # NOTE(thomasem): Should be the new address after associating + # any new ports in the list. + mock_new_address = associate_mock.return_value + + db_api.update_port_associations_for_ip(self.context, + new_port_list, + mock_address) + + associate_mock.assert_called_once_with(self.context, + set([new_port_list[2]]), + mock_address) + + disassociate_mock.assert_called_once_with(self.context, + set([mock_ports[0], + mock_ports[3]]), + mock_new_address) diff --git a/quark/tests/test_migrations.py b/quark/tests/test_migrations.py index ff597e0..b76dc70 100644 --- a/quark/tests/test_migrations.py +++ b/quark/tests/test_migrations.py @@ -16,7 +16,6 @@ from sqlalchemy.sql import table from quark.db.custom_types import INET import quark.db.migration -from quark.db import models from quark.tests import test_base @@ -800,7 +799,7 @@ class Test4fc07b41d45c(BaseMigrationTest): 'quark_ip_addresses', self.metadata, sa.Column('id', sa.String(length=36), primary_key=True), sa.Column('_deallocated', sa.Boolean()), - sa.Column('address_type', sa.Enum(*models.IPAddress.address_types)) + sa.Column('address_type', sa.Enum('fixed', 'shared', 'floating')) ) self.metadata.create_all() alembic_command.stamp(self.config, self.previous_revision)