[OS:FVT] Adding designate ZONE API tests

Includes CRUD tests for zone api
root@prome-mdt-dhcp412:/opt/stack/tempest# ostestr --pdb vmware_nsx_tempest.tests.api.test_v2_designate.TestZones
{0} vmware_nsx_tempest.tests.api.test_v2_designate.TestZones.test_create_zone [7.087180s] ... ok
{0} vmware_nsx_tempest.tests.api.test_v2_designate.TestZones.test_delete_zone [0.456610s] ... ok
{0} vmware_nsx_tempest.tests.api.test_v2_designate.TestZones.test_list_zones [0.457285s] ... ok
{0} vmware_nsx_tempest.tests.api.test_v2_designate.TestZones.test_show_zone [0.438572s] ... ok
{0} vmware_nsx_tempest.tests.api.test_v2_designate.TestZones.test_update_zone [5.838672s] ... ok

 - Worker 0 (1 tests) => 0:00:13.447006
root@prome-mdt-dhcp412:/opt/stack/tempest# ostestr --pdb vmware_nsx_tempest.tests.scenario.test_designate.TestZonesScenario.test_network_zone_update
{0} vmware_nsx_tempest.tests.scenario.test_designate.TestZonesScenario.test_network_zone_update [13.662460s] ... ok

Change-Id: I159b69b2c526fd802b66dad30419b5b6a08e81ca
This commit is contained in:
Deepthi Kandavara Jayarama 2018-04-11 22:53:09 +00:00
parent e64aba7fe1
commit 0f313d12d4
7 changed files with 924 additions and 1 deletions

View File

@ -0,0 +1,85 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class ZoneFile(object):
def __init__(self, origin, ttl, records):
self.origin = origin
self.ttl = ttl
self.records = records
def __str__(self):
return str(self.__dict__)
def __repr__(self):
return str(self)
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __ne__(self, other):
return not self.__eq__(other)
@classmethod
def from_text(cls, text):
"""Return a ZoneFile from a string containing the zone file contents"""
# filter out empty lines and strip all leading/trailing whitespace.
# this assumes no multiline records
lines = [x.strip() for x in text.split('\n') if x.strip()]
assert lines[0].startswith('$ORIGIN')
assert lines[1].startswith('$TTL')
return ZoneFile(
origin=lines[0].split(' ')[1],
ttl=int(lines[1].split(' ')[1]),
records=[ZoneFileRecord.from_text(x) for x in lines[2:]],
)
class ZoneFileRecord(object):
def __init__(self, name, type, data):
self.name = str(name)
self.type = str(type)
self.data = str(data)
def __str__(self):
return str(self.__dict__)
def __repr__(self):
return str(self)
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash(tuple(sorted(self.__dict__.items())))
@classmethod
def from_text(cls, text):
"""Create a ZoneFileRecord from a line of text of a zone file, like:
mydomain.com. IN NS ns1.example.com.
"""
# assumes records don't have a TTL between the name and the class.
# assumes no parentheses in the record, all on a single line.
parts = [x for x in text.split(' ', 4) if x.strip()]
name, rclass, rtype, data = parts
assert rclass == 'IN'
return cls(name=name, type=rtype, data=data)

View File

@ -0,0 +1,228 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_log import log as logging
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
LOG = logging.getLogger(__name__)
def wait_for_zone_404(client, zone_id):
"""Waits for a zone to 404."""
LOG.info('Waiting for zone %s to 404', zone_id)
start = int(time.time())
while True:
time.sleep(client.build_interval)
try:
_, zone = client.show_zone(zone_id)
except lib_exc.NotFound:
LOG.info('Zone %s is 404ing', zone_id)
return
if int(time.time()) - start >= client.build_timeout:
message = ('Zone %(zone_id)s failed to 404 within the required '
'time (%(timeout)s s). Current status: '
'%(status_curr)s' %
{'zone_id': zone_id,
'status_curr': zone['status'],
'timeout': client.build_timeout})
caller = test_utils.find_test_caller()
if caller:
message = '(%s) %s' % (caller, message)
raise lib_exc.TimeoutException(message)
def wait_for_zone_status_active(client, zone_id, status):
"""Waits for a zone to reach given status."""
LOG.info('Waiting for zone %s to reach ACTIVE', zone_id)
_, zone = client.show_zone(zone_id)
start = int(time.time())
status = 'ACTIVE'
while zone['status'] != status:
time.sleep(client.build_interval)
_, zone = client.show_zone(zone_id)
status_curr = zone['status']
if status_curr == status:
LOG.info('Zone %s reached %s', zone_id, status)
return
if int(time.time()) - start >= client.build_timeout:
message = ('Zone %(zone_id)s failed to reach status=%(status)s '
'within the required time (%(timeout)s s). Current '
'status: %(status_curr)s' %
{'zone_id': zone_id,
'status': status,
'status_curr': status_curr,
'timeout': client.build_timeout})
caller = test_utils.find_test_caller()
if caller:
message = '(%s) %s' % (caller, message)
raise lib_exc.TimeoutException(message)
def wait_for_zone_import_status(client, zone_import_id, status):
"""Waits for an imported zone to reach the given status."""
LOG.info('Waiting for zone import %s to reach %s', zone_import_id, status)
_, zone_import = client.show_zone_import(zone_import_id)
start = int(time.time())
while zone_import['status'] != status:
time.sleep(client.build_interval)
_, zone_import = client.show_zone_import(zone_import_id)
status_curr = zone_import['status']
if status_curr == status:
LOG.info('Zone import %s reached %s', zone_import_id, status)
return
if int(time.time()) - start >= client.build_timeout:
message = ('Zone import %(zone_import_id)s failed to reach '
'status=%(status)s within the required time '
'(%(timeout)s s). Current '
'status: %(status_curr)s' %
{'zone_import_id': zone_import_id,
'status': status,
'status_curr': status_curr,
'timeout': client.build_timeout})
caller = test_utils.find_test_caller()
if caller:
message = '(%s) %s' % (caller, message)
raise lib_exc.TimeoutException(message)
def wait_for_zone_export_status(client, zone_export_id, status):
"""Waits for an exported zone to reach the given status."""
LOG.info('Waiting for zone export %s to reach %s', zone_export_id, status)
_, zone_export = client.show_zone_export(zone_export_id)
start = int(time.time())
while zone_export['status'] != status:
time.sleep(client.build_interval)
_, zone_export = client.show_zone_export(zone_export_id)
status_curr = zone_export['status']
if status_curr == status:
LOG.info('Zone export %s reached %s', zone_export_id, status)
return
if int(time.time()) - start >= client.build_timeout:
message = ('Zone export %(zone_export_id)s failed to reach '
'status=%(status)s within the required time '
'(%(timeout)s s). Current '
'status: %(status_curr)s' %
{'zone_export_id': zone_export_id,
'status': status,
'status_curr': status_curr,
'timeout': client.build_timeout})
caller = test_utils.find_test_caller()
if caller:
message = '(%s) %s' % (caller, message)
raise lib_exc.TimeoutException(message)
def wait_for_recordset_status(client, zone_id, recordset_id, status):
"""Waits for a recordset to reach the given status."""
LOG.info('Waiting for recordset %s to reach %s',
recordset_id, status)
_, recordset = client.show_recordset(zone_id, recordset_id)
start = int(time.time())
while recordset['status'] != status:
time.sleep(client.build_interval)
_, recordset = client.show_recordset(zone_id, recordset_id)
status_curr = recordset['status']
if status_curr == status:
LOG.info('Recordset %s reached %s', recordset_id, status)
return
if int(time.time()) - start >= client.build_timeout:
message = ('Recordset %(recordset_id)s failed to reach '
'status=%(status) within the required time '
'(%(timeout)s s). Current '
'status: %(status_curr)s' %
{'recordset_id': recordset_id,
'status': status,
'status_curr': status_curr,
'timeout': client.build_timeout})
caller = test_utils.find_test_caller()
if caller:
message = '(%s) %s' % (caller, message)
raise lib_exc.TimeoutException(message)
def wait_for_query(client, name, rdatatype, found=True):
"""Query nameservers until the record of the given name and type is found.
:param client: A QueryClient
:param name: The record name for which to query
:param rdatatype: The record type for which to query
:param found: If True, wait until the record is found. Else, wait until the
record disappears.
"""
state = "found" if found else "removed"
LOG.info("Waiting for record %s of type %s to be %s on nameservers %s",
name, rdatatype, state, client.nameservers)
start = int(time.time())
while True:
time.sleep(client.build_interval)
responses = client.query(name, rdatatype)
if found:
all_answers_good = all(r.answer for r in responses)
else:
all_answers_good = all(not r.answer for r in responses)
if not client.nameservers or all_answers_good:
LOG.info("Record %s of type %s was successfully %s on nameservers "
"%s", name, rdatatype, state, client.nameservers)
return
if int(time.time()) - start >= client.build_timeout:
message = ('Record %(name)s of type %(rdatatype)s not %(state)s '
'on nameservers %(nameservers)s within the required '
'time (%(timeout)s s)' %
{'name': name,
'rdatatype': rdatatype,
'state': state,
'nameservers': client.nameservers,
'timeout': client.build_timeout})
caller = test_utils.find_test_caller()
if caller:
message = "(%s) %s" % (caller, message)
raise lib_exc.TimeoutException(message)

View File

@ -25,6 +25,7 @@ from tempest.lib import exceptions as lib_exc
from vmware_nsx_tempest._i18n import _
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.lib import traffic_manager
from vmware_nsx_tempest.services import designate_base
from vmware_nsx_tempest.services.lbaas import health_monitors_client
from vmware_nsx_tempest.services.lbaas import listeners_client
from vmware_nsx_tempest.services.lbaas import load_balancers_client
@ -43,7 +44,8 @@ RULE_TYPE_DSCP_MARK = "dscp_marking"
# It includes feature related function such CRUD Mdproxy, L2GW or QoS
class FeatureManager(traffic_manager.IperfManager):
class FeatureManager(traffic_manager.IperfManager,
designate_base.DnsClientBase):
@classmethod
def setup_clients(cls):
"""Create various client connections. Such as NSXv3 and L2 Gateway.
@ -107,6 +109,13 @@ class FeatureManager(traffic_manager.IperfManager):
net_client.region,
net_client.endpoint_type,
**_params)
net_client.service = 'dns'
cls.zones_v2_client = openstack_network_clients.ZonesV2Client(
net_client.auth_provider,
net_client.service,
net_client.region,
net_client.endpoint_type,
**_params)
#
# FwaasV2 base class
@ -715,3 +724,103 @@ class FeatureManager(traffic_manager.IperfManager):
def list_rule_types(self):
result = self.types_client.list_rule_types()
return result.get('rule_types', result)
#
# Designate Zone
#
def rand_zone_name(name='', prefix=None, suffix='.com.'):
"""Generate a random zone name
:param str name: The name that you want to include
:param prefix: the exact text to start the string. Defaults to "rand"
:param suffix: the exact text to end the string
:return: a random zone name e.g. example.org.
:rtype: string
"""
name = 'tempest'
name = data_utils.rand_name(name=name, prefix=prefix)
zone_name = name + suffix
return zone_name
def rand_email(self, zone_name):
"""Generate a random zone name
:return: a random zone name e.g. example.org.
:rtype: string
"""
email_id = 'example@%s' % str(zone_name).rstrip('.')
return email_id
def create_zone(self, name=None, email=None, description=None,
wait_until=False):
"""Create a zone with the specified parameters.
:param name: The name of the zone.
Default: Random Value
:param email: The email for the zone.
Default: Random Value
:param ttl: The ttl for the zone.
Default: Random Value
:param description: A description of the zone.
Default: Random Value
:param wait_until: Block until the zone reaches the desired status
:return: A tuple with the server response and the created zone.
"""
if name is None:
name = self.rand_zone_name()
zone = {
'name': name,
'email': email or self.rand_email(name),
'description': description or data_utils.rand_name('test-zone'),
}
_, body = self.zones_v2_client.create_zone(wait_until, **zone)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_zone, body['id'])
# Create Zone should Return a HTTP 202
return body
def delete_zone(self, uuid):
"""Deletes a zone having the specified UUID.
:param uuid: The unique identifier of the zone.
:return: A tuple with the server response and the response body.
"""
_, body = self.zones_v2_client.delete_zone(uuid)
return body
def show_zone(self, uuid):
"""Gets a specific zone.
:param uuid: Unique identifier of the zone in UUID format.
:return: Serialized zone as a dictionary.
"""
return self.zones_v2_client.show_zone(uuid)
def list_zones(self):
"""Gets a list of zones.
:return: Serialized zones as a list.
"""
return self.zones_v2_client.list_zones()
def update_zone(self, uuid, email=None, ttl=None,
description=None, wait_until=False):
"""Update a zone with the specified parameters.
:param uuid: The unique identifier of the zone.
:param email: The email for the zone.
Default: Random Value
:param ttl: The ttl for the zone.
Default: Random Value
:param description: A description of the zone.
Default: Random Value
:param wait_until: Block until the zone reaches the desiered status
:return: A tuple with the server response and the updated zone.
"""
zone = {
'email': email or self.rand_email(),
'ttl': ttl or self.rand_ttl(),
'description': description or self.rand_name('test-zone'),
}
_, body = self.zones_v2_client.update_zone(uuid, wait_until, **zone)
return body
def list_record_set_zone(self, uuid):
"""list recordsets of a zone.
:param uuid: The unique identifier of the zone.
"""
body = self.zones_v2_client.list_recordset_zone(uuid)
return body

View File

@ -0,0 +1,244 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import six
from oslo_log import log as logging
from oslo_serialization import jsonutils as json
from six.moves.urllib import parse as urllib
from tempest.lib.common import rest_client
from tempest.lib import exceptions as lib_exc
from vmware_nsx_tempest.common import models
LOG = logging.getLogger(__name__)
def handle_errors(f):
"""A decorator that allows to ignore certain types of errors."""
@functools.wraps(f)
def wrapper(*args, **kwargs):
param_name = 'ignore_errors'
ignored_errors = kwargs.get(param_name, tuple())
if param_name in kwargs:
del kwargs[param_name]
try:
return f(*args, **kwargs)
except ignored_errors as e:
# Silently ignore errors as requested
LOG.debug('Ignoring exception of type %s, as requested', type(e))
return wrapper
class DnsClientBase(rest_client.RestClient):
"""Base Tempest REST client for Designate API"""
uri_prefix = ''
CREATE_STATUS_CODES = [202]
SHOW_STATUS_CODES = [200]
LIST_STATUS_CODES = [200]
PUT_STATUS_CODES = []
UPDATE_STATUS_CODES = [202]
DELETE_STATUS_CODES = [202]
def serialize(self, data):
if isinstance(data, six.string_types):
return data
return json.dumps(data)
def deserialize(self, resp, object_str):
if 'application/json' in resp['content-type']:
return json.loads(object_str)
elif 'text/dns' in resp['content-type']:
return models.ZoneFile.from_text(object_str.decode("utf-8"))
else:
raise lib_exc.InvalidContentType()
@classmethod
def expected_success(cls, expected_code, read_code):
# the base class method does not check correctly if read_code is not
# an int. warn about this and cast to int to avoid silent errors.
if not isinstance(read_code, int):
message = ("expected_success(%(expected_code)r, %(read_code)r) "
"received not-int read_code %(read_code)r" %
{'expected_code': expected_code,
'read_code': read_code})
LOG.warn(message)
return super(DnsClientBase, cls).expected_success(
expected_code=expected_code, read_code=int(read_code),
)
def get_uri(self, resource_name, uuid=None, params=None):
"""Get URI for a specific resource or object.
:param resource_name: The name of the REST resource, e.g., 'zones'.
:param uuid: The unique identifier of an object in UUID format.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:returns: Relative URI for the resource or object.
"""
uri_pattern = '{pref}/{res}{uuid}{params}'
uuid = '/%s' % uuid if uuid else ''
params = '?%s' % urllib.urlencode(params) if params else ''
return uri_pattern.format(pref=self.uri_prefix,
res=resource_name,
uuid=uuid,
params=params)
def _create_request(self, resource, data=None, params=None,
headers=None, extra_headers=False):
"""Create an object of the specified type.
:param resource: The name of the REST resource, e.g., 'zones'.
:param data: A Python dict that represents an object of the
specified type (to be serialized) or a plain string which
is sent as-is.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:param headers (dict): The headers to use for the request.
:param extra_headers (bool): Boolean value than indicates if the
headers returned by the get_headers()
method are to be used but additional
headers are needed in the request
pass them in as a dict.
:returns: A tuple with the server response and the deserialized created
object.
"""
body = self.serialize(data)
uri = self.get_uri(resource, params=params)
resp, body = self.post(uri, body=body, headers=headers,
extra_headers=extra_headers)
self.expected_success(self.CREATE_STATUS_CODES, resp.status)
return resp, self.deserialize(resp, body)
def _show_request(self, resource, uuid, headers=None, params=None,
extra_headers=False, ignore_response=False):
"""Gets a specific object of the specified type.
:param resource: The name of the REST resource, e.g., 'zones'.
:param uuid: Unique identifier of the object in UUID format.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:param extra_headers (bool): Boolean value than indicates if the
headers returned by the get_headers()
method are to be used but additional
headers are needed in the request
pass them in as a dict.
:returns: Serialized object as a dictionary.
"""
uri = self.get_uri(resource, uuid=uuid, params=params)
resp, body = self.get(
uri, headers=headers, extra_headers=extra_headers)
if not ignore_response:
self.expected_success(self.SHOW_STATUS_CODES, resp.status)
return resp, self.deserialize(resp, body)
def _list_request(self, resource, params=None):
"""Gets a list of objects.
:param resource: The name of the REST resource, e.g., 'zones'.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:returns: Serialized object as a dictionary.
"""
uri = self.get_uri(resource, params=params)
resp, body = self.get(uri)
self.expected_success(self.LIST_STATUS_CODES, resp.status)
return resp, self.deserialize(resp, body)
def _put_request(self, resource, uuid, data, params=None):
"""Updates the specified object using PUT request.
:param resource: The name of the REST resource, e.g., 'zones'.
:param uuid: Unique identifier of the object in UUID format.
:param data: A Python dict that represents an object of the
specified type (to be serialized) or a plain string which
is sent as-is.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:returns: Serialized object as a dictionary.
"""
body = self.serialize(data)
uri = self.get_uri(resource, uuid=uuid, params=params)
resp, body = self.put(uri, body=body)
self.expected_success(self.PUT_STATUS_CODES, resp.status)
return resp, self.deserialize(resp, body)
def _update_request(self, resource, uuid, data, params=None, headers=None,
extra_headers=False):
"""Updates the specified object using PATCH request.
:param resource: The name of the REST resource, e.g., 'zones'
:param uuid: Unique identifier of the object in UUID format.
:param data: A Python dict that represents an object of the
specified type (to be serialized) or a plain string which
is sent as-is.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:param headers (dict): The headers to use for the request.
:param extra_headers (bool): Boolean value than indicates if the
headers returned by the get_headers()
method are to be used but additional
headers are needed in the request
pass them in as a dict.
:returns: Serialized object as a dictionary.
"""
body = self.serialize(data)
uri = self.get_uri(resource, uuid=uuid, params=params)
resp, body = self.patch(uri, body=body,
headers=headers, extra_headers=True)
self.expected_success(self.UPDATE_STATUS_CODES, resp.status)
return resp, self.deserialize(resp, body)
def _delete_request(self, resource, uuid, params=None, headers=None,
extra_headers=False):
"""Deletes the specified object.
:param resource: The name of the REST resource, e.g., 'zones'.
:param uuid: The unique identifier of an object in UUID format.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:param headers (dict): The headers to use for the request.
:param extra_headers (bool): Boolean value than indicates if the
headers returned by the get_headers()
method are to be used but additional
headers are needed in the request
pass them in as a dict.
:returns: A tuple with the server response and the response body.
"""
uri = self.get_uri(resource, uuid=uuid, params=params)
resp, body = self.delete(
uri, headers=headers, extra_headers=extra_headers)
self.expected_success(self.DELETE_STATUS_CODES, resp.status)
if resp.status == 202:
body = self.deserialize(resp, body)
return resp, body

View File

@ -18,6 +18,8 @@ from tempest import config
from tempest.lib.services.network import base
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.common import waiters
from vmware_nsx_tempest.services import designate_base
LOG = log.getLogger(__name__)
CONF = config.CONF
@ -298,3 +300,46 @@ class QosPoliciesClient(base.BaseNetworkClient):
def list_policies(self, **filters):
uri = self.resource_base_path
return self.list_resources(uri, **filters)
class ZonesV2Client(designate_base.DnsClientBase):
"""
Request resources via API for ZonesV2Client
zonesv2 create zone
zonesv2 update zone
zonesv2 delete zone
zonesv2 show zone
zonesv2 list zones
"""
resource = 'zone'
resource_plural = 'policies'
path = 'zones'
resource_base_path = '/v2/%s' % path
def create_zone(self, wait_until, **zone):
resp, body = self._create_request(self.resource_base_path, zone)
self.expected_success(202, resp.status)
if wait_until:
waiters.wait_for_zone_status_active(self, body['id'], wait_until)
return resp, body
def update_zone(self, zone_id, wait_until, **zone):
resp, body = self._update_request(self.resource_base_path,
zone_id, zone)
# Update Zone should Return a HTTP 202
self.expected_success(202, resp.status)
if wait_until:
waiters.wait_for_zone_status_active(self, body['id'], wait_until)
return resp, body
def show_zone(self, zone_id):
return self._show_request(self.resource_base_path, zone_id)
def delete_zone(self, zone_id):
resp, body = self._delete_request(self.resource_base_path, zone_id)
# Delete Zone should Return a HTTP 202
self.expected_success(202, resp.status)
return resp, body
def list_zones(self):
return self._list_request(self.resource_base_path)

View File

@ -0,0 +1,111 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest import test
from vmware_nsx_tempest.lib import feature_manager
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestZonesV2Ops(feature_manager.FeatureManager):
@classmethod
def skip_checks(cls):
super(TestZonesV2Ops, cls).skip_checks()
if not test.is_extension_enabled('designate', 'network'):
msg = "Extension designate is not enabled."
raise cls.skipException(msg)
@classmethod
def setup_credentials(cls):
cls.set_network_resources()
cls.admin_mgr = cls.get_client_manager('admin')
super(TestZonesV2Ops, cls).setup_credentials()
@classmethod
def setup_clients(cls):
"""
Create various client connections. Such as NSX.
"""
super(TestZonesV2Ops, cls).setup_clients()
class TestZones(TestZonesV2Ops):
excluded_keys = ['created_at', 'updated_at', 'version', 'links',
'status', 'action']
@decorators.idempotent_id('e26cf8c6-164d-4097-b066-4e2100382d53')
def test_create_zone(self):
"""Creating a v2 Zone"""
LOG.info('Create a zone')
zone = self.create_zone(wait_until=True)
LOG.info('Ensure we respond with CREATE+PENDING')
self.assertEqual('CREATE', zone['action'])
self.assertEqual('PENDING', zone['status'])
@decorators.idempotent_id('76586e1f-7466-4dd1-bcdf-b6805c63731c')
def test_delete_zone(self):
LOG.info('Create a zone')
zone = self.create_zone()
LOG.info('Delete the zone')
body = self.delete_zone(zone['id'])
LOG.info('Ensure we respond with DELETE+PENDING')
self.assertEqual('DELETE', body['action'])
self.assertEqual('PENDING', body['status'])
@decorators.idempotent_id('3fa18ce7-ac47-425f-a1d1-2baa5ead0ed1')
def test_show_zone(self):
LOG.info('Create a zone')
zone = self.create_zone()
LOG.info('Fetch the zone')
body = self.show_zone(zone['id'])
LOG.info('Ensure the fetched response matches the created zone')
self.assertEqual(zone['links'], body[1]['links'])
self.assertEqual(zone['name'], body[1]['name'])
self.assertEqual(zone['email'], body[1]['email'])
self.assertEqual(zone['ttl'], body[1]['ttl'])
@decorators.idempotent_id('7e35c62c-5baf-4d32-b3e8-59e76ea6571f')
def test_list_zones(self):
LOG.info('Create a zone')
self.create_zone()
LOG.info('List zones')
body = self.list_zones()
self.assertGreater(len(body[1]['zones']), 0)
@decorators.idempotent_id('55ca3fc8-6652-4f00-9af8-c01ea5bae5a0')
def test_update_zone(self):
LOG.info('Create a zone')
zone = self.create_zone()
# Generate a random description
description = data_utils.rand_name()
LOG.info('Update the zone')
zone = self.update_zone(
zone['id'], email=zone['email'], ttl=zone['ttl'],
description=description, wait_until=True)
LOG.info('Ensure we respond with UPDATE+PENDING')
self.assertEqual('UPDATE', zone['action'])
self.assertEqual('PENDING', zone['status'])
LOG.info('Ensure we respond with updated values')
self.assertEqual(description, zone['description'])

View File

@ -0,0 +1,101 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
from tempest import test
from vmware_nsx_tempest.lib import feature_manager
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestZonesV2Ops(feature_manager.FeatureManager):
@classmethod
def skip_checks(cls):
super(TestZonesV2Ops, cls).skip_checks()
if not test.is_extension_enabled('designate', 'network'):
msg = "Extension designate is not enabled."
raise cls.skipException(msg)
@classmethod
def setup_credentials(cls):
cls.set_network_resources()
cls.admin_mgr = cls.get_client_manager('admin')
super(TestZonesV2Ops, cls).setup_credentials()
@classmethod
def setup_clients(cls):
"""
Create various client connections. Such as NSX.
"""
super(TestZonesV2Ops, cls).setup_clients()
def define_security_groups(self):
self.zone_sg = self.create_topology_empty_security_group(
namestart="zone_sg_")
# Common rules to allow the following traffic
# 1. Egress ICMP IPv4 any any
# 2. Egress ICMP IPv6 any any
# 3. Ingress ICMP IPv4 from public network
# 4. Ingress TCP 22 (SSH) from public network
common_ruleset = [dict(direction='egress', protocol='icmp'),
dict(direction='egress', protocol='icmp',
ethertype='IPv6'),
dict(direction='egress', protocol='tcp',
port_range_min=22, port_range_max=22),
dict(direction='egress', protocol='udp'),
dict(direction='ingress', protocol='tcp',
port_range_min=22, port_range_max=22),
dict(direction='ingress', protocol='udp'),
dict(direction='ingress', protocol='icmp')]
for rule in common_ruleset:
self.add_security_group_rule(self.qos_sg, rule)
class TestZonesScenario(TestZonesV2Ops):
@decorators.idempotent_id('e26cf8c6-164d-4097-b066-4e2100382d53')
def test_network_zone_update(self):
"""
Test
Create a zone, check zone exits, create a network
update network with the zone
"""
LOG.info('Create a zone')
zone = self.create_zone(wait_until=True)
LOG.info('Ensure we respond with CREATE+PENDING')
self.assertEqual('CREATE', zone['action'])
self.assertEqual('PENDING', zone['status'])
network_designate = self.create_topology_network(
"network_designate", dns_domain=zone['name'])
self.create_topology_subnet("subnet_designate", network_designate)
self.assertEqual(network_designate['dns_domain'], zone['name'])
LOG.info('Show recordset of the zone')
recordset = self.list_record_set_zone(zone['id'])
self.assertEqual(recordset['metadata']['total_count'], 2)
if any(record['type'] == 'NS' for record in recordset['recordsets']):
LOG.info('NS record is present')
else:
LOG.error('NS record is missing')
if any(record['type'] == 'SOA' for record in recordset['recordsets']):
LOG.info('SOA record if present')
else:
LOG.info('NS record is missing')