commit
daa3c9dee2
@ -245,6 +245,37 @@ def port_create(context, **port_dict):
|
||||
return port
|
||||
|
||||
|
||||
def port_disassociate_ip(context, ports, address):
|
||||
assocs_to_remove = [assoc for assoc in address.associations
|
||||
if assoc.port in ports]
|
||||
for assoc in assocs_to_remove:
|
||||
context.session.delete(assoc)
|
||||
# NOTE(thomasem): Need to update in-session model for caller.
|
||||
address.associations.remove(assoc)
|
||||
context.session.add(address)
|
||||
return address
|
||||
|
||||
|
||||
def port_associate_ip(context, ports, address, enable_port=None):
|
||||
for port in ports:
|
||||
assoc = models.PortIpAssociation()
|
||||
assoc.port_id = port.id
|
||||
assoc.ip_address_id = address.id
|
||||
assoc.enabled = port.id in enable_port if enable_port else False
|
||||
address.associations.append(assoc)
|
||||
context.session.add(address)
|
||||
return address
|
||||
|
||||
|
||||
def update_port_associations_for_ip(context, ports, address):
|
||||
assoc_ports = set(address.ports)
|
||||
new_ports = set(ports)
|
||||
new_address = port_associate_ip(context, new_ports - assoc_ports,
|
||||
address)
|
||||
return port_disassociate_ip(context,
|
||||
assoc_ports - new_ports, new_address)
|
||||
|
||||
|
||||
def port_update(context, port, **kwargs):
|
||||
if "addresses" in kwargs:
|
||||
port["ip_addresses"] = kwargs.pop("addresses")
|
||||
@ -284,23 +315,6 @@ def ip_address_find(context, lock_mode=False, **filters):
|
||||
if lock_mode:
|
||||
query = query.with_lockmode("update")
|
||||
|
||||
ip_shared = filters.pop("shared", None)
|
||||
if ip_shared is not None:
|
||||
cnt = sql_func.count(models.port_ip_association_table.c.port_id)
|
||||
stmt = context.session.query(models.IPAddress,
|
||||
cnt.label("ports_count"))
|
||||
stmt = stmt.outerjoin(models.port_ip_association_table)
|
||||
stmt = stmt.group_by(models.IPAddress.id).subquery()
|
||||
|
||||
query = query.outerjoin(stmt, stmt.c.id == models.IPAddress.id)
|
||||
|
||||
# !@# HACK(amir): replace once attributes are configured in ip address
|
||||
# extension correctly
|
||||
if "True" in ip_shared:
|
||||
query = query.filter(stmt.c.ports_count > 1)
|
||||
else:
|
||||
query = query.filter(stmt.c.ports_count <= 1)
|
||||
|
||||
model_filters = _model_query(context, models.IPAddress, filters)
|
||||
if "do_not_use" in filters:
|
||||
query = query.filter(models.Subnet.do_not_use == filters["do_not_use"])
|
||||
@ -308,6 +322,15 @@ def ip_address_find(context, lock_mode=False, **filters):
|
||||
if filters.get("device_id"):
|
||||
model_filters.append(models.IPAddress.ports.any(
|
||||
models.Port.device_id.in_(filters["device_id"])))
|
||||
|
||||
if filters.get("port_id"):
|
||||
model_filters.append(models.IPAddress.ports.any(
|
||||
models.Port.id == filters['port_id']))
|
||||
|
||||
if filters.get("address_type"):
|
||||
model_filters.append(
|
||||
models.IPAddress.address_type == filters['address_type'])
|
||||
|
||||
return query.filter(*model_filters)
|
||||
|
||||
|
||||
|
3
quark/db/ip_types.py
Normal file
3
quark/db/ip_types.py
Normal file
@ -0,0 +1,3 @@
|
||||
SHARED = 'shared'
|
||||
FIXED = 'fixed'
|
||||
FLOATING = 'floating'
|
@ -25,6 +25,7 @@ from sqlalchemy.ext import hybrid
|
||||
from sqlalchemy import orm
|
||||
|
||||
from quark.db import custom_types
|
||||
from quark.db import ip_types
|
||||
# NOTE(mdietz): This is the only way to actually create the quotas table,
|
||||
# regardless if we need it. This is how it's done upstream.
|
||||
# NOTE(jhammond): If it isn't obvious quota_driver is unused and that's ok.
|
||||
@ -108,6 +109,10 @@ class IsHazTags(object):
|
||||
return orm.relationship("TagAssociation", backref=backref)
|
||||
|
||||
|
||||
class PortIpAssociation(object):
|
||||
pass
|
||||
|
||||
|
||||
port_ip_association_table = sa.Table(
|
||||
"quark_port_ip_address_associations",
|
||||
BASEV2.metadata,
|
||||
@ -123,6 +128,9 @@ port_ip_association_table = sa.Table(
|
||||
**TABLE_KWARGS)
|
||||
|
||||
|
||||
orm.mapper(PortIpAssociation, port_ip_association_table)
|
||||
|
||||
|
||||
class IPAddress(BASEV2, models.HasId):
|
||||
"""More closely emulate the melange version of the IP table.
|
||||
|
||||
@ -133,7 +141,6 @@ class IPAddress(BASEV2, models.HasId):
|
||||
__table_args__ = (sa.UniqueConstraint("subnet_id", "address",
|
||||
name="subnet_id_address"),
|
||||
TABLE_KWARGS)
|
||||
address_types = set(['fixed', 'shared', 'floating'])
|
||||
address_readable = sa.Column(sa.String(128), nullable=False)
|
||||
address = sa.Column(custom_types.INET(), nullable=False, index=True)
|
||||
subnet_id = sa.Column(sa.String(36),
|
||||
@ -149,8 +156,17 @@ class IPAddress(BASEV2, models.HasId):
|
||||
_deallocated = sa.Column(sa.Boolean())
|
||||
# Legacy data
|
||||
used_by_tenant_id = sa.Column(sa.String(255))
|
||||
address_type = sa.Column(sa.Enum(*address_types,
|
||||
name="quark_ip_address_types"))
|
||||
|
||||
address_type = sa.Column(sa.Enum(ip_types.FIXED, ip_types.FLOATING,
|
||||
ip_types.SHARED,
|
||||
name="quark_ip_address_types"))
|
||||
associations = orm.relationship(PortIpAssociation, backref="ip_address",
|
||||
lazy='subquery')
|
||||
|
||||
def enabled_for_port(self, port):
|
||||
for assoc in self["associations"]:
|
||||
if assoc.port_id == port["id"]:
|
||||
return assoc.enabled
|
||||
|
||||
@hybrid.hybrid_property
|
||||
def deallocated(self):
|
||||
@ -311,6 +327,7 @@ class Port(BASEV2, models.HasTenant, models.HasId):
|
||||
device_id = sa.Column(sa.String(255), nullable=False, index=True)
|
||||
device_owner = sa.Column(sa.String(255))
|
||||
bridge = sa.Column(sa.String(255))
|
||||
associations = orm.relationship(PortIpAssociation, backref="port")
|
||||
|
||||
@declarative.declared_attr
|
||||
def ip_addresses(cls):
|
||||
|
@ -30,6 +30,7 @@ from oslo.config import cfg
|
||||
from oslo.db import exception as db_exception
|
||||
|
||||
from quark.db import api as db_api
|
||||
from quark.db import ip_types
|
||||
from quark.db import models
|
||||
from quark import exceptions as q_exc
|
||||
from quark import utils
|
||||
@ -297,8 +298,9 @@ class QuarkIpam(object):
|
||||
deallocated_at=None,
|
||||
used_by_tenant_id=context.tenant_id,
|
||||
allocated_at=timeutils.utcnow(),
|
||||
port_id=port_id,
|
||||
address_type=kwargs.get('address_type',
|
||||
'fixed'))
|
||||
ip_types.FIXED))
|
||||
return [updated_address]
|
||||
else:
|
||||
# Make sure we never find it again
|
||||
@ -347,7 +349,8 @@ class QuarkIpam(object):
|
||||
context, address=next_ip, subnet_id=subnet["id"],
|
||||
deallocated=0, version=subnet["ip_version"],
|
||||
network_id=net_id,
|
||||
address_type=kwargs.get('type', 'fixed'))
|
||||
port_id=port_id,
|
||||
address_type=kwargs.get('address_type', ip_types.FIXED))
|
||||
address["deallocated"] = 0
|
||||
except Exception:
|
||||
# NOTE(mdietz): Our version of sqlalchemy incorrectly raises None
|
||||
@ -429,7 +432,9 @@ class QuarkIpam(object):
|
||||
context, address, deallocated=False,
|
||||
deallocated_at=None,
|
||||
used_by_tenant_id=context.tenant_id,
|
||||
allocated_at=timeutils.utcnow())
|
||||
allocated_at=timeutils.utcnow(),
|
||||
address_type=kwargs.get('address_type',
|
||||
ip_types.FIXED))
|
||||
|
||||
# This triggers when the IP is allocated to another tenant,
|
||||
# either because we missed it due to our filters above, or
|
||||
@ -440,7 +445,8 @@ class QuarkIpam(object):
|
||||
context, address=ip_address,
|
||||
subnet_id=subnet["id"],
|
||||
version=subnet["ip_version"], network_id=net_id,
|
||||
address_type=kwargs.get('address_type', 'fixed'))
|
||||
address_type=kwargs.get('address_type',
|
||||
ip_types.FIXED))
|
||||
except db_exception.DBDuplicateEntry:
|
||||
LOG.info("{0} exists but was already "
|
||||
"allocated".format(str(ip_address)))
|
||||
|
@ -16,6 +16,7 @@
|
||||
"""
|
||||
v2 Neutron Plug-in API Quark Implementation
|
||||
"""
|
||||
|
||||
from neutron.extensions import securitygroup as sg_ext
|
||||
from neutron import neutron_plugin_base_v2
|
||||
from neutron.openstack.common import log as logging
|
||||
|
@ -14,19 +14,23 @@
|
||||
# under the License.
|
||||
|
||||
from neutron.common import exceptions
|
||||
from neutron.openstack.common import importutils
|
||||
from neutron.openstack.common import log as logging
|
||||
from oslo.config import cfg
|
||||
import webob
|
||||
|
||||
from quark.db import api as db_api
|
||||
from quark.db import ip_types
|
||||
from quark import exceptions as quark_exceptions
|
||||
from quark import ipam
|
||||
from quark import plugin_views as v
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
ipam_driver = (importutils.import_class(CONF.QUARK.ipam_driver))()
|
||||
|
||||
|
||||
def _get_ipam_driver_for_network(context, net_id):
|
||||
return ipam.IPAM_REGISTRY.get_strategy(db_api.network_find(
|
||||
context, id=net_id, scope=db_api.ONE)['ipam_strategy'])
|
||||
|
||||
|
||||
def get_ip_addresses(context, **filters):
|
||||
@ -45,22 +49,55 @@ def get_ip_address(context, id):
|
||||
return v._make_ip_dict(addr)
|
||||
|
||||
|
||||
def create_ip_address(context, ip_address):
|
||||
LOG.info("create_ip_address for tenant %s" % context.tenant_id)
|
||||
def validate_ports_on_network_and_same_segment(ports, network_id):
|
||||
first_segment = None
|
||||
for port in ports:
|
||||
addresses = port.get("ip_addresses", [])
|
||||
for address in addresses:
|
||||
if address["network_id"] != network_id:
|
||||
raise exceptions.BadRequest(resource="ip_addresses",
|
||||
msg="Must have ports connected to"
|
||||
" the requested network")
|
||||
segment_id = address.subnet.get("segment_id")
|
||||
first_segment = first_segment or segment_id
|
||||
if segment_id != first_segment:
|
||||
raise exceptions.BadRequest(resource="ip_addresses",
|
||||
msg="Segment id's do not match.")
|
||||
|
||||
port = None
|
||||
ip_dict = ip_address["ip_address"]
|
||||
port_ids = ip_dict.get('port_ids')
|
||||
|
||||
def _shared_ip_request(ip_address):
|
||||
port_ids = ip_address.get('ip_address', {}).get('port_ids', [])
|
||||
return len(port_ids) > 1
|
||||
|
||||
|
||||
def _can_be_shared(address_model):
|
||||
# Don't share IP if any of the assocs is enabled
|
||||
return not any(a.enabled for a in address_model.associations)
|
||||
|
||||
|
||||
def create_ip_address(context, body):
|
||||
LOG.info("create_ip_address for tenant %s" % context.tenant_id)
|
||||
address_type = (ip_types.SHARED if _shared_ip_request(body)
|
||||
else ip_types.FIXED)
|
||||
ip_dict = body.get("ip_address")
|
||||
port_ids = ip_dict.get('port_ids', [])
|
||||
network_id = ip_dict.get('network_id')
|
||||
device_ids = ip_dict.get('device_ids')
|
||||
ip_version = ip_dict.get('version')
|
||||
ip_address = ip_dict.get('ip_address')
|
||||
# If no version is passed, you would get what the network provides,
|
||||
# which could be both v4 and v6 addresses. Rather than allow for such
|
||||
# an ambiguous outcome, we'll raise instead
|
||||
if not ip_version:
|
||||
raise exceptions.BadRequest(resource="ip_addresses",
|
||||
msg="version is required.")
|
||||
if not network_id:
|
||||
raise exceptions.BadRequest(resource="ip_addresses",
|
||||
msg="network_id is required.")
|
||||
|
||||
ipam_driver = _get_ipam_driver_for_network(context, network_id)
|
||||
new_addresses = []
|
||||
ports = []
|
||||
if device_ids and not network_id:
|
||||
raise exceptions.BadRequest(
|
||||
resource="ip_addresses",
|
||||
msg="network_id is required if device_ids are supplied.")
|
||||
with context.session.begin():
|
||||
if network_id and device_ids:
|
||||
for device_id in device_ids:
|
||||
@ -70,6 +107,7 @@ def create_ip_address(context, ip_address):
|
||||
ports.append(port)
|
||||
elif port_ids:
|
||||
for port_id in port_ids:
|
||||
|
||||
port = db_api.port_find(context, id=port_id,
|
||||
tenant_id=context.tenant_id,
|
||||
scope=db_api.ONE)
|
||||
@ -79,18 +117,25 @@ def create_ip_address(context, ip_address):
|
||||
raise exceptions.PortNotFound(port_id=port_ids,
|
||||
net_id=network_id)
|
||||
|
||||
address = ipam_driver.allocate_ip_address(
|
||||
context,
|
||||
port['network_id'],
|
||||
port['id'],
|
||||
CONF.QUARK.ipam_reuse_after,
|
||||
ip_version,
|
||||
ip_addresses=[ip_address])
|
||||
validate_ports_on_network_and_same_segment(ports, network_id)
|
||||
|
||||
for port in ports:
|
||||
port["ip_addresses"].append(address)
|
||||
|
||||
return v._make_ip_dict(address)
|
||||
# Shared Ips are only new IPs. Two use cases: if we got device_id
|
||||
# or if we got port_ids. We should check the case where we got port_ids
|
||||
# and device_ids. The device_id must have a port on the network,
|
||||
# and any port_ids must also be on that network already. If we have
|
||||
# more than one port by this step, it's considered a shared IP,
|
||||
# and therefore will be marked as unconfigured (enabled=False)
|
||||
# for all ports.
|
||||
ipam_driver.allocate_ip_address(context, new_addresses, network_id,
|
||||
None, CONF.QUARK.ipam_reuse_after,
|
||||
version=ip_version,
|
||||
ip_addresses=[ip_address]
|
||||
if ip_address else [],
|
||||
address_type=address_type)
|
||||
with context.session.begin():
|
||||
new_address = db_api.port_associate_ip(context, ports,
|
||||
new_addresses[0])
|
||||
return v._make_ip_dict(new_address)
|
||||
|
||||
|
||||
def _get_deallocated_override():
|
||||
@ -98,20 +143,28 @@ def _get_deallocated_override():
|
||||
return '2000-01-01 00:00:00'
|
||||
|
||||
|
||||
def _raise_if_shared_and_enabled(address_request, address_model):
|
||||
if (_shared_ip_request(address_request)
|
||||
and not _can_be_shared(address_model)):
|
||||
raise exceptions.BadRequest(
|
||||
resource="ip_addresses",
|
||||
msg="This IP address is in use on another port and cannot be "
|
||||
"shared")
|
||||
|
||||
|
||||
def update_ip_address(context, id, ip_address):
|
||||
LOG.info("update_ip_address %s for tenant %s" %
|
||||
(id, context.tenant_id))
|
||||
|
||||
ports = []
|
||||
with context.session.begin():
|
||||
address = db_api.ip_address_find(
|
||||
context, id=id, tenant_id=context.tenant_id, scope=db_api.ONE)
|
||||
|
||||
if not address:
|
||||
raise exceptions.NotFound(
|
||||
message="No IP address found with id=%s" % id)
|
||||
|
||||
reset = ip_address['ip_address'].get('reset_allocation_time',
|
||||
False)
|
||||
ipam_driver = _get_ipam_driver_for_network(context, address.network_id)
|
||||
reset = ip_address['ip_address'].get('reset_allocation_time', False)
|
||||
if reset and address['deallocated'] == 1:
|
||||
if context.is_admin:
|
||||
LOG.info("IP's deallocated time being manually reset")
|
||||
@ -120,27 +173,30 @@ def update_ip_address(context, id, ip_address):
|
||||
msg = "Modification of reset_allocation_time requires admin"
|
||||
raise webob.exc.HTTPForbidden(detail=msg)
|
||||
|
||||
old_ports = address['ports']
|
||||
port_ids = ip_address['ip_address'].get('port_ids')
|
||||
if port_ids is None:
|
||||
return v._make_ip_dict(address)
|
||||
|
||||
for port in old_ports:
|
||||
port['ip_addresses'].remove(address)
|
||||
|
||||
if port_ids:
|
||||
ports = db_api.port_find(
|
||||
context, tenant_id=context.tenant_id, id=port_ids,
|
||||
scope=db_api.ALL)
|
||||
|
||||
# NOTE: could be considered inefficient because we're converting
|
||||
# to a list to check length. Maybe revisit
|
||||
_raise_if_shared_and_enabled(ip_address, address)
|
||||
ports = db_api.port_find(context, tenant_id=context.tenant_id,
|
||||
id=port_ids, scope=db_api.ALL)
|
||||
# NOTE(name): could be considered inefficient because we're
|
||||
# converting to a list to check length. Maybe revisit
|
||||
if len(ports) != len(port_ids):
|
||||
raise exceptions.NotFound(
|
||||
message="No ports not found with ids=%s" % port_ids)
|
||||
for port in ports:
|
||||
port['ip_addresses'].extend([address])
|
||||
else:
|
||||
address["deallocated"] = 1
|
||||
|
||||
return v._make_ip_dict(address)
|
||||
validate_ports_on_network_and_same_segment(ports,
|
||||
address["network_id"])
|
||||
|
||||
LOG.info("Updating IP address, %s, to only be used by the"
|
||||
"following ports: %s" % (address.address_readable,
|
||||
[p.id for p in ports]))
|
||||
new_address = db_api.update_port_associations_for_ip(context,
|
||||
ports,
|
||||
address)
|
||||
else:
|
||||
if port_ids is not None:
|
||||
ipam_driver.deallocate_ip_address(
|
||||
context, address)
|
||||
return v._make_ip_dict(address)
|
||||
return v._make_ip_dict(new_address)
|
||||
|
@ -202,9 +202,11 @@ def _port_dict(port, fields=None):
|
||||
return res
|
||||
|
||||
|
||||
def _make_port_address_dict(ip, fields=None):
|
||||
def _make_port_address_dict(ip, port, fields=None):
|
||||
enabled = ip.enabled_for_port(port)
|
||||
ip_addr = {"subnet_id": ip.get("subnet_id"),
|
||||
"ip_address": ip.formatted()}
|
||||
"ip_address": ip.formatted(),
|
||||
"enabled": enabled}
|
||||
if fields and "port_subnets" in fields:
|
||||
ip_addr["subnet"] = _make_subnet_dict(ip["subnet"])
|
||||
|
||||
@ -213,7 +215,7 @@ def _make_port_address_dict(ip, fields=None):
|
||||
|
||||
def _make_port_dict(port, fields=None):
|
||||
res = _port_dict(port)
|
||||
res["fixed_ips"] = [_make_port_address_dict(ip, fields)
|
||||
res["fixed_ips"] = [_make_port_address_dict(ip, port, fields)
|
||||
for ip in port.ip_addresses]
|
||||
return res
|
||||
|
||||
@ -222,7 +224,7 @@ def _make_ports_list(query, fields=None):
|
||||
ports = []
|
||||
for port in query:
|
||||
port_dict = _port_dict(port, fields)
|
||||
port_dict["fixed_ips"] = [_make_port_address_dict(addr, fields)
|
||||
port_dict["fixed_ips"] = [_make_port_address_dict(addr, port, fields)
|
||||
for addr in port.ip_addresses]
|
||||
ports.append(port_dict)
|
||||
return ports
|
||||
@ -253,13 +255,12 @@ def _make_ip_dict(address):
|
||||
return {"id": address["id"],
|
||||
"network_id": net_id,
|
||||
"address": address.formatted(),
|
||||
"port_ids": [port["id"] for port in address["ports"]],
|
||||
"device_ids": [port["device_id"] or ""
|
||||
for port in address["ports"]],
|
||||
"port_ids": [assoc.port_id
|
||||
for assoc in address["associations"]],
|
||||
"subnet_id": address["subnet_id"],
|
||||
"used_by_tenant_id": address["used_by_tenant_id"],
|
||||
"version": address["version"],
|
||||
"shared": len(address["ports"]) > 1}
|
||||
"address_type": address['address_type']}
|
||||
|
||||
|
||||
def _make_ip_policy_dict(ipp):
|
||||
|
@ -18,13 +18,41 @@ import contextlib
|
||||
import mock
|
||||
from mock import patch
|
||||
from neutron.common import exceptions
|
||||
from oslo.config import cfg
|
||||
import webob
|
||||
|
||||
from quark.db import models
|
||||
from quark import exceptions as quark_exceptions
|
||||
from quark.plugin_modules import ip_addresses
|
||||
from quark.tests import test_quark_plugin
|
||||
|
||||
|
||||
def _port_associate_stub(context, ports, address, **kwargs):
|
||||
for port in ports:
|
||||
assoc = models.PortIpAssociation()
|
||||
assoc.port_id = port.id
|
||||
assoc.ip_address_id = address.id
|
||||
assoc.port = port
|
||||
# NOTE(thomasem): This causes address['associations'] to gain this
|
||||
# PortIpAssocation instance.
|
||||
assoc.ip_address = address
|
||||
assoc.enabled = address.address_type == "fixed"
|
||||
return address
|
||||
|
||||
|
||||
def _port_disassociate_stub(context, ports, address):
|
||||
port_ids = [port.id for port in ports]
|
||||
for idx, assoc in enumerate(address['associations']):
|
||||
if assoc.port_id in port_ids:
|
||||
address.associations.pop(idx)
|
||||
return address
|
||||
|
||||
|
||||
def _ip_deallocate_stub(context, address):
|
||||
address['deallocated'] = 1
|
||||
address['address_type'] = None
|
||||
|
||||
|
||||
class TestIpAddresses(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, port, addr):
|
||||
@ -36,12 +64,26 @@ class TestIpAddresses(test_quark_plugin.TestQuarkPlugin):
|
||||
if addr:
|
||||
addr_model = models.IPAddress()
|
||||
addr_model.update(addr)
|
||||
|
||||
def _alloc_ip(context, new_addr, net_id, port_m, *args, **kwargs):
|
||||
new_addr.extend([addr_model])
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch("quark.db.api.port_find"),
|
||||
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address")
|
||||
) as (port_find, alloc_ip):
|
||||
mock.patch(
|
||||
"quark.plugin_modules.ip_addresses"
|
||||
"._get_ipam_driver_for_network"),
|
||||
mock.patch(
|
||||
"quark.plugin_modules.ip_addresses.db_api"
|
||||
".port_associate_ip"),
|
||||
mock.patch(
|
||||
"quark.plugin_modules.ip_addresses"
|
||||
".validate_ports_on_network_and_same_segment")
|
||||
) as (port_find, mock_ipam, mock_port_associate_ip, validate):
|
||||
port_find.return_value = port_model
|
||||
alloc_ip.return_value = addr_model
|
||||
mock_ipam_driver = mock_ipam.return_value
|
||||
mock_ipam_driver.allocate_ip_address.side_effect = _alloc_ip
|
||||
mock_port_associate_ip.side_effect = _port_associate_stub
|
||||
yield
|
||||
|
||||
def test_create_ip_address_by_network_and_device(self):
|
||||
@ -49,17 +91,15 @@ class TestIpAddresses(test_quark_plugin.TestQuarkPlugin):
|
||||
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
|
||||
subnet_id=1, network_id=2, version=4, used_by_tenant_id=1)
|
||||
with self._stubs(port=port, addr=ip):
|
||||
ip_address = dict(network_id=ip["network_id"],
|
||||
device_ids=[4])
|
||||
ip_address = {"network_id": ip["network_id"],
|
||||
"version": 4, 'device_ids': [2]}
|
||||
response = self.plugin.create_ip_address(
|
||||
self.context, dict(ip_address=ip_address))
|
||||
|
||||
self.assertIsNotNone(response["id"])
|
||||
self.assertEqual(response["network_id"], ip_address["network_id"])
|
||||
self.assertEqual(response["device_ids"], [""])
|
||||
self.assertEqual(response["port_ids"], [port["id"]])
|
||||
self.assertEqual(response["subnet_id"], ip["subnet_id"])
|
||||
self.assertFalse(response["shared"])
|
||||
self.assertEqual(response['address_type'], None)
|
||||
self.assertEqual(response["version"], 4)
|
||||
self.assertEqual(response["address"], "192.168.1.100")
|
||||
self.assertEqual(response["used_by_tenant_id"], 1)
|
||||
@ -70,6 +110,8 @@ class TestIpAddresses(test_quark_plugin.TestQuarkPlugin):
|
||||
subnet_id=1, network_id=2, version=4)
|
||||
with self._stubs(port=port, addr=ip):
|
||||
ip_address = dict(port_ids=[port["id"]])
|
||||
ip_address['version'] = 4
|
||||
ip_address['network_id'] = 2
|
||||
response = self.plugin.create_ip_address(
|
||||
self.context, dict(ip_address=ip_address))
|
||||
|
||||
@ -80,7 +122,7 @@ class TestIpAddresses(test_quark_plugin.TestQuarkPlugin):
|
||||
|
||||
def test_create_ip_address_by_device_no_network_fails(self):
|
||||
with self._stubs(port={}, addr=None):
|
||||
ip_address = dict(device_ids=[4])
|
||||
ip_address = dict(device_ids=[4], version=4)
|
||||
with self.assertRaises(exceptions.BadRequest):
|
||||
self.plugin.create_ip_address(self.context,
|
||||
dict(ip_address=ip_address))
|
||||
@ -89,21 +131,265 @@ class TestIpAddresses(test_quark_plugin.TestQuarkPlugin):
|
||||
with self._stubs(port=None, addr=None):
|
||||
with self.assertRaises(exceptions.PortNotFound):
|
||||
ip_address = {'ip_address': {'network_id': 'fake',
|
||||
'device_id': 'fake'}}
|
||||
'device_id': 'fake',
|
||||
'version': 4}}
|
||||
self.plugin.create_ip_address(self.context, ip_address)
|
||||
|
||||
def test_create_ip_address_invalid_port(self):
|
||||
with self._stubs(port=None, addr=None):
|
||||
with self.assertRaises(exceptions.PortNotFound):
|
||||
ip_address = {'ip_address': {'port_id': 'fake'}}
|
||||
ip_address = {
|
||||
'ip_address': {
|
||||
'port_id': 'fake',
|
||||
'version': 4,
|
||||
'network_id': 'fake'
|
||||
}
|
||||
}
|
||||
self.plugin.create_ip_address(self.context, ip_address)
|
||||
|
||||
|
||||
@mock.patch("quark.plugin_modules.ip_addresses.v")
|
||||
@mock.patch("quark.plugin_modules.ip_addresses"
|
||||
".validate_ports_on_network_and_same_segment")
|
||||
@mock.patch("quark.plugin_modules.ip_addresses._get_ipam_driver_for_network")
|
||||
@mock.patch("quark.plugin_modules.ip_addresses.db_api")
|
||||
class TestQuarkSharedIPAddressCreate(test_quark_plugin.TestQuarkPlugin):
|
||||
def _alloc_stub(self, ip_model):
|
||||
def _alloc_ip(context, addr, *args, **kwargs):
|
||||
addr.append(ip_model)
|
||||
return _alloc_ip
|
||||
|
||||
def test_create_ip_address_calls_port_associate_ip(self, mock_dbapi,
|
||||
mock_ipam, *args):
|
||||
port = dict(id=1, network_id=2, ip_addresses=[])
|
||||
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
|
||||
subnet_id=1, network_id=2, version=4, used_by_tenant_id=1)
|
||||
port_model = models.Port()
|
||||
port_model.update(port)
|
||||
ip_model = models.IPAddress()
|
||||
ip_model.update(ip)
|
||||
|
||||
mock_dbapi.port_find.return_value = port_model
|
||||
mock_ipam_driver = mock_ipam.return_value
|
||||
mock_ipam_driver.allocate_ip_address.side_effect = (
|
||||
self._alloc_stub(ip_model))
|
||||
ip_address = {"network_id": ip["network_id"],
|
||||
"version": 4, 'device_ids': [2],
|
||||
"port_ids": [1]}
|
||||
|
||||
self.plugin.create_ip_address(self.context,
|
||||
dict(ip_address=ip_address))
|
||||
mock_dbapi.port_associate_ip.assert_called_once_with(
|
||||
self.context, [port_model], ip_model)
|
||||
|
||||
def test_create_ip_address_address_type_shared(self, mock_dbapi, mock_ipam,
|
||||
*args):
|
||||
cfg.CONF.set_override('ipam_reuse_after', 100, "QUARK")
|
||||
ports = [dict(id=1, network_id=2, ip_addresses=[]),
|
||||
dict(id=2, network_id=2, ip_addresses=[])]
|
||||
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
|
||||
subnet_id=1, network_id=2, version=4, used_by_tenant_id=1)
|
||||
port_models = [models.Port(**p) for p in ports]
|
||||
ip_model = models.IPAddress()
|
||||
ip_model.update(ip)
|
||||
mock_dbapi.port_find.side_effect = port_models
|
||||
mock_ipam_driver = mock_ipam.return_value
|
||||
mock_ipam_driver.allocate_ip_address.side_effect = (
|
||||
self._alloc_stub(ip_model))
|
||||
|
||||
ip_address = {"network_id": ip["network_id"],
|
||||
"version": 4, 'device_ids': [2],
|
||||
"port_ids": [pm.id for pm in port_models]}
|
||||
self.plugin.create_ip_address(self.context,
|
||||
dict(ip_address=ip_address))
|
||||
# NOTE(thomasem): Having to assert that [ip_model] was passed instead
|
||||
# of an empty list due to the expected behavior of this method being
|
||||
# that it mutates the passed in list. So, after it's run, the list
|
||||
# has already been mutated and it's a reference to that list that
|
||||
# we're checking. This method ought to be changed to return the new
|
||||
# IP and let the caller mutate the list, not the other way around.
|
||||
mock_ipam_driver.allocate_ip_address.assert_called_once_with(
|
||||
self.context, [ip_model], ip['network_id'], None, 100,
|
||||
version=ip_address['version'], ip_addresses=[],
|
||||
address_type="shared")
|
||||
|
||||
def test_create_ip_address_address_type_fixed(self, mock_dbapi, mock_ipam,
|
||||
*args):
|
||||
cfg.CONF.set_override('ipam_reuse_after', 100, "QUARK")
|
||||
ports = [dict(id=1, network_id=2, ip_addresses=[])]
|
||||
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
|
||||
subnet_id=1, network_id=2, version=4, used_by_tenant_id=1)
|
||||
port_models = [models.Port(**p) for p in ports]
|
||||
ip_model = models.IPAddress()
|
||||
ip_model.update(ip)
|
||||
mock_dbapi.port_find.side_effect = port_models
|
||||
mock_ipam_driver = mock_ipam.return_value
|
||||
mock_ipam_driver.allocate_ip_address.side_effect = (
|
||||
self._alloc_stub(ip_model))
|
||||
|
||||
ip_address = {"network_id": ip["network_id"],
|
||||
"version": 4, 'device_ids': [2],
|
||||
"port_ids": [pm.id for pm in port_models]}
|
||||
self.plugin.create_ip_address(self.context,
|
||||
dict(ip_address=ip_address))
|
||||
# NOTE(thomasem): Having to assert that [ip_model] was passed instead
|
||||
# of an empty list due to the expected behavior of this method being
|
||||
# that it mutates the passed in list. So, after it's run, the list
|
||||
# has already been mutated and it's a reference to that list that
|
||||
# we're checking. This method ought to be changed to return the new
|
||||
# IP and let the caller mutate the list, not the other way around.
|
||||
mock_ipam_driver.allocate_ip_address.assert_called_once_with(
|
||||
self.context, [ip_model], ip['network_id'], None, 100,
|
||||
version=ip_address['version'], ip_addresses=[],
|
||||
address_type="fixed")
|
||||
|
||||
|
||||
class TestQuarkSharedIPAddressPortsValid(test_quark_plugin.TestQuarkPlugin):
|
||||
def test_validate_ports_on_network_raise_segment(self):
|
||||
mock_ports = [models.Port(id="1", network_id="2"),
|
||||
models.Port(id="2", network_id="2")]
|
||||
mock_subnets = [models.Subnet(id="1", segment_id="2"),
|
||||
models.Subnet(id="2", segment_id="3")]
|
||||
for i, subnet in enumerate(mock_subnets):
|
||||
mock_address = models.IPAddress(id="2", network_id="2")
|
||||
mock_address.subnet = subnet
|
||||
mock_ports[i].ip_addresses.append(mock_address)
|
||||
|
||||
with self.assertRaises(exceptions.BadRequest):
|
||||
ip_addresses.validate_ports_on_network_and_same_segment(
|
||||
mock_ports, "2")
|
||||
|
||||
def test_validate_ports_on_network_raise_segment_multiple_ips(self):
|
||||
mock_ports = [models.Port(id="1", network_id="2"),
|
||||
models.Port(id="2", network_id="2")]
|
||||
mock_subnets = [models.Subnet(id="1", segment_id="2"),
|
||||
models.Subnet(id="2", segment_id="3")]
|
||||
for i, subnet in enumerate(mock_subnets):
|
||||
mock_address = models.IPAddress(id="2", network_id="2")
|
||||
mock_address.subnet = subnet
|
||||
for x in xrange(i + 1):
|
||||
mock_ports[x].ip_addresses.append(mock_address)
|
||||
|
||||
with self.assertRaises(exceptions.BadRequest):
|
||||
ip_addresses.validate_ports_on_network_and_same_segment(
|
||||
mock_ports, "2")
|
||||
|
||||
def test_validate_ports_on_network_raise_network(self):
|
||||
mock_ports = [models.Port(id="1", network_id="2"),
|
||||
models.Port(id="2", network_id="3")]
|
||||
mock_addresses = [models.IPAddress(id="1", network_id="2"),
|
||||
models.IPAddress(id="2", network_id="3")]
|
||||
|
||||
for i, ip_address in enumerate(mock_addresses):
|
||||
ip_address.subnet = models.Subnet(id="1", segment_id="2")
|
||||
mock_ports[i].ip_addresses.append(ip_address)
|
||||
|
||||
with self.assertRaises(exceptions.BadRequest):
|
||||
ip_addresses.validate_ports_on_network_and_same_segment(
|
||||
mock_ports, "2")
|
||||
|
||||
def test_validate_ports_on_network_valid(self):
|
||||
mock_ports = [models.Port(id="1", network_id="2"),
|
||||
models.Port(id="2", network_id="2")]
|
||||
for p in mock_ports:
|
||||
p.ip_addresses.append(models.IPAddress(id="1", network_id="2"))
|
||||
p.ip_addresses[-1].subnet = models.Subnet(id="1", segment_id="1")
|
||||
|
||||
r = ip_addresses.validate_ports_on_network_and_same_segment(
|
||||
mock_ports, "2")
|
||||
self.assertEqual(r, None)
|
||||
|
||||
|
||||
class TestQuarkSharedIPAddress(test_quark_plugin.TestQuarkPlugin):
|
||||
def test_shared_ip_request(self):
|
||||
ip_address_mock = {"ip_address": {"port_ids": [1, 2, 3]}}
|
||||
r = ip_addresses._shared_ip_request(ip_address_mock)
|
||||
self.assertTrue(r)
|
||||
|
||||
def test_shared_ip_request_false(self):
|
||||
ip_address_mock = {"ip_address": {"port_ids": [1]}}
|
||||
r = ip_addresses._shared_ip_request(ip_address_mock)
|
||||
self.assertFalse(r)
|
||||
|
||||
def test_can_be_shared(self):
|
||||
mock_address = models.IPAddress(id="1", address=3232235876,
|
||||
address_readable="192.168.1.100",
|
||||
subnet_id="1", network_id="2",
|
||||
version=4, used_by_tenant_id="1")
|
||||
mock_assocs = []
|
||||
for x in xrange(3):
|
||||
assoc = models.PortIpAssociation()
|
||||
assoc.ip_address_id = mock_address.id
|
||||
assoc.ip_address = mock_address
|
||||
assoc.enabled = [False, False, False][x]
|
||||
mock_assocs.append(assoc)
|
||||
r = ip_addresses._can_be_shared(mock_address)
|
||||
self.assertTrue(r)
|
||||
|
||||
def test_can_be_shared_false(self):
|
||||
mock_address = models.IPAddress(id="1", address=3232235876,
|
||||
address_readable="192.168.1.100",
|
||||
subnet_id="1", network_id="2",
|
||||
version=4, used_by_tenant_id="1")
|
||||
mock_assocs = []
|
||||
for x in xrange(3):
|
||||
assoc = models.PortIpAssociation()
|
||||
assoc.ip_address_id = mock_address.id
|
||||
assoc.ip_address = mock_address
|
||||
assoc.enabled = [False, True, False][x]
|
||||
mock_assocs.append(assoc)
|
||||
r = ip_addresses._can_be_shared(mock_address)
|
||||
self.assertFalse(r)
|
||||
|
||||
@mock.patch("quark.plugin_modules.ip_addresses._shared_ip_request")
|
||||
@mock.patch("quark.plugin_modules.ip_addresses._can_be_shared")
|
||||
def test_raise_if_shared_and_enabled(self, can_be_shared_mock,
|
||||
shared_ip_request_mock):
|
||||
can_be_shared_mock.return_value = False
|
||||
shared_ip_request_mock.return_value = True
|
||||
obj = mock.MagicMock()
|
||||
with self.assertRaises(exceptions.BadRequest):
|
||||
ip_addresses._raise_if_shared_and_enabled(obj, obj)
|
||||
|
||||
@mock.patch("quark.plugin_modules.ip_addresses._shared_ip_request")
|
||||
@mock.patch("quark.plugin_modules.ip_addresses._can_be_shared")
|
||||
def test_raise_if_shared_and_enabled_noraise(self, can_be_shared_mock,
|
||||
shared_ip_request_mock):
|
||||
can_be_shared_mock.return_value = True
|
||||
shared_ip_request_mock.return_value = True
|
||||
obj = mock.MagicMock()
|
||||
r = ip_addresses._raise_if_shared_and_enabled(obj, obj)
|
||||
self.assertEqual(r, None)
|
||||
|
||||
@mock.patch("quark.plugin_modules.ip_addresses._shared_ip_request")
|
||||
@mock.patch("quark.plugin_modules.ip_addresses._can_be_shared")
|
||||
def test_raise_if_shared_and_enabled_fixed_request(self,
|
||||
can_be_shared_mock,
|
||||
shared_ip_request_mock):
|
||||
can_be_shared_mock.return_value = True
|
||||
shared_ip_request_mock.return_value = False
|
||||
obj = mock.MagicMock()
|
||||
r = ip_addresses._raise_if_shared_and_enabled(obj, obj)
|
||||
self.assertEqual(r, None)
|
||||
|
||||
@mock.patch("quark.plugin_modules.ip_addresses._shared_ip_request")
|
||||
@mock.patch("quark.plugin_modules.ip_addresses._can_be_shared")
|
||||
def test_raise_if_shared_and_enabled_fixed_request_and_not_shareable(
|
||||
self, can_be_shared_mock, shared_ip_request_mock):
|
||||
can_be_shared_mock.return_value = False
|
||||
shared_ip_request_mock.return_value = False
|
||||
obj = mock.MagicMock()
|
||||
r = ip_addresses._raise_if_shared_and_enabled(obj, obj)
|
||||
self.assertEqual(r, None)
|
||||
|
||||
|
||||
class TestQuarkUpdateIPAddress(test_quark_plugin.TestQuarkPlugin):
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, ports, addr, addr_ports=False):
|
||||
port_models = []
|
||||
addr_model = None
|
||||
|
||||
for port in ports:
|
||||
port_model = models.Port()
|
||||
port_model.update(port)
|
||||
@ -118,9 +404,21 @@ class TestQuarkUpdateIPAddress(test_quark_plugin.TestQuarkPlugin):
|
||||
with contextlib.nested(
|
||||
mock.patch("%s.port_find" % db_mod),
|
||||
mock.patch("%s.ip_address_find" % db_mod),
|
||||
) as (port_find, ip_find):
|
||||
mock.patch("%s.port_associate_ip" % db_mod),
|
||||
mock.patch("%s.port_disassociate_ip" % db_mod),
|
||||
mock.patch("quark.plugin_modules.ip_addresses"
|
||||
".validate_ports_on_network_and_same_segment"),
|
||||
mock.patch("quark.plugin_modules.ip_addresses"
|
||||
"._get_ipam_driver_for_network")
|
||||
) as (port_find, ip_find, port_associate_ip,
|
||||
port_disassociate_ip, val, mock_ipam):
|
||||
port_find.return_value = port_models
|
||||
ip_find.return_value = addr_model
|
||||
port_associate_ip.side_effect = _port_associate_stub
|
||||
port_disassociate_ip.side_effect = _port_disassociate_stub
|
||||
mock_ipam_driver = mock_ipam.return_value
|
||||
mock_ipam_driver.deallocate_ip_address.side_effect = (
|
||||
_ip_deallocate_stub)
|
||||
yield
|
||||
|
||||
def test_update_ip_address_does_not_exist(self):
|
||||
@ -145,7 +443,8 @@ class TestQuarkUpdateIPAddress(test_quark_plugin.TestQuarkPlugin):
|
||||
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
|
||||
subnet_id=1, network_id=2, version=4)
|
||||
with self._stubs(ports=[port], addr=ip):
|
||||
ip_address = {'ip_address': {'port_ids': [port['id']]}}
|
||||
ip_address = {'ip_address': {'port_ids': [port['id']],
|
||||
'network_id': 2}}
|
||||
response = self.plugin.update_ip_address(self.context,
|
||||
ip['id'],
|
||||
ip_address)
|
||||
@ -222,50 +521,33 @@ class TestQuarkUpdateIPAddress(test_quark_plugin.TestQuarkPlugin):
|
||||
self.assertEqual(response['port_ids'], [])
|
||||
|
||||
|
||||
class TestQuarkGetIpAddresses(test_quark_plugin.TestQuarkPlugin):
|
||||
class TestQuarkGetIpAddress(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, ips, ports):
|
||||
with mock.patch("quark.db.api.ip_address_find") as ip_find:
|
||||
ip_models = []
|
||||
port_models = []
|
||||
for port in ports:
|
||||
p = models.Port()
|
||||
p.update(port)
|
||||
port_models.append(p)
|
||||
if isinstance(ips, list):
|
||||
for ip in ips:
|
||||
version = ip.pop("version")
|
||||
ip_mod = models.IPAddress()
|
||||
ip_mod.update(ip)
|
||||
ip_mod.version = version
|
||||
ip_mod.ports = port_models
|
||||
ip_models.append(ip_mod)
|
||||
ip_find.return_value = ip_models
|
||||
else:
|
||||
if ips:
|
||||
version = ips.pop("version")
|
||||
ip_mod = models.IPAddress()
|
||||
ip_mod.update(ips)
|
||||
ip_mod.version = version
|
||||
ip_mod.ports = port_models
|
||||
# Set up Port to IP associations
|
||||
assoc = models.PortIpAssociation()
|
||||
assoc.port = p
|
||||
assoc.port_id = p.id
|
||||
assoc.ip_address = ip_mod
|
||||
assoc.ip_address_id = ip_mod.id
|
||||
ip_mod.associations.append(assoc)
|
||||
ip_find.return_value = ip_mod
|
||||
else:
|
||||
ip_find.return_value = ips
|
||||
yield
|
||||
|
||||
def test_get_ip_addresses(self):
|
||||
port = dict(id=100, device_id="foobar")
|
||||
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
|
||||
subnet_id=1, network_id=2, version=4)
|
||||
with self._stubs(ips=[ip], ports=[port]):
|
||||
res = self.plugin.get_ip_addresses(self.context)
|
||||
addr_res = res[0]
|
||||
self.assertEqual(ip["id"], addr_res["id"])
|
||||
self.assertEqual(ip["subnet_id"], addr_res["subnet_id"])
|
||||
self.assertEqual(ip["address_readable"], addr_res["address"])
|
||||
self.assertEqual(addr_res["port_ids"][0], port["id"])
|
||||
self.assertEqual(addr_res["device_ids"][0], port["device_id"])
|
||||
|
||||
def test_get_ip_address(self):
|
||||
port = dict(id=100)
|
||||
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
|
||||
@ -282,3 +564,58 @@ class TestQuarkGetIpAddresses(test_quark_plugin.TestQuarkPlugin):
|
||||
with self._stubs(ips=None, ports=[port]):
|
||||
with self.assertRaises(quark_exceptions.IpAddressNotFound):
|
||||
self.plugin.get_ip_address(self.context, 1)
|
||||
|
||||
|
||||
class TestQuarkGetIpAddresses(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, ips, ports):
|
||||
with mock.patch("quark.db.api.ip_address_find") as ip_find:
|
||||
ip_models = []
|
||||
port_models = []
|
||||
for port in ports:
|
||||
p = models.Port()
|
||||
p.update(port)
|
||||
port_models.append(p)
|
||||
for ip in ips:
|
||||
version = ip.pop("version")
|
||||
ip_mod = models.IPAddress()
|
||||
ip_mod.update(ip)
|
||||
ip_mod.version = version
|
||||
ip_mod.ports = port_models
|
||||
# Set up Port to IP associations
|
||||
assoc = models.PortIpAssociation()
|
||||
assoc.port = p
|
||||
assoc.port_id = p.id
|
||||
assoc.ip_address = ip_mod
|
||||
assoc.ip_address_id = ip_mod.id
|
||||
ip_mod.associations.append(assoc)
|
||||
ip_models.append(ip_mod)
|
||||
ip_find.return_value = ip_models
|
||||
yield
|
||||
|
||||
def test_get_ip_addresses(self):
|
||||
port = dict(id=100, device_id="foobar")
|
||||
ip = dict(id=1, address=3232235876, address_readable="192.168.1.100",
|
||||
subnet_id=1, network_id=2, version=4)
|
||||
with self._stubs(ips=[ip], ports=[port]):
|
||||
res = self.plugin.get_ip_addresses(self.context)
|
||||
addr_res = res[0]
|
||||
self.assertEqual(ip["id"], addr_res["id"])
|
||||
self.assertEqual(ip["subnet_id"], addr_res["subnet_id"])
|
||||
self.assertEqual(ip["address_readable"], addr_res["address"])
|
||||
self.assertEqual(addr_res["port_ids"][0], port["id"])
|
||||
|
||||
def test_get_ip_addresses_multiple(self):
|
||||
port = dict(id=100, device_id="foobar")
|
||||
ips = [dict(id=1, address=3232235876, address_readable="192.168.1.100",
|
||||
subnet_id=1, network_id=2, version=4),
|
||||
dict(id=2, address=3232235878, address_readable="192.168.1.101",
|
||||
subnet_id=1, network_id=2, version=4)]
|
||||
with self._stubs(ips=ips, ports=[port]):
|
||||
res = self.plugin.get_ip_addresses(self.context)
|
||||
self.assertEqual(len(res), 2)
|
||||
for i, addr in enumerate(sorted(res, key=lambda x: x['id'])):
|
||||
self.assertEqual(ips[i]["id"], addr["id"])
|
||||
self.assertEqual(ips[i]["subnet_id"], addr["subnet_id"])
|
||||
self.assertEqual(ips[i]["address_readable"], addr["address"])
|
||||
self.assertEqual(addr["port_ids"][0], port["id"])
|
||||
|
@ -217,7 +217,10 @@ class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin):
|
||||
) as (network_find, port_count, port_create, port_find, allocate_ip,
|
||||
allocate_mac):
|
||||
def allocate_ip_effect(context, addresses, *args, **kwargs):
|
||||
addresses.extend(ip_models)
|
||||
|
||||
for ip_model in ip_models:
|
||||
ip_model.enabled_for_port = lambda x: True
|
||||
addresses.append(ip_models)
|
||||
network_find.return_value = network
|
||||
port_count.return_value = 0
|
||||
port_create.return_value = port_model
|
||||
@ -238,9 +241,11 @@ class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin):
|
||||
"subnet_id": "subnet2",
|
||||
"version": 4}
|
||||
fixed_ips = [{"ip_address": ip1['address_readable'],
|
||||
"subnet_id": ip1['subnet_id']},
|
||||
"subnet_id": ip1['subnet_id'],
|
||||
"enabled": True},
|
||||
{"ip_address": ip2['address_readable'],
|
||||
"subnet_id": ip2['subnet_id']}]
|
||||
"subnet_id": ip2['subnet_id'],
|
||||
"enabled": True}]
|
||||
port = {"port":
|
||||
{'fixed_ips': fixed_ips,
|
||||
'id': "11111111-2222-3333-4444-555555555555",
|
||||
@ -277,9 +282,11 @@ class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin):
|
||||
"subnet_id": "subnet2",
|
||||
"version": 6}
|
||||
fixed_ips = [{"ip_address": ip1['address_readable'],
|
||||
"subnet_id": ip1['subnet_id']},
|
||||
"subnet_id": ip1['subnet_id'],
|
||||
"enabled": True},
|
||||
{"ip_address": ip2['address_readable'],
|
||||
"subnet_id": ip2['subnet_id']}]
|
||||
"subnet_id": ip2['subnet_id'],
|
||||
"enabled": True}]
|
||||
port = {"port":
|
||||
{'fixed_ips': fixed_ips,
|
||||
'id': "11111111-2222-3333-4444-555555555555",
|
||||
@ -316,9 +323,11 @@ class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin):
|
||||
"subnet_id": "subnet2",
|
||||
"version": 6}
|
||||
fixed_ips = [{"ip_address": ip1['address_readable'],
|
||||
"subnet_id": ip1['subnet_id']},
|
||||
"subnet_id": ip1['subnet_id'],
|
||||
"enabled": True},
|
||||
{"ip_address": ip2['address_readable'],
|
||||
"subnet_id": ip2['subnet_id']}]
|
||||
"subnet_id": ip2['subnet_id'],
|
||||
"enabled": True}]
|
||||
port = {"port":
|
||||
{'fixed_ips': fixed_ips,
|
||||
'id': "11111111-2222-3333-4444-555555555555",
|
||||
@ -395,9 +404,18 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin):
|
||||
if network:
|
||||
network["network_plugin"] = "BASE"
|
||||
network["ipam_strategy"] = "ANY"
|
||||
port_model = models.Port()
|
||||
port_model.update(port)
|
||||
port_models = port_model
|
||||
|
||||
def _create_db_port(context, **kwargs):
|
||||
port_model = models.Port()
|
||||
port_model.update(kwargs)
|
||||
return port_model
|
||||
|
||||
def _alloc_ip(context, new_ips, *args, **kwargs):
|
||||
ip_mod = models.IPAddress()
|
||||
ip_mod.update(addr)
|
||||
ip_mod.enabled_for_port = lambda x: True
|
||||
new_ips.extend([ip_mod])
|
||||
return mock.DEFAULT
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch("quark.db.api.port_create"),
|
||||
@ -408,9 +426,9 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin):
|
||||
mock.patch("neutron.quota.QuotaEngine.limit_check")
|
||||
) as (port_create, net_find, alloc_ip, alloc_mac, port_count,
|
||||
limit_check):
|
||||
port_create.return_value = port_models
|
||||
port_create.side_effect = _create_db_port
|
||||
net_find.return_value = network
|
||||
alloc_ip.return_value = addr
|
||||
alloc_ip.side_effect = _alloc_ip
|
||||
alloc_mac.return_value = mac
|
||||
port_count.return_value = 0
|
||||
if limit_checks:
|
||||
@ -425,6 +443,7 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin):
|
||||
port = dict(port=dict(mac_address=mac["address"], network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2,
|
||||
name=port_name))
|
||||
|
||||
expected = {'status': "ACTIVE",
|
||||
'name': port_name,
|
||||
'device_owner': None,
|
||||
@ -493,10 +512,13 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin):
|
||||
ip = mock.MagicMock()
|
||||
ip.get = lambda x, *y: 1 if x == "subnet_id" else None
|
||||
ip.formatted = lambda: "192.168.10.45"
|
||||
fixed_ips = [dict(subnet_id=1, ip_address="192.168.10.45")]
|
||||
ip.enabled_for_port = lambda x: True
|
||||
fixed_ips = [dict(subnet_id=1, enabled=True,
|
||||
ip_address="192.168.10.45")]
|
||||
port = dict(port=dict(mac_address=mac["address"], network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2,
|
||||
fixed_ips=fixed_ips, ip_addresses=[ip]))
|
||||
|
||||
expected = {'status': "ACTIVE",
|
||||
'device_owner': None,
|
||||
'mac_address': mac["address"],
|
||||
|
@ -17,6 +17,7 @@ import mock
|
||||
import netaddr
|
||||
|
||||
from quark.db import api as db_api
|
||||
from quark.db import models
|
||||
from quark.tests.functional.base import BaseFunctionalTest
|
||||
|
||||
|
||||
@ -43,6 +44,35 @@ class TestDBAPI(BaseFunctionalTest):
|
||||
db_api.ip_address_find(self.context, device_id="foo")
|
||||
self.assertEqual(filter_mock.filter.call_count, 1)
|
||||
|
||||
def test_ip_address_find_address_type(self):
|
||||
self.context.session.query = mock.MagicMock()
|
||||
second_query_mock = self.context.session.query.return_value
|
||||
filter_mock = second_query_mock.join.return_value
|
||||
|
||||
db_api.ip_address_find(self.context, address_type="foo")
|
||||
# NOTE(thomasem): Creates sqlalchemy.sql.elements.BinaryExpression
|
||||
# when using SQLAlchemy models in expressions.
|
||||
expected_filter = models.IPAddress.address_type == "foo"
|
||||
self.assertEqual(len(filter_mock.filter.call_args[0]), 1)
|
||||
# NOTE(thomasem): Unfortunately BinaryExpression.compare isn't
|
||||
# showing to be a reliable comparison, so using the string
|
||||
# representation which dumps the associated SQL for the filter.
|
||||
self.assertEqual(str(expected_filter), str(
|
||||
filter_mock.filter.call_args[0][0]))
|
||||
|
||||
def test_ip_address_find_port_id(self):
|
||||
self.context.session.query = mock.MagicMock()
|
||||
second_query_mock = self.context.session.query.return_value
|
||||
final_query_mock = second_query_mock.join.return_value
|
||||
|
||||
db_api.ip_address_find(self.context, port_id="foo")
|
||||
# NOTE(thomasem): Creates sqlalchemy.sql.elements.BinaryExpression
|
||||
# when using SQLAlchemy models in expressions.
|
||||
expected_filter = models.IPAddress.ports.any(models.Port.id == "foo")
|
||||
self.assertEqual(len(final_query_mock.filter.call_args[0]), 1)
|
||||
self.assertEqual(str(expected_filter), str(
|
||||
final_query_mock.filter.call_args[0][0]))
|
||||
|
||||
def test_ip_address_find_ip_address_object(self):
|
||||
ip_address = netaddr.IPAddress("192.168.10.1")
|
||||
try:
|
||||
@ -58,3 +88,96 @@ class TestDBAPI(BaseFunctionalTest):
|
||||
scope=db_api.ONE)
|
||||
except Exception as e:
|
||||
self.fail("Expected no exceptions: %s" % e)
|
||||
|
||||
def test_port_associate_ip(self):
|
||||
self.context.session.add = mock.Mock()
|
||||
mock_ports = [models.Port(id=str(x), network_id="2", ip_addresses=[])
|
||||
for x in xrange(4)]
|
||||
mock_address = models.IPAddress(id="1", address=3232235876,
|
||||
address_readable="192.168.1.100",
|
||||
subnet_id="1", network_id="2",
|
||||
version=4, used_by_tenant_id="1")
|
||||
r = db_api.port_associate_ip(self.context, mock_ports, mock_address)
|
||||
self.assertEqual(len(r.associations), len(mock_ports))
|
||||
for assoc, port in zip(r.associations, mock_ports):
|
||||
self.assertEqual(assoc.port_id, port.id)
|
||||
self.assertEqual(assoc.ip_address_id, mock_address.id)
|
||||
self.assertEqual(assoc.enabled, False)
|
||||
self.context.session.add.assert_called_once_with(r)
|
||||
|
||||
def test_port_associate_ip_enable_port(self):
|
||||
self.context.session.add = mock.Mock()
|
||||
mock_port = models.Port(id="1", network_id="2", ip_addresses=[])
|
||||
mock_address = models.IPAddress(id="1", address=3232235876,
|
||||
address_readable="192.168.1.100",
|
||||
subnet_id="1", network_id="2",
|
||||
version=4, used_by_tenant_id="1")
|
||||
r = db_api.port_associate_ip(self.context, [mock_port], mock_address,
|
||||
enable_port="1")
|
||||
self.assertEqual(len(r.associations), 1)
|
||||
assoc = r.associations[0]
|
||||
self.assertEqual(assoc.port_id, mock_port.id)
|
||||
self.assertEqual(assoc.ip_address_id, mock_address.id)
|
||||
self.assertEqual(assoc.enabled, True)
|
||||
self.context.session.add.assert_called_once_with(r)
|
||||
|
||||
def test_port_disassociate_ip(self):
|
||||
self.context.session.add = mock.Mock()
|
||||
self.context.session.delete = mock.Mock()
|
||||
mock_ports = [models.Port(id=str(x), network_id="2", ip_addresses=[])
|
||||
for x in xrange(4)]
|
||||
mock_address = models.IPAddress(id="1", address=3232235876,
|
||||
address_readable="192.168.1.100",
|
||||
subnet_id="1", network_id="2",
|
||||
version=4, used_by_tenant_id="1")
|
||||
mock_assocs = []
|
||||
for p in mock_ports:
|
||||
assoc = models.PortIpAssociation()
|
||||
assoc.port_id = p.id
|
||||
assoc.port = p
|
||||
assoc.ip_address_id = mock_address.id
|
||||
assoc.ip_address = mock_address
|
||||
mock_assocs.append(assoc)
|
||||
|
||||
r = db_api.port_disassociate_ip(self.context, mock_ports[1:3],
|
||||
mock_address)
|
||||
|
||||
self.assertEqual(len(r.associations), 2)
|
||||
self.assertEqual(r.associations[0], mock_assocs[0])
|
||||
self.assertEqual(r.associations[1], mock_assocs[3])
|
||||
self.context.session.add.assert_called_once_with(r)
|
||||
self.context.session.delete.assert_has_calls(
|
||||
[mock.call(mock_assocs[1]), mock.call(mock_assocs[2])])
|
||||
|
||||
@mock.patch("quark.db.api.port_disassociate_ip")
|
||||
@mock.patch("quark.db.api.port_associate_ip")
|
||||
def test_update_port_associations_for_ip(self, associate_mock,
|
||||
disassociate_mock):
|
||||
self.context.session.add = mock.Mock()
|
||||
self.context.session.delete = mock.Mock()
|
||||
mock_ports = [models.Port(id=str(x), network_id="2", ip_addresses=[])
|
||||
for x in xrange(4)]
|
||||
mock_address = models.IPAddress(id="1", address=3232235876,
|
||||
address_readable="192.168.1.100",
|
||||
subnet_id="1", network_id="2",
|
||||
version=4, used_by_tenant_id="1")
|
||||
mock_address.ports = mock_ports
|
||||
new_port_list = mock_ports[1:3]
|
||||
new_port_list.append(models.Port(id="4", network_id="2",
|
||||
ip_addresses=[]))
|
||||
# NOTE(thomasem): Should be the new address after associating
|
||||
# any new ports in the list.
|
||||
mock_new_address = associate_mock.return_value
|
||||
|
||||
db_api.update_port_associations_for_ip(self.context,
|
||||
new_port_list,
|
||||
mock_address)
|
||||
|
||||
associate_mock.assert_called_once_with(self.context,
|
||||
set([new_port_list[2]]),
|
||||
mock_address)
|
||||
|
||||
disassociate_mock.assert_called_once_with(self.context,
|
||||
set([mock_ports[0],
|
||||
mock_ports[3]]),
|
||||
mock_new_address)
|
||||
|
@ -16,7 +16,6 @@ from sqlalchemy.sql import table
|
||||
|
||||
from quark.db.custom_types import INET
|
||||
import quark.db.migration
|
||||
from quark.db import models
|
||||
from quark.tests import test_base
|
||||
|
||||
|
||||
@ -800,7 +799,7 @@ class Test4fc07b41d45c(BaseMigrationTest):
|
||||
'quark_ip_addresses', self.metadata,
|
||||
sa.Column('id', sa.String(length=36), primary_key=True),
|
||||
sa.Column('_deallocated', sa.Boolean()),
|
||||
sa.Column('address_type', sa.Enum(*models.IPAddress.address_types))
|
||||
sa.Column('address_type', sa.Enum('fixed', 'shared', 'floating'))
|
||||
)
|
||||
self.metadata.create_all()
|
||||
alembic_command.stamp(self.config, self.previous_revision)
|
||||
|
Loading…
x
Reference in New Issue
Block a user