jh629g 57323cb05d Modifications to function with 3.6 Ranger Build
Several tests became non-functional after git removal
and python 3.6 code update. This patchset will ensure
that these tests function with current ranger codebase.
Also removes erroneous use of rand_region functionality
when testing flavors; flavor testing should always use
real ranger regions, otherwise the database will require
manual purging of the flavor after it goes into error status.

Change-Id: I941b45b62ae1288e5594942d4567dc90fc968842
2020-01-27 16:48:46 -06:00

196 lines
6.4 KiB
Python
Executable File

# Copyright 2015
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import requests
from tempest import config
from tempest.lib import auth
from tempest.lib.common import rest_client
# Configuration object exposed by tempest.config; the ``ranger``, ``identity``
# and ``auth`` option groups used below are read from it.
CONF = config.CONF
class ResponseError(Exception):
    """Signal that a token request returned an unexpected HTTP status."""
class ConnectionError(Exception):
    """Signal that a Keystone token could not be obtained.

    NOTE: within this module the name shadows the builtin
    ``ConnectionError``; this class derives directly from ``Exception``.
    """
class RangerClientBase(rest_client.RestClient):
    """Base REST client for Ranger services.

    Adds Keystone token retrieval and the Ranger-specific request headers
    on top of ``rest_client.RestClient``, plus thin GET/PUT/POST/DELETE
    helpers that check the status code and validate the response against
    a schema.
    """

    # Ranger RMS base URL; only used as the ``host`` shown in the
    # get_token error message.
    rms_url = CONF.ranger.ranger_rms_base_url
    # Region reported in the get_token error message.
    auth_region = CONF.identity.region
    # Timeout handed to ``requests.post`` when requesting a token.
    timeout = 10

    def __init__(self, *args, **kwargs):
        """Initialise the client.

        Accepts the same arguments as the parent class plus an optional
        ``rbac`` keyword (default ``False``).  When true, credentials are
        taken from ``self.auth_provider`` instead of the admin settings.
        """
        # Get the rbac value and also remove it from kwargs before
        # sending to parent
        self.rbac = kwargs.pop('rbac', False)
        super().__init__(*args, **kwargs)

    def get_keystone_ep(self):
        """Get the Keystone EP from tempest conf.

        Returns ``CONF.identity.uri_v3`` without its trailing ``/v3``
        path component, or an empty string when the option is unset.
        """
        identity_url = (CONF.identity.uri_v3 or "").rstrip('/')
        # str.strip('/v3') would strip the *characters* '/', 'v' and '3'
        # from both ends (mangling e.g. "http://10.0.0.3/v3"), so remove
        # the suffix explicitly instead.
        suffix = '/v3'
        if identity_url.endswith(suffix):
            identity_url = identity_url[:-len(suffix)]
        return identity_url

    def get_data(self):
        """Build the JSON body for a password-based, project-scoped token.

        Returns the request body as a JSON string.  The payload is
        serialised with ``json.dumps`` so that credentials containing
        quotes or backslashes still produce valid JSON.
        """
        if self.rbac:
            # Pick the credentials from auth_provider where patrole has
            # switched the role
            creds = self.auth_provider.credentials
            user_domain = creds.user_domain_name
            username = creds.username
            password = creds.password
            project = creds.project_name
            project_domain = creds.project_domain_name
        else:
            user_domain = CONF.auth.admin_domain_name
            username = CONF.auth.admin_username
            password = CONF.auth.admin_password
            project = CONF.auth.admin_project_name
            # The admin domain is used for both the user and the project.
            project_domain = CONF.auth.admin_domain_name

        payload = {
            'auth': {
                'identity': {
                    'methods': ['password'],
                    'password': {
                        'user': {
                            'domain': {'name': user_domain},
                            'name': username,
                            'password': password,
                        },
                    },
                },
                'scope': {
                    'project': {
                        'name': project,
                        'domain': {'name': project_domain},
                    },
                },
            },
        }
        return json.dumps(payload)

    def get_token(self, timeout, host):
        """Request a token from Keystone.

        :param timeout: timeout passed to ``requests.post``.
        :param host: only used in the error message raised when no
            Keystone endpoint is configured.
        :returns: the value of the ``x-subject-token`` response header,
            or ``None`` when ``CONF.ranger.auth_enabled`` is false.
        :raises ConnectionError: when no Keystone endpoint is configured,
            the request fails, the status code is not 201, or the token
            header is missing.  (A ``ResponseError`` raised for a bad
            status is wrapped as well, as callers have always seen.)
        """
        if not CONF.ranger.auth_enabled:
            return None

        keystone_ep = self.get_keystone_ep()
        # get_keystone_ep() returns "" (never None) when unconfigured.
        if not keystone_ep:
            raise ConnectionError(
                'Failed in get_token, host: {}, region: {}'.format(
                    host, self.auth_region))

        url = '%s/v3/auth/tokens' % keystone_ep
        headers = {
            'Content-Type': 'application/json',
        }
        data = self.get_data()
        try:
            resp = requests.post(url,
                                 timeout=timeout,
                                 data=data,
                                 headers=headers)
            if resp.status_code != 201:
                raise ResponseError(
                    'Failed to get token (Reason: {})'.format(
                        resp.status_code))
            return resp.headers['x-subject-token']
        except Exception as ex:
            # Keep the historical contract (everything surfaces as
            # ConnectionError) but preserve the original traceback.
            raise ConnectionError(str(ex)) from ex

    def get_headers(self, accept_type=None, send_type=None):
        """Build the extra headers sent with every Ranger request.

        ``accept_type`` and ``send_type`` are accepted but not used.
        ``X-Auth-Token`` is ``None`` when ``get_token`` returns ``None``.
        """
        if self.rbac:
            requester = self.auth_provider.credentials.username
        else:
            requester = CONF.auth.admin_username
        headers = {
            'X-Auth-Region': CONF.identity.region,
            'X-Auth-Token': self.get_token(self.timeout, self.rms_url),
            'X-RANGER-Tracking-Id': 'test',
            'X-RANGER-Requester': requester,
            'X-RANGER-Client': 'cli',
            'Content-Type': 'application/json',
        }
        return headers

    def get_request(self, uri, expected_body_schema):
        """GET ``uri``, expect 200, and validate the decoded JSON body.

        :returns: ``(resp, body)`` with ``body`` decoded from JSON.
        """
        ex_headers = self.get_headers()
        resp, body = self.get(uri, extra_headers=ex_headers)
        self.expected_success(200, resp.status)
        body = json.loads(body)
        self.validate_response(expected_body_schema, resp, body)
        return resp, body

    def put_request(self, uri, put_body, expected_body_schema):
        """PUT ``put_body`` to ``uri``, expect 200 or 201, and validate.

        :returns: ``(resp, body)`` with ``body`` decoded from JSON.
        """
        ex_headers = self.get_headers()
        resp, body = self.put(uri, body=put_body, extra_headers=ex_headers)
        self.expected_success([200, 201], resp.status)
        body = json.loads(body)
        self.validate_response(expected_body_schema, resp, body)
        return resp, body

    def delete_request(self, uri, expected_body_schema):
        """DELETE ``uri``, expect 204, and validate the response.

        The body is passed to validation as received, without JSON
        decoding.

        :returns: ``(resp, body)``.
        """
        ex_headers = self.get_headers()
        resp, body = self.delete(uri, extra_headers=ex_headers)
        self.expected_success(204, resp.status)
        self.validate_response(expected_body_schema, resp, body)
        return resp, body

    def post_request(self, uri, post_body, expected_body_schema):
        """POST ``post_body`` to ``uri``, expect 200 or 201, and validate.

        :returns: ``(resp, body)`` with ``body`` decoded from JSON.
        """
        ex_headers = self.get_headers()
        resp, body = self.post(uri, body=post_body,
                               extra_headers=ex_headers)
        self.expected_success([200, 201], resp.status)
        body = json.loads(body)
        self.validate_response(expected_body_schema, resp, body)
        return resp, body
class RangerAuthProvider(auth.KeystoneV3AuthProvider):
    """Keystone v3 auth provider that always authenticates against the
    ``identity`` service and merges caller-supplied headers on top.
    """

    def __init__(self, credentials, auth_url=CONF.identity.uri_v3):
        """Initialise the provider.

        :param credentials: passed through to the parent class.
        :param auth_url: Keystone URL; defaults to
            ``CONF.identity.uri_v3`` and falls back to an empty string
            when unset.
        """
        _auth = auth_url or ""
        super().__init__(credentials, _auth)

    def auth_request(self, method, url, headers=None, body=None, filters=None):
        """Return ``(url, headers, body)`` with auth headers applied.

        The ``filters`` argument is ignored: the parent is always called
        with ``{'service': 'identity'}``.  Only the headers (index 1) of
        the parent's result are used; ``url`` and ``body`` are returned as
        passed in.  Caller ``headers`` override the auth headers on key
        collisions.
        """
        filters = {'service': 'identity'}
        auth_headers = super().auth_request(method, url, filters=filters)
        base_headers = auth_headers[1]
        # ``headers`` defaults to None; dict.update(None) raises TypeError.
        if headers:
            base_headers.update(headers)
        return url, base_headers, body