Merge "Adds tests for client module and refactors"

This commit is contained in:
Jenkins 2013-08-26 12:45:18 +00:00 committed by Gerrit Code Review
commit 710a658ede
3 changed files with 422 additions and 50 deletions

View File

@ -39,6 +39,71 @@ def _get_endpoint(client, **kwargs):
endpoint_type=kwargs.get('endpoint_type') or 'publicURL')
def _get_token_and_endpoint(**kwargs):
token = kwargs.get('os_auth_token')
endpoint = kwargs.get('tuskar_url')
ks_kwargs = {
'username': kwargs.get('os_username'),
'password': kwargs.get('os_password'),
'tenant_id': kwargs.get('os_tenant_id'),
'tenant_name': kwargs.get('os_tenant_name'),
'auth_url': kwargs.get('os_auth_url'),
'service_type': kwargs.get('os_service_type'),
'endpoint_type': kwargs.get('os_endpoint_type'),
'insecure': kwargs.get('insecure'),
}
# create Keystone Client if we miss token or endpoint
if not (token and endpoint):
_ksclient = _get_ksclient(**ks_kwargs)
# get token if needed
if not token:
token = _ksclient.auth_token
# get endpoint if needed
if not endpoint:
endpoint = _get_endpoint(_ksclient, **ks_kwargs)
return token, endpoint
def _get_client_with_token(api_version, **kwargs):
token = kwargs.get('os_auth_token')
endpoint = kwargs.get('tuskar_url')
# test if we have all needed parameters
if token and endpoint:
client_kwargs = {
'token': token,
'insecure': kwargs.get('insecure'),
'timeout': kwargs.get('timeout'),
'ca_file': kwargs.get('ca_file'),
'cert_file': kwargs.get('cert_file'),
'key_file': kwargs.get('key_file'),
}
# return new Client
return Client(api_version, endpoint, **client_kwargs)
# return None if we do not have all needed parameters
else:
return None
def _get_client_with_credentials(api_version, **kwargs):
username = kwargs.get('os_username')
password = kwargs.get('os_password')
auth_url = kwargs.get('os_auth_url')
tenant_id = kwargs.get('os_tenant_id')
tenant_name = kwargs.get('os_tenant_name')
# test if we have all needed parameters
if (username and password and auth_url and
(tenant_id or tenant_name)):
token, endpoint = _get_token_and_endpoint(**kwargs)
# call for a client with token and endpoint
return _get_client_with_token(api_version, endpoint=endpoint,
token=token, **kwargs)
# returns None if we do not have needed parameters
else:
return None
def get_client(api_version, **kwargs):
"""Get an authtenticated client, based on the credentials
in the keyword args.
@ -54,40 +119,16 @@ def get_client(api_version, **kwargs):
* insecure: allow insecure SSL (no cert verification)
* os_tenant_{name|id}: name or ID of tenant
"""
if kwargs.get('os_auth_token') and kwargs.get('tuskar_url'):
token = kwargs.get('os_auth_token')
endpoint = kwargs.get('tuskar_url')
elif (kwargs.get('os_username') and
kwargs.get('os_password') and
kwargs.get('os_auth_url') and
(kwargs.get('os_tenant_id') or kwargs.get('os_tenant_name'))):
ks_kwargs = {
'username': kwargs.get('os_username'),
'password': kwargs.get('os_password'),
'tenant_id': kwargs.get('os_tenant_id'),
'tenant_name': kwargs.get('os_tenant_name'),
'auth_url': kwargs.get('os_auth_url'),
'service_type': kwargs.get('os_service_type'),
'endpoint_type': kwargs.get('os_endpoint_type'),
'insecure': kwargs.get('insecure'),
}
_ksclient = _get_ksclient(**ks_kwargs)
token = kwargs.get('os_auth_token') or _ksclient.auth_token
endpoint = kwargs.get('tuskar_url') or \
_get_endpoint(_ksclient, **ks_kwargs)
cli_kwargs = {
'token': token,
'insecure': kwargs.get('insecure'),
'timeout': kwargs.get('timeout'),
'ca_file': kwargs.get('ca_file'),
'cert_file': kwargs.get('cert_file'),
'key_file': kwargs.get('key_file'),
}
return Client(api_version, endpoint, **cli_kwargs)
# Try call for client with token and endpoint.
# If it returns None, call for client with credentials
client = (_get_client_with_token(api_version, **kwargs) or
_get_client_with_credentials(api_version, **kwargs))
# If we have a client, return it
if client:
return client
# otherwise raise error
else:
raise ValueError("Need correct set of parameters")
def Client(version, *args, **kwargs):

View File

@ -70,20 +70,299 @@ class ClientGetKSClientTest(tutils.TestCase):
'auth_url',
])
kwargs = {}
# contruct testing dictionary with redunant keys and missing some
for key in keys | redundant_keys:
if key not in missing_keys:
kwargs[key] = key + '_value'
# construct dict containing expected values - no reduntant keys and
# default for missing keys
expected_args = kwargs.copy()
for key in redundant_keys:
del expected_args[key]
for key in missing_keys:
expected_args[key] = None
kwargs, ksargs = tutils.create_test_dictionary_pair(keys,
redundant_keys,
missing_keys)
self.assertEqual(client._get_ksclient(**kwargs), 'mocked ksclient')
mocked_ksclient.assert_called_with(**expected_args)
mocked_ksclient.assert_called_with(**ksargs)
class ClientGetClientWithTokenTest(tutils.TestCase):
@mock.patch.object(client, 'Client')
def test_it_filters_kwargs(self, mocked_client):
mocked_client.return_value = 'mocked client'
api_version = 1
endpoint = 'http://tuskar.api:1234'
token = 'token'
keys = set([ # keys for Client constructor
'insecure',
'timeout',
'ca_file',
'cert_file',
'key_file',
])
redundant_keys = set([ # key added to see if not passed to constructor
'some_other_key',
'any_other_key',
])
missing_keys = set([ # missing to see if its value defaults to None
'ca_file',
'cert_file',
])
kwargs, client_args = tutils.create_test_dictionary_pair(
keys, redundant_keys, missing_keys)
self.assertEqual(client._get_client_with_token(api_version,
os_auth_token=token,
tuskar_url=endpoint,
**kwargs),
'mocked client')
mocked_client.assert_called_with(api_version, endpoint,
token=token,
**client_args)
def test_it_returns_none_without_token(self):
api_version = 1
endpoint = 'http://tuskar.api:1234'
self.assertEqual(
client._get_client_with_token(api_version,
tuskar_url=endpoint),
None)
def test_it_returns_none_without_endpoint(self):
api_version = 1
token = 'token'
self.assertEqual(
client._get_client_with_token(api_version,
os_auth_token=token),
None)
class ClientGetClientWithCredentialsTest(tutils.TestCase):
def setUp(self):
super(ClientGetClientWithCredentialsTest, self).setUp()
self.kwargs = {
'os_username': 'username',
'os_password': 'password',
'os_tenant_id': 'tenant_id',
'os_tenant_name': 'tenant_name',
'os_auth_url': 'auth_url',
'os_auth_token': 'auth_token',
'tuskar_url': 'tuskar_url',
'os_service_type': 'service_type',
'os_endpoint_type': 'endpoint_type',
'insecure': 'insecure',
}
def test_it_returns_none_without_username(self):
api_version = 1
kwargs = self.kwargs.copy()
del kwargs['os_username']
self.assertEqual(
client._get_client_with_credentials(api_version, **kwargs),
None)
def test_it_returns_none_without_password(self):
api_version = 1
kwargs = self.kwargs.copy()
del kwargs['os_password']
self.assertEqual(
client._get_client_with_credentials(api_version, **kwargs),
None)
def test_it_returns_none_without_auth_url(self):
api_version = 1
kwargs = self.kwargs.copy()
del kwargs['os_auth_url']
self.assertEqual(
client._get_client_with_credentials(api_version, **kwargs),
None)
def test_it_returns_none_without_tenant_id_and_tenant_name(self):
api_version = 1
kwargs = self.kwargs.copy()
del kwargs['os_tenant_id']
del kwargs['os_tenant_name']
self.assertEqual(
client._get_client_with_credentials(api_version, **kwargs),
None)
@mock.patch.object(client, '_get_client_with_token')
@mock.patch.object(client, '_get_token_and_endpoint')
def test_it_calls_get_token_and_endpoint(self,
mocked_get_token_and_endpoint,
mocked_get_client_with_token):
api_version = 1
kwargs = self.kwargs.copy()
mocked_get_token_and_endpoint.return_value = ('token', 'endpoint')
mocked_get_client_with_token.return_value = 'mocked client'
self.assertEqual(
client._get_client_with_credentials(api_version, **kwargs),
'mocked client'
)
mocked_get_token_and_endpoint.assert_called_with(**kwargs)
mocked_get_client_with_token.assert_called_with(api_version,
token='token',
endpoint='endpoint',
**kwargs)
class ClientGetTokenAndEndpointTest(tutils.TestCase):
def setUp(self):
super(ClientGetTokenAndEndpointTest, self).setUp()
self.kwargs = {
'os_username': 'username',
'os_password': 'password',
'os_tenant_id': 'tenant_id',
'os_tenant_name': 'tenant_name',
'os_auth_url': 'auth_url',
'os_service_type': 'service_type',
'os_endpoint_type': 'endpoint_type',
'insecure': 'insecure',
}
self.translation_of_args = {
'os_username': 'username',
'os_password': 'password',
'os_tenant_id': 'tenant_id',
'os_tenant_name': 'tenant_name',
'os_auth_url': 'auth_url',
'os_service_type': 'service_type',
'os_endpoint_type': 'endpoint_type',
}
self.redundant_keys = set(['other_key', 'any_other_key'])
self.missing_keys = set(['os_tenant_name', 'os_service_type'])
@mock.patch.object(client, '_get_endpoint')
@mock.patch.object(client, '_get_ksclient')
def test_it_with_both_token_and_endpoint(self,
mocked_get_ksclient,
mocked_get_endpoint):
kwargs, expected_kwargs = tutils.create_test_dictionary_pair(
set(self.kwargs.keys()),
self.redundant_keys,
self.missing_keys,
**self.translation_of_args)
kwargs['os_auth_token'] = 'token'
kwargs['tuskar_url'] = 'tuskar.api:1234'
self.assertEqual(client._get_token_and_endpoint(**kwargs),
('token', 'tuskar.api:1234'))
self.assertEqual(mocked_get_ksclient.call_count, 0)
self.assertEqual(mocked_get_endpoint.call_count, 0)
@mock.patch.object(client, '_get_endpoint')
@mock.patch.object(client, '_get_ksclient')
def test_it_with_token(self,
mocked_get_ksclient,
mocked_get_endpoint):
kwargs, expected_kwargs = tutils.create_test_dictionary_pair(
set(self.kwargs.keys()),
self.redundant_keys,
self.missing_keys,
**self.translation_of_args)
kwargs['os_auth_token'] = 'token'
mocked_get_endpoint.return_value = 'tuskar.api:1234'
self.assertEqual(client._get_token_and_endpoint(**kwargs),
('token', 'tuskar.api:1234'))
mocked_get_ksclient.assert_called_with(**expected_kwargs)
mocked_get_endpoint.assert_called_with(
mocked_get_ksclient.return_value,
**expected_kwargs)
self.assertEqual(mocked_get_ksclient.return_value.call_count, 0)
@mock.patch.object(client, '_get_endpoint')
@mock.patch.object(client, '_get_ksclient')
def test_it_with_endpoint(self,
mocked_get_ksclient,
mocked_get_endpoint):
kwargs, expected_kwargs = tutils.create_test_dictionary_pair(
set(self.kwargs.keys()),
self.redundant_keys,
self.missing_keys,
**self.translation_of_args)
kwargs['tuskar_url'] = 'tuskar.api:1234'
mocked_get_ksclient.return_value.auth_token = 'token'
self.assertEqual(client._get_token_and_endpoint(**kwargs),
('token', 'tuskar.api:1234'))
mocked_get_ksclient.assert_called_with(**expected_kwargs)
self.assertEqual(mocked_get_endpoint.call_count, 0)
class ClientGetClientTest(tutils.TestCase):
def setUp(self):
super(ClientGetClientTest, self).setUp()
self.kwargs = {
'other_key': 'other_value',
'any_other_key': 'any_other_value',
}
self.api_version = 1
@mock.patch.object(client, '_get_client_with_token')
@mock.patch.object(client, '_get_client_with_credentials')
def test_it_works_with_token(self,
mocked_get_client_with_credentials,
mocked_get_client_with_token):
mocked_get_client_with_token.return_value = 'client'
self.assertEqual(client.get_client(self.api_version, **self.kwargs),
'client')
mocked_get_client_with_token.assert_called_with(self.api_version,
**self.kwargs)
self.assertEqual(mocked_get_client_with_credentials.call_count, 0)
@mock.patch.object(client, '_get_client_with_token')
@mock.patch.object(client, '_get_client_with_credentials')
def test_it_works_with_credentials(self,
mocked_get_client_with_credentials,
mocked_get_client_with_token):
mocked_get_client_with_token.return_value = None
mocked_get_client_with_credentials.return_value = 'client'
self.assertEqual(client.get_client(self.api_version, **self.kwargs),
'client')
mocked_get_client_with_token.assert_called_with(self.api_version,
**self.kwargs)
mocked_get_client_with_credentials.assert_called_with(self.api_version,
**self.kwargs)
@mock.patch.object(client, '_get_client_with_token')
@mock.patch.object(client, '_get_client_with_credentials')
def test_it_raises_error_without_proper_params(
self,
mocked_get_client_with_credentials,
mocked_get_client_with_token):
mocked_get_client_with_token.return_value = None
mocked_get_client_with_credentials.return_value = None
self.assertRaises(ValueError,
client.get_client, self.api_version, **self.kwargs
)
mocked_get_client_with_token.assert_called_with(self.api_version,
**self.kwargs)
mocked_get_client_with_credentials.assert_called_with(self.api_version,
**self.kwargs)
class ClientClientTest(tutils.TestCase):
@mock.patch.object(client.utils, 'import_versioned_module')
def test_it_works(self,
mocked_import_versioned_module):
api_version = 1
args = ['argument', 'parameter']
kwargs = {
'other_key': 'other_value',
'any_other_key': 'any_other_value',
}
mocked_client_class = mock.MagicMock()
mocked_import_versioned_module.return_value.Client = \
mocked_client_class
client.Client(api_version, *args, **kwargs)
mocked_import_versioned_module.assert_called_with(
api_version,
'client'
)
mocked_client_class.assert_called_with(
*args,
**kwargs
)

View File

@ -69,3 +69,55 @@ class FakeResponse(object):
def read(self, amt):
return self.body.read(amt)
def create_test_dictionary_pair(default_keys, redundant_keys, missing_keys,
**kwargs):
"""Creates a pair of dictionaries for testing
This function creates two dictionaries from three sets of keys.
The first returned dictionary contains keys from default_keys,
keys from redundant_keys but is missing keys from missing_keys.
All with value of key + '_value'.
The second returned dictionary contains keys from default_keys
with value of key + '_value' except for keys from missing_keys.
These contains value None.
These two dictionaries can be used in test cases when testing
if tested function filters out set of keys from kwargs
and passes it to other function.
:param default_keys: set of keys expected to be passed on
:param redundant_keys: set of keys expected to be filtered out
:param missing_keys: set of keys missing from passed_dictionary
and expected to be set to None
:param kwargs: key translation pairs. original=new_one will create
original='original_value' in passed_dictionary and
new_one='original_value' in called_dictionary.
"""
passed_dictionary = {}
translations = kwargs
for key in default_keys | redundant_keys:
if key not in missing_keys:
passed_dictionary[key] = key + '_value'
called_dictionary = passed_dictionary.copy()
for key in redundant_keys:
del called_dictionary[key]
for key in missing_keys:
called_dictionary[key] = None
for key in translations:
if key in called_dictionary:
# create new key with name from translations dict
# with original value
called_dictionary[translations[key]] = called_dictionary[key]
# delete original key
del called_dictionary[key]
return passed_dictionary, called_dictionary