Chi Lo 3db2d2c746 Added skip check when Ranger not reachable
When Ranger is not reachable, test cases will need to be
skipped.  In addition, remove the extra skip check for RMS and IMS
services.  If setup fails for these modules, let the test cases
fail as well.

Change-Id: Ibadb065ef9816cf1cb6ce44e5229753d679f8bf6
2020-08-11 14:07:03 -07:00

132 lines
5.3 KiB
Python
Executable File

# Copyright 2016 AT&T Corp
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_concurrency
from oslo_log import log as logging
from ranger_tempest_plugin import clients
from tempest import config
from tempest import test
from tempest.lib import exceptions
CONF = config.CONF
LOG = logging.getLogger(__name__)
PREFIX = 'alt_region'
oslo_concurrency.lockutils.set_defaults(CONF.oslo_concurrency.lock_path)
SYNC = oslo_concurrency.lockutils.synchronized_with_prefix(PREFIX)
class BaseOrmTest(test.BaseTestCase):
client_manager = clients.OrmClientManager
credentials = ['admin', 'primary', 'alt']
identity_url = CONF.identity.uri_v3 or ""
identity_url = identity_url.strip('/v3')
build_timeout = 120
build_interval = 10
@classmethod
def setup_credentials(cls):
cls.set_network_resources()
super(BaseOrmTest, cls).setup_credentials()
@classmethod
def setup_clients(cls):
cls.project_client = cls.os_admin.projects_client
cls.region_client = cls.os_admin.rms_client
super(BaseOrmTest, cls).setup_clients()
@classmethod
def resource_setup(cls):
super(BaseOrmTest, cls).resource_setup()
try:
# check if ranger region exists
cls.region_client.get_region(CONF.identity.region)
except exceptions.Unauthorized:
# delete test region
cls.addClassResourceCleanup(cls.region_client.delete_region,
CONF.identity.region)
# create region for testing if it doesnt exist
cls.region_client.create_region(
CONF.identity.region,
**{'id': CONF.identity.region,
'name': CONF.identity.region,
'description': 'region for testing',
'status': 'functional',
'rangerAgentVersion': '3.0',
'OSVersion': 'stein',
'CLLI': '123450',
'address': {'country': 'This', 'state': 'region',
'city': 'is for', 'street': 'test',
'zip': 'purposes'},
'metadata': {},
'designType': 'large',
'domainName': CONF.auth.admin_domain_name,
'endpoints': [{
'publicURL':
'https://dashboard-nc.%s.cci.att.com'
% CONF.identity.region,
'type': 'dashboard'
}, {
'publicURL': cls.identity_url,
'type': 'identity'
}, {
'publicURL':
'https://ranger-agent-nc.%s.cci.att.com'
% CONF.identity.region,
'type': 'ord'
}]})
except (exceptions.UnexpectedResponseCode, exceptions.NotFound):
raise cls.skipException("Test skipped, Ranger region unavailable")
except Exception as exp:
if "No route to host" in str(exp):
skip_msg = ("Tests skipped , Ranger not reachable: %s" %
str(exp))
raise cls.skipException(skip_msg)
raise exp
try:
# alt_region_available is True, skip tests and warn
# if alt_region doesn't exist
if CONF.ranger.alt_region_available is True:
cls.region_client.get_region(CONF.ranger.alt_region)
except exceptions.Unauthorized:
skip_msg = ("Tests skipped as Ranger alt region does not exist."
" Create alternate ranger region before running"
" these tests again or change alt_region_available"
" to False to run against one site.")
raise cls.skipException(skip_msg)
# Get regions in ranger deployment
_, regions_list = cls.region_client.list_regions()
for region in regions_list['regions']:
if region['id'] == CONF.identity.region or \
(region['id'] == CONF.ranger.alt_region
and CONF.ranger.alt_region_available is True):
region['domainName'] = CONF.auth.admin_domain_name
cls.region_client.update_region(region['id'],
**region)
cls.addClassResourceCleanup(cls.region_client.update_region,
region['id'],
**region)
def assert_expected(self, expected, actual, excluded_keys):
for key, value in list(expected.items()):
if key not in excluded_keys:
self.assertIn(key, actual)
self.assertEqual(value, actual[key], key)