
The message you got upon successful user update was misleading as it told you all of the fields were updated, even if no changes were made to those fields. Also there was an issue where in this user edit form, where if can_edit_user was False in the local_settings file, you could not change the default project for a user. Changed the stubbing behavior for tests as well to allow the use of a decorator to stub your tests using Mox instead of having to clutter up the actual test with repetitive Mox stubs. Fixes Bug #993572 Change-Id: Iaebd6525bb4509a2ec6b350df432f4f60dd35114
336 lines
14 KiB
Python
336 lines
14 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2012 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Copyright 2012 Nebula, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import datetime
|
|
|
|
import cloudfiles as swift_client
|
|
from django import http
|
|
from django import test as django_test
|
|
from django.conf import settings
|
|
from django.contrib.messages.storage import default_storage
|
|
from django.core.handlers import wsgi
|
|
from django.test.client import RequestFactory
|
|
from functools import wraps
|
|
from glanceclient.v1 import client as glance_client
|
|
from keystoneclient.v2_0 import client as keystone_client
|
|
from novaclient.v1_1 import client as nova_client
|
|
import httplib2
|
|
import mox
|
|
|
|
from horizon import api
|
|
from horizon import context_processors
|
|
from horizon import middleware
|
|
from horizon import users
|
|
from horizon.tests.test_data.utils import load_test_data
|
|
|
|
from .time import time
|
|
from .time import today
|
|
from .time import utcnow
|
|
|
|
|
|
# Makes output of failing mox tests much easier to read.
|
|
wsgi.WSGIRequest.__repr__ = lambda self: "<class 'django.http.HttpRequest'>"
|
|
|
|
|
|
def create_stubs(stubs_to_create={}):
|
|
if not isinstance(stubs_to_create, dict):
|
|
raise TypeError, ("create_stub must be passed a dict, but a %s was " \
|
|
"given." % type(stubs_to_create).__name__)
|
|
|
|
def inner_stub_out(fn):
|
|
@wraps(fn)
|
|
def instance_stub_out(self):
|
|
for key in stubs_to_create:
|
|
if not (isinstance(stubs_to_create[key], tuple) or \
|
|
isinstance(stubs_to_create[key], list)):
|
|
raise TypeError, ("The values of the create_stub " \
|
|
"dict must be lists or tuples, but is a %s." %
|
|
type(stubs_to_create[key]).__name__)
|
|
|
|
for value in stubs_to_create[key]:
|
|
self.mox.StubOutWithMock(key, value)
|
|
return fn(self)
|
|
return instance_stub_out
|
|
return inner_stub_out
|
|
|
|
|
|
class RequestFactoryWithMessages(RequestFactory):
|
|
def get(self, *args, **kwargs):
|
|
req = super(RequestFactoryWithMessages, self).get(*args, **kwargs)
|
|
req.session = []
|
|
req._messages = default_storage(req)
|
|
return req
|
|
|
|
def post(self, *args, **kwargs):
|
|
req = super(RequestFactoryWithMessages, self).post(*args, **kwargs)
|
|
req.session = []
|
|
req._messages = default_storage(req)
|
|
return req
|
|
|
|
|
|
class TestCase(django_test.TestCase):
|
|
"""
|
|
Specialized base test case class for Horizon which gives access to
|
|
numerous additional features:
|
|
|
|
* A full suite of test data through various attached objects and
|
|
managers (e.g. ``self.servers``, ``self.user``, etc.). See the
|
|
docs for :class:`~horizon.tests.test_data.utils.TestData` for more
|
|
information.
|
|
* The ``mox`` mocking framework via ``self.mox``.
|
|
* A set of request context data via ``self.context``.
|
|
* A ``RequestFactory`` class which supports Django's ``contrib.messages``
|
|
framework via ``self.factory``.
|
|
* A ready-to-go request object via ``self.request``.
|
|
* The ability to override specific time data controls for easier testing.
|
|
* Several handy additional assertion methods.
|
|
"""
|
|
def setUp(self):
|
|
load_test_data(self)
|
|
self.mox = mox.Mox()
|
|
self.factory = RequestFactoryWithMessages()
|
|
self.context = {'authorized_tenants': self.tenants.list()}
|
|
|
|
def fake_conn_request(*args, **kwargs):
|
|
raise Exception("An external URI request tried to escape through "
|
|
"an httplib2 client. Args: %s, kwargs: %s"
|
|
% (args, kwargs))
|
|
|
|
self._real_conn_request = httplib2.Http._conn_request
|
|
httplib2.Http._conn_request = fake_conn_request
|
|
|
|
self._real_horizon_context_processor = context_processors.horizon
|
|
context_processors.horizon = lambda request: self.context
|
|
|
|
self._real_get_user_from_request = users.get_user_from_request
|
|
tenants = self.context['authorized_tenants']
|
|
self.setActiveUser(id=self.user.id,
|
|
token=self.token.id,
|
|
username=self.user.name,
|
|
tenant_id=self.tenant.id,
|
|
service_catalog=self.service_catalog,
|
|
authorized_tenants=tenants)
|
|
self.request = http.HttpRequest()
|
|
self.request.session = self.client._session()
|
|
self.request.session['token'] = self.token.id
|
|
middleware.HorizonMiddleware().process_request(self.request)
|
|
|
|
def tearDown(self):
|
|
self.mox.UnsetStubs()
|
|
httplib2.Http._conn_request = self._real_conn_request
|
|
context_processors.horizon = self._real_horizon_context_processor
|
|
users.get_user_from_request = self._real_get_user_from_request
|
|
self.mox.VerifyAll()
|
|
|
|
def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
|
|
service_catalog=None, tenant_name=None, roles=None,
|
|
authorized_tenants=None):
|
|
users.get_user_from_request = lambda x: \
|
|
users.User(id=id,
|
|
token=token,
|
|
user=username,
|
|
tenant_id=tenant_id,
|
|
service_catalog=service_catalog,
|
|
roles=roles,
|
|
authorized_tenants=authorized_tenants,
|
|
request=self.request)
|
|
|
|
def override_times(self):
|
|
""" Overrides the "current" time with immutable values. """
|
|
now = datetime.datetime.utcnow()
|
|
time.override_time = \
|
|
datetime.time(now.hour, now.minute, now.second)
|
|
today.override_time = datetime.date(now.year, now.month, now.day)
|
|
utcnow.override_time = now
|
|
return now
|
|
|
|
def reset_times(self):
|
|
""" Undoes the changes made by ``override_times``. """
|
|
time.override_time = None
|
|
today.override_time = None
|
|
utcnow.override_time = None
|
|
|
|
def assertRedirectsNoFollow(self, response, expected_url):
|
|
"""
|
|
Asserts that the given response issued a 302 redirect without
|
|
processing the view which is redirected to.
|
|
"""
|
|
assert (response.status_code / 100 == 3), \
|
|
"The response did not return a redirect."
|
|
self.assertEqual(response._headers.get('location', None),
|
|
('Location', settings.TESTSERVER + expected_url))
|
|
self.assertEqual(response.status_code, 302)
|
|
|
|
def assertNoMessages(self, response=None):
|
|
"""
|
|
Asserts that no messages have been attached by the ``contrib.messages``
|
|
framework.
|
|
"""
|
|
self.assertMessageCount(response, success=0, warn=0, info=0, error=0)
|
|
|
|
def assertMessageCount(self, response=None, **kwargs):
|
|
"""
|
|
Asserts that the specified number of messages have been attached
|
|
for various message types. Usage would look like
|
|
``self.assertMessageCount(success=1)``.
|
|
"""
|
|
temp_req = self.client.request(**{'wsgi.input': None})
|
|
temp_req.COOKIES = self.client.cookies
|
|
storage = default_storage(temp_req)
|
|
messages = []
|
|
|
|
if response is None:
|
|
# To gain early access to the messages we have to decode the
|
|
# cookie on the test client.
|
|
if 'messages' in self.client.cookies:
|
|
message_cookie = self.client.cookies['messages'].value
|
|
messages = storage._decode(message_cookie)
|
|
# Check for messages in the context
|
|
elif hasattr(response, "context") and "messages" in response.context:
|
|
messages = response.context["messages"]
|
|
# Check for messages attached to the request on a TemplateResponse
|
|
elif hasattr(response, "_request") and hasattr(response._request,
|
|
"_messages"):
|
|
messages = response._request._messages._queued_messages
|
|
|
|
# If we don't have messages and we don't expect messages, we're done.
|
|
if not any(kwargs.values()) and not messages:
|
|
return
|
|
|
|
# If we expected messages and have none, that's a problem.
|
|
if any(kwargs.values()) and not messages:
|
|
error_msg = "Messages were expected, but none were set."
|
|
assert 0 == sum(kwargs.values()), error_msg
|
|
|
|
# Otherwise, make sure we got the expected messages.
|
|
for msg_type, count in kwargs.items():
|
|
msgs = [m.message for m in messages if msg_type in m.tags]
|
|
assert len(msgs) == count, \
|
|
"%s messages not as expected: %s" % (msg_type.title(),
|
|
", ".join(msgs))
|
|
|
|
def assertNoFormErrors(self, response, context_name="form"):
|
|
"""
|
|
Asserts that the response either does not contain a form in it's
|
|
context, or that if it does, that form has no errors.
|
|
"""
|
|
context = getattr(response, "context", {})
|
|
if not context or context_name not in context:
|
|
return True
|
|
errors = response.context[context_name]._errors
|
|
assert len(errors) == 0, \
|
|
"Unexpected errors were found on the form: %s" % errors
|
|
|
|
def assertFormErrors(self, response, count=0, message=None,
|
|
context_name="form"):
|
|
"""
|
|
Asserts that the response does contain a form in it's
|
|
context, and that form has errors, if count were given,
|
|
it must match the exact numbers of errors
|
|
"""
|
|
context = getattr(response, "context", {})
|
|
assert (context and context_name in context), \
|
|
"The response did not contain a form."
|
|
errors = response.context[context_name]._errors
|
|
if count:
|
|
assert len(errors) == count, \
|
|
"%d errors were found on the form, %d expected" % \
|
|
(len(errors), count)
|
|
if message and message not in unicode(errors):
|
|
self.fail("Expected message not found, instead found: %s"
|
|
% ["%s: %s" % (key, [e for e in field_errors]) for
|
|
(key, field_errors) in errors.items()])
|
|
else:
|
|
assert len(errors) > 0, "No errors were found on the form"
|
|
|
|
|
|
class BaseAdminViewTests(TestCase):
|
|
"""
|
|
A ``TestCase`` subclass which sets an active user with the "admin" role
|
|
for testing admin-only views and functionality.
|
|
"""
|
|
def setActiveUser(self, *args, **kwargs):
|
|
if "roles" not in kwargs:
|
|
kwargs['roles'] = [self.roles.admin._info]
|
|
super(BaseAdminViewTests, self).setActiveUser(*args, **kwargs)
|
|
|
|
|
|
class APITestCase(TestCase):
|
|
"""
|
|
The ``APITestCase`` class is for use with tests which deal with the
|
|
underlying clients rather than stubbing out the horizon.api.* methods.
|
|
"""
|
|
def setUp(self):
|
|
super(APITestCase, self).setUp()
|
|
|
|
def fake_keystoneclient(request, username=None, password=None,
|
|
tenant_id=None, token_id=None, endpoint=None,
|
|
admin=False):
|
|
"""
|
|
Wrapper function which returns the stub keystoneclient. Only
|
|
necessary because the function takes too many arguments to
|
|
conveniently be a lambda.
|
|
"""
|
|
return self.stub_keystoneclient()
|
|
|
|
# Store the original clients
|
|
self._original_glanceclient = api.glance.glanceclient
|
|
self._original_keystoneclient = api.keystone.keystoneclient
|
|
self._original_novaclient = api.nova.novaclient
|
|
|
|
# Replace the clients with our stubs.
|
|
api.glance.glanceclient = lambda request: self.stub_glanceclient()
|
|
api.keystone.keystoneclient = fake_keystoneclient
|
|
api.nova.novaclient = lambda request: self.stub_novaclient()
|
|
|
|
def tearDown(self):
|
|
super(APITestCase, self).tearDown()
|
|
api.glance.glanceclient = self._original_glanceclient
|
|
api.nova.novaclient = self._original_novaclient
|
|
api.keystone.keystoneclient = self._original_keystoneclient
|
|
|
|
def stub_novaclient(self):
|
|
if not hasattr(self, "novaclient"):
|
|
self.mox.StubOutWithMock(nova_client, 'Client')
|
|
self.novaclient = self.mox.CreateMock(nova_client.Client)
|
|
return self.novaclient
|
|
|
|
def stub_keystoneclient(self):
|
|
if not hasattr(self, "keystoneclient"):
|
|
self.mox.StubOutWithMock(keystone_client, 'Client')
|
|
self.keystoneclient = self.mox.CreateMock(keystone_client.Client)
|
|
return self.keystoneclient
|
|
|
|
def stub_glanceclient(self):
|
|
if not hasattr(self, "glanceclient"):
|
|
self.mox.StubOutWithMock(glance_client, 'Client')
|
|
self.glanceclient = self.mox.CreateMock(glance_client.Client)
|
|
return self.glanceclient
|
|
|
|
def stub_swiftclient(self, expected_calls=1):
|
|
if not hasattr(self, "swiftclient"):
|
|
self.mox.StubOutWithMock(swift_client, 'Connection')
|
|
self.swiftclient = self.mox.CreateMock(swift_client.Connection)
|
|
while expected_calls:
|
|
swift_client.Connection(auth=mox.IgnoreArg())\
|
|
.AndReturn(self.swiftclient)
|
|
expected_calls -= 1
|
|
return self.swiftclient
|