# Copyright 2016 AT&T Corp # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import oslo_concurrency from oslo_log import log as logging from ranger_tempest_plugin import clients from tempest import config from tempest import test from tempest.lib import exceptions CONF = config.CONF LOG = logging.getLogger(__name__) PREFIX = 'alt_region' oslo_concurrency.lockutils.set_defaults(CONF.oslo_concurrency.lock_path) SYNC = oslo_concurrency.lockutils.synchronized_with_prefix(PREFIX) class BaseOrmTest(test.BaseTestCase): client_manager = clients.OrmClientManager credentials = ['admin', 'primary', 'alt'] identity_url = CONF.identity.uri_v3 or "" identity_url = identity_url.strip('/v3') build_timeout = 120 image_build_timeout = 300 build_interval = 10 @classmethod def setup_credentials(cls): cls.set_network_resources() super(BaseOrmTest, cls).setup_credentials() @classmethod def setup_clients(cls): cls.project_client = cls.os_admin.projects_client cls.region_client = cls.os_admin.rms_client super(BaseOrmTest, cls).setup_clients() @classmethod def resource_setup(cls): super(BaseOrmTest, cls).resource_setup() try: # check if ranger region exists cls.region_client.get_region(CONF.identity.region) except Exception as exp: skip_msg = "Ranger services unavailable: {}".format( str(exp)) raise cls.skipException(skip_msg) try: # alt_region_available is True, skip tests and warn # if alt_region doesn't exist if CONF.ranger.alt_region_available is True: cls.region_client.get_region(CONF.ranger.alt_region) except exceptions.Unauthorized as e: skip_msg = ("Tests skipped as Ranger alt region does not exist." " Create alternate ranger region before running" " these tests again or change alt_region_available" " to False to run against one site.") raise cls.skipException(skip_msg) from e def assert_expected(self, expected, actual, excluded_keys): for key, value in list(expected.items()): if key not in excluded_keys: self.assertIn(key, actual) self.assertEqual(value, actual[key], key)