Merge "Use Keystone client session.Session"

This commit is contained in:
Jenkins 2014-09-17 07:25:28 +00:00 committed by Gerrit Code Review
commit d9a6f7fd3a
10 changed files with 102 additions and 742 deletions

View File

@ -19,7 +19,9 @@ import logging
import pkg_resources
import sys
from openstackclient.common import restapi
from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient.auth.identity import v3 as v3_auth
from keystoneclient import session
from openstackclient.identity import client as identity_client
@ -80,24 +82,68 @@ class ClientManager(object):
self._cacert = verify
self._insecure = False
self.session = restapi.RESTApi(
verify=verify,
debug=True,
)
ver_prefix = identity_client.AUTH_VERSIONS[
self._api_version[identity_client.API_NAME]
]
# Get logging from root logger
root_logger = logging.getLogger('')
LOG.setLevel(root_logger.getEffectiveLevel())
restapi_logger = logging.getLogger('restapi')
restapi_logger.setLevel(root_logger.getEffectiveLevel())
# NOTE(dtroyer): These plugins are hard-coded for the first step
# in using the new Keystone auth plugins.
if self._url:
LOG.debug('Using token auth %s', ver_prefix)
if ver_prefix == 'v2':
self.auth = v2_auth.Token(
auth_url=url,
token=token,
)
else:
self.auth = v3_auth.Token(
auth_url=url,
token=token,
)
else:
LOG.debug('Using password auth %s', ver_prefix)
if ver_prefix == 'v2':
self.auth = v2_auth.Password(
auth_url=auth_url,
username=username,
password=password,
trust_id=trust_id,
tenant_id=project_id,
tenant_name=project_name,
)
else:
self.auth = v3_auth.Password(
auth_url=auth_url,
username=username,
password=password,
trust_id=trust_id,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
domain_id=domain_id,
domain_name=domain_name,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name,
)
self.session = session.Session(
auth=self.auth,
verify=verify,
)
self.auth_ref = None
if not self._url:
# Trigger the auth call
self.auth_ref = self.session.auth.get_auth_ref(self.session)
# Populate other password flow attributes
self.auth_ref = self.identity.auth_ref
self._token = self.identity.auth_token
self._service_catalog = self.identity.service_catalog
self._token = self.session.auth.get_token(self.session)
self._service_catalog = self.auth_ref.service_catalog
return

View File

@ -1,332 +0,0 @@
# Copyright 2013 Nebula Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""REST API bits"""
import json
import logging
import requests
try:
from urllib.parse import urlencode # noqa
except ImportError:
from urllib import urlencode # noqa
USER_AGENT = 'RAPI'
_logger = logging.getLogger(__name__)
class RESTApi(object):
"""A REST API client that handles the interface from us to the server
RESTApi is requests.Session wrapper that knows how to do:
* JSON serialization/deserialization
* log requests in 'curl' format
* basic API boilerplate for create/delete/list/set/show verbs
* authentication is handled elsewhere and a token is passed in
The expectation that there will be a RESTApi object per authentication
token in use, i.e. project/username/auth_endpoint
On the other hand, a Client knows details about the specific REST Api that
it communicates with, such as the available endpoints, API versions, etc.
"""
def __init__(
self,
session=None,
auth_header=None,
user_agent=USER_AGENT,
verify=True,
logger=None,
debug=None,
):
"""Construct a new REST client
:param object session: A Session object to be used for
communicating with the identity service.
:param string auth_header: A token from an initialized auth_reference
to be used in the X-Auth-Token header
:param string user_agent: Set the User-Agent header in the requests
:param boolean/string verify: If ``True``, the SSL cert will be
verified. A CA_BUNDLE path can also be
provided.
:param logging.Logger logger: A logger to output to. (optional)
:param boolean debug: Enables debug logging of all request and
responses to identity service.
default False (optional)
"""
self.set_auth(auth_header)
self.debug = debug
if not session:
# We create a default session object
session = requests.Session()
self.session = session
self.session.verify = verify
self.session.user_agent = user_agent
if logger:
self.logger = logger
else:
self.logger = _logger
def set_auth(self, auth_header):
"""Sets the current auth blob"""
self.auth_header = auth_header
def set_header(self, header, content):
"""Sets passed in headers into the session headers
Replaces existing headers!!
"""
if content is None:
del self.session.headers[header]
else:
self.session.headers[header] = content
def request(self, method, url, **kwargs):
"""Make an authenticated (if token available) request
:param method: Request HTTP method
:param url: Request URL
:param data: Request body
:param json: Request body to be encoded as JSON
Overwrites ``data`` argument if present
"""
kwargs.setdefault('headers', {})
if self.auth_header:
kwargs['headers']['X-Auth-Token'] = self.auth_header
if 'json' in kwargs and isinstance(kwargs['json'], type({})):
kwargs['data'] = json.dumps(kwargs.pop('json'))
kwargs['headers']['Content-Type'] = 'application/json'
kwargs.setdefault('allow_redirects', True)
if self.debug:
self._log_request(method, url, **kwargs)
response = self.session.request(method, url, **kwargs)
if self.debug:
self._log_response(response)
return self._error_handler(response)
def _error_handler(self, response):
if response.status_code < 200 or response.status_code >= 300:
self.logger.debug(
"ERROR: %s",
response.text,
)
response.raise_for_status()
return response
# Convenience methods to mimic the ones provided by requests.Session
def delete(self, url, **kwargs):
"""Send a DELETE request. Returns :class:`requests.Response` object.
:param url: Request URL
:param \*\*kwargs: Optional arguments passed to ``request``
"""
return self.request('DELETE', url, **kwargs)
def get(self, url, **kwargs):
"""Send a GET request. Returns :class:`requests.Response` object.
:param url: Request URL
:param \*\*kwargs: Optional arguments passed to ``request``
"""
return self.request('GET', url, **kwargs)
def head(self, url, **kwargs):
"""Send a HEAD request. Returns :class:`requests.Response` object.
:param url: Request URL
:param \*\*kwargs: Optional arguments passed to ``request``
"""
kwargs.setdefault('allow_redirects', False)
return self.request('HEAD', url, **kwargs)
def options(self, url, **kwargs):
"""Send an OPTIONS request. Returns :class:`requests.Response` object.
:param url: Request URL
:param \*\*kwargs: Optional arguments passed to ``request``
"""
return self.request('OPTIONS', url, **kwargs)
def patch(self, url, data=None, json=None, **kwargs):
"""Send a PUT request. Returns :class:`requests.Response` object.
:param url: Request URL
:param data: Request body
:param json: Request body to be encoded as JSON
Overwrites ``data`` argument if present
:param \*\*kwargs: Optional arguments passed to ``request``
"""
if json:
kwargs['json'] = json
if data:
kwargs['data'] = data
return self.request('PATCH', url, **kwargs)
def post(self, url, data=None, json=None, **kwargs):
"""Send a POST request. Returns :class:`requests.Response` object.
:param url: Request URL
:param data: Request body
:param json: Request body to be encoded as JSON
Overwrites ``data`` argument if present
:param \*\*kwargs: Optional arguments passed to ``request``
"""
if json:
kwargs['json'] = json
if data:
kwargs['data'] = data
return self.request('POST', url, **kwargs)
def put(self, url, data=None, json=None, **kwargs):
"""Send a PUT request. Returns :class:`requests.Response` object.
:param url: Request URL
:param data: Request body
:param json: Request body to be encoded as JSON
Overwrites ``data`` argument if present
:param \*\*kwargs: Optional arguments passed to ``request``
"""
if json:
kwargs['json'] = json
if data:
kwargs['data'] = data
return self.request('PUT', url, **kwargs)
# Command verb methods
def create(self, url, data=None, response_key=None, **kwargs):
"""Create a new object via a POST request
:param url: Request URL
:param data: Request body, wil be JSON encoded
:param response_key: Dict key in response body to extract
:param \*\*kwargs: Optional arguments passed to ``request``
"""
response = self.request('POST', url, json=data, **kwargs)
if response_key:
return response.json()[response_key]
else:
return response.json()
def list(self, url, data=None, response_key=None, **kwargs):
"""Retrieve a list of objects via a GET or POST request
:param url: Request URL
:param data: Request body, will be JSON encoded
:param response_key: Dict key in response body to extract
:param \*\*kwargs: Optional arguments passed to ``request``
"""
if data:
response = self.request('POST', url, json=data, **kwargs)
else:
response = self.request('GET', url, **kwargs)
if response_key:
return response.json()[response_key]
else:
return response.json()
def set(self, url, data=None, response_key=None, **kwargs):
"""Update an object via a PUT request
:param url: Request URL
:param data: Request body
:param json: Request body to be encoded as JSON
Overwrites ``data`` argument if present
:param \*\*kwargs: Optional arguments passed to ``request``
"""
response = self.request('PUT', url, json=data)
if data:
if response_key:
return response.json()[response_key]
else:
return response.json()
else:
# Nothing to do here
return None
def show(self, url, response_key=None, **kwargs):
"""Retrieve a single object via a GET request
:param url: Request URL
:param response_key: Dict key in response body to extract
:param \*\*kwargs: Optional arguments passed to ``request``
"""
response = self.request('GET', url, **kwargs)
if response_key:
return response.json()[response_key]
else:
return response.json()
def _log_request(self, method, url, **kwargs):
if 'params' in kwargs and kwargs['params'] != {}:
url += '?' + urlencode(kwargs['params'])
string_parts = [
"curl -i",
"-X '%s'" % method,
"'%s'" % url,
]
for element in kwargs['headers']:
header = " -H '%s: %s'" % (element, kwargs['headers'][element])
string_parts.append(header)
self.logger.debug("REQ: %s" % " ".join(string_parts))
if 'data' in kwargs:
self.logger.debug(" REQ BODY: %r\n" % (kwargs['data']))
def _log_response(self, response):
self.logger.debug(
"RESP: [%s] %r\n",
response.status_code,
response.headers,
)
if response._content_consumed:
self.logger.debug(
" RESP BODY: %s\n",
response.text,
)
self.logger.debug(
" encoding: %s",
response.encoding,
)

View File

@ -29,6 +29,12 @@ API_VERSIONS = {
'3': 'keystoneclient.v3.client.Client',
}
# Translate our API version to auth plugin version prefix
AUTH_VERSIONS = {
'2.0': 'v2',
'3': 'v3',
}
def make_client(instance):
"""Returns an identity service client."""
@ -38,6 +44,8 @@ def make_client(instance):
API_VERSIONS)
LOG.debug('Instantiating identity client: %s', identity_client)
# TODO(dtroyer): Something doesn't like the session.auth when using
# token auth, chase that down.
if instance._url:
LOG.debug('Using token auth')
client = identity_client(
@ -50,32 +58,14 @@ def make_client(instance):
else:
LOG.debug('Using password auth')
client = identity_client(
username=instance._username,
password=instance._password,
user_domain_id=instance._user_domain_id,
user_domain_name=instance._user_domain_name,
project_domain_id=instance._project_domain_id,
project_domain_name=instance._project_domain_name,
domain_id=instance._domain_id,
domain_name=instance._domain_name,
tenant_name=instance._project_name,
tenant_id=instance._project_id,
auth_url=instance._auth_url,
region_name=instance._region_name,
session=instance.session,
cacert=instance._cacert,
insecure=instance._insecure,
trust_id=instance._trust_id,
)
# TODO(dtroyer): the identity v2 role commands use this yet, fix that
# so we can remove it
instance.auth_ref = client.auth_ref
# NOTE(dtroyer): this is hanging around until restapi is replace by
# ksc session
instance.session.set_auth(
client.auth_ref.auth_token,
)
if not instance._url:
instance.auth_ref = instance.auth.get_auth_ref(instance.session)
return client

View File

@ -33,9 +33,8 @@ class IssueToken(show.ShowOne):
def take_action(self, parsed_args):
self.log.debug('take_action(%s)', parsed_args)
identity_client = self.app.client_manager.identity
token = identity_client.service_catalog.get_token()
token = self.app.client_manager.auth_ref.service_catalog.get_token()
token['project_id'] = token.pop('tenant_id')
return zip(*sorted(six.iteritems(token)))

View File

@ -29,7 +29,7 @@ def create_container(
):
"""Create a container
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: name of container to create
:returns: dict of returned headers
@ -53,7 +53,7 @@ def delete_container(
):
"""Delete a container
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: name of container to delete
"""
@ -72,7 +72,7 @@ def list_containers(
):
"""Get containers in an account
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param marker: marker query
:param limit: limit query
@ -127,7 +127,7 @@ def show_container(
):
"""Get container details
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: name of container to show
:returns: dict of returned headers

View File

@ -32,7 +32,7 @@ def create_object(
):
"""Create an object, upload it to a container
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: name of container to store object
:param object: local path to object
@ -61,7 +61,7 @@ def delete_object(
):
"""Delete an object stored in a container
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: name of container that stores object
:param container: name of object to delete
@ -84,7 +84,7 @@ def list_objects(
):
"""Get objects in a container
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: container name to get a listing for
:param marker: marker query
@ -158,7 +158,7 @@ def show_object(
):
"""Get object details
:param session: a restapi object
:param session: an authenticated keystoneclient.session.Session object
:param url: endpoint
:param container: container name to get a listing for
:returns: dict of object properties

View File

@ -13,8 +13,10 @@
# under the License.
#
import mock
from keystoneclient.auth.identity import v2 as auth_v2
from openstackclient.common import clientmanager
from openstackclient.common import restapi
from openstackclient.tests import utils
@ -25,6 +27,10 @@ USERNAME = "itchy"
PASSWORD = "scratchy"
SERVICE_CATALOG = {'sc': '123'}
API_VERSION = {
'identity': '2.0',
}
def FakeMakeClient(instance):
return FakeClient()
@ -52,6 +58,7 @@ class TestClientCache(utils.TestCase):
self.assertEqual(c.attr, c.attr)
@mock.patch('keystoneclient.session.Session')
class TestClientManager(utils.TestCase):
def setUp(self):
super(TestClientManager, self).setUp()
@ -59,12 +66,13 @@ class TestClientManager(utils.TestCase):
clientmanager.ClientManager.identity = \
clientmanager.ClientCache(FakeMakeClient)
def test_client_manager_token(self):
def test_client_manager_token(self, mock):
client_manager = clientmanager.ClientManager(
token=AUTH_TOKEN,
url=AUTH_URL,
verify=True,
api_version=API_VERSION,
)
self.assertEqual(
@ -76,19 +84,20 @@ class TestClientManager(utils.TestCase):
client_manager._url,
)
self.assertIsInstance(
client_manager.session,
restapi.RESTApi,
client_manager.auth,
auth_v2.Token,
)
self.assertFalse(client_manager._insecure)
self.assertTrue(client_manager._verify)
def test_client_manager_password(self):
def test_client_manager_password(self, mock):
client_manager = clientmanager.ClientManager(
auth_url=AUTH_URL,
username=USERNAME,
password=PASSWORD,
verify=False,
api_version=API_VERSION,
)
self.assertEqual(
@ -104,33 +113,20 @@ class TestClientManager(utils.TestCase):
client_manager._password,
)
self.assertIsInstance(
client_manager.session,
restapi.RESTApi,
client_manager.auth,
auth_v2.Password,
)
self.assertTrue(client_manager._insecure)
self.assertFalse(client_manager._verify)
# These need to stick around until the old-style clients are gone
self.assertEqual(
AUTH_REF,
client_manager.auth_ref,
)
self.assertEqual(
AUTH_TOKEN,
client_manager._token,
)
self.assertEqual(
SERVICE_CATALOG,
client_manager._service_catalog,
)
def test_client_manager_password_verify_ca(self):
def test_client_manager_password_verify_ca(self, mock):
client_manager = clientmanager.ClientManager(
auth_url=AUTH_URL,
username=USERNAME,
password=PASSWORD,
verify='cafile',
api_version=API_VERSION,
)
self.assertFalse(client_manager._insecure)

View File

@ -1,341 +0,0 @@
# Copyright 2013 Nebula Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Test rest module"""
import json
import mock
import requests
import six
from openstackclient.common import restapi
from openstackclient.tests import utils
fake_user_agent = 'test_rapi'
fake_auth = '11223344556677889900'
fake_url = 'http://gopher.com'
fake_key = 'gopher'
fake_keys = 'gophers'
fake_gopher_mac = {
'id': 'g1',
'name': 'mac',
'actor': 'Mel Blanc',
}
fake_gopher_tosh = {
'id': 'g2',
'name': 'tosh',
'actor': 'Stan Freeberg',
}
fake_gopher_single = {
fake_key: fake_gopher_mac,
}
fake_gopher_list = {
fake_keys:
[
fake_gopher_mac,
fake_gopher_tosh,
]
}
fake_headers = {
'User-Agent': fake_user_agent,
}
class FakeResponse(requests.Response):
def __init__(self, headers={}, status_code=200, data=None, encoding=None):
super(FakeResponse, self).__init__()
self.status_code = status_code
self.headers.update(headers)
self._content = json.dumps(data)
if not isinstance(self._content, six.binary_type):
self._content = self._content.encode()
@mock.patch('openstackclient.common.restapi.requests.Session')
class TestRESTApi(utils.TestCase):
def test_request_get(self, session_mock):
resp = FakeResponse(status_code=200, data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi(
user_agent=fake_user_agent,
)
gopher = api.request('GET', fake_url)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers={},
allow_redirects=True,
)
self.assertEqual(gopher.status_code, 200)
self.assertEqual(gopher.json(), fake_gopher_single)
def test_request_get_return_300(self, session_mock):
resp = FakeResponse(status_code=300, data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi(
user_agent=fake_user_agent,
)
gopher = api.request('GET', fake_url)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers={},
allow_redirects=True,
)
self.assertEqual(gopher.status_code, 300)
self.assertEqual(gopher.json(), fake_gopher_single)
def test_request_get_fail_404(self, session_mock):
resp = FakeResponse(status_code=404, data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi(
user_agent=fake_user_agent,
)
self.assertRaises(requests.HTTPError, api.request, 'GET', fake_url)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers={},
allow_redirects=True,
)
def test_request_get_auth(self, session_mock):
resp = FakeResponse(data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
headers=mock.MagicMock(return_value={}),
)
api = restapi.RESTApi(
auth_header=fake_auth,
user_agent=fake_user_agent,
)
gopher = api.request('GET', fake_url)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers={
'X-Auth-Token': fake_auth,
},
allow_redirects=True,
)
self.assertEqual(gopher.json(), fake_gopher_single)
def test_request_post(self, session_mock):
resp = FakeResponse(data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi(
user_agent=fake_user_agent,
)
data = fake_gopher_tosh
gopher = api.request('POST', fake_url, json=data)
session_mock.return_value.request.assert_called_with(
'POST',
fake_url,
headers={
'Content-Type': 'application/json',
},
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher.json(), fake_gopher_single)
# Methods
# TODO(dtroyer): add the other method methods
def test_delete(self, session_mock):
resp = FakeResponse(status_code=200, data=None)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi()
gopher = api.delete(fake_url)
session_mock.return_value.request.assert_called_with(
'DELETE',
fake_url,
headers=mock.ANY,
allow_redirects=True,
)
self.assertEqual(gopher.status_code, 200)
# Commands
def test_create(self, session_mock):
resp = FakeResponse(data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi()
data = fake_gopher_mac
# Test no key
gopher = api.create(fake_url, data=data)
session_mock.return_value.request.assert_called_with(
'POST',
fake_url,
headers=mock.ANY,
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher, fake_gopher_single)
# Test with key
gopher = api.create(fake_url, data=data, response_key=fake_key)
session_mock.return_value.request.assert_called_with(
'POST',
fake_url,
headers=mock.ANY,
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher, fake_gopher_mac)
def test_list(self, session_mock):
resp = FakeResponse(data=fake_gopher_list)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
# test base
api = restapi.RESTApi()
gopher = api.list(fake_url, response_key=fake_keys)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers=mock.ANY,
allow_redirects=True,
)
self.assertEqual(gopher, [fake_gopher_mac, fake_gopher_tosh])
# test body
api = restapi.RESTApi()
data = {'qwerty': 1}
gopher = api.list(fake_url, response_key=fake_keys, data=data)
session_mock.return_value.request.assert_called_with(
'POST',
fake_url,
headers=mock.ANY,
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher, [fake_gopher_mac, fake_gopher_tosh])
# test query params
api = restapi.RESTApi()
params = {'qaz': '123'}
gophers = api.list(fake_url, response_key=fake_keys, params=params)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers=mock.ANY,
allow_redirects=True,
params=params,
)
self.assertEqual(gophers, [fake_gopher_mac, fake_gopher_tosh])
def test_set(self, session_mock):
new_gopher = fake_gopher_single
new_gopher[fake_key]['name'] = 'Chip'
resp = FakeResponse(data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi()
data = fake_gopher_mac
data['name'] = 'Chip'
# Test no data, no key
gopher = api.set(fake_url)
session_mock.return_value.request.assert_called_with(
'PUT',
fake_url,
headers=mock.ANY,
allow_redirects=True,
json=None,
)
self.assertEqual(gopher, None)
# Test data, no key
gopher = api.set(fake_url, data=data)
session_mock.return_value.request.assert_called_with(
'PUT',
fake_url,
headers=mock.ANY,
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher, fake_gopher_single)
# NOTE:(dtroyer): Key and no data is not tested as without data
# the response_key is moot
# Test data and key
gopher = api.set(fake_url, data=data, response_key=fake_key)
session_mock.return_value.request.assert_called_with(
'PUT',
fake_url,
headers=mock.ANY,
allow_redirects=True,
data=json.dumps(data),
)
self.assertEqual(gopher, fake_gopher_mac)
def test_show(self, session_mock):
resp = FakeResponse(data=fake_gopher_single)
session_mock.return_value = mock.MagicMock(
request=mock.MagicMock(return_value=resp),
)
api = restapi.RESTApi()
# Test no key
gopher = api.show(fake_url)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers=mock.ANY,
allow_redirects=True,
)
self.assertEqual(gopher, fake_gopher_single)
# Test with key
gopher = api.show(fake_url, response_key=fake_key)
session_mock.return_value.request.assert_called_with(
'GET',
fake_url,
headers=mock.ANY,
allow_redirects=True,
)
self.assertEqual(gopher, fake_gopher_mac)

View File

@ -125,7 +125,6 @@ class FakeIdentityv2Client(object):
def __init__(self, **kwargs):
self.roles = mock.Mock()
self.roles.resource_class = fakes.FakeResource(None, {})
self.service_catalog = mock.Mock()
self.services = mock.Mock()
self.services.resource_class = fakes.FakeResource(None, {})
self.tenants = mock.Mock()

View File

@ -13,6 +13,8 @@
# under the License.
#
import mock
from openstackclient.identity.v2_0 import token
from openstackclient.tests.identity.v2_0 import fakes as identity_fakes
@ -23,8 +25,9 @@ class TestToken(identity_fakes.TestIdentityv2):
super(TestToken, self).setUp()
# Get a shortcut to the Service Catalog Mock
self.sc_mock = self.app.client_manager.identity.service_catalog
self.sc_mock.reset_mock()
self.sc_mock = mock.Mock()
self.app.client_manager.auth_ref = mock.Mock()
self.app.client_manager.auth_ref.service_catalog = self.sc_mock
class TestTokenIssue(TestToken):