diff --git a/swauth/middleware.py b/swauth/middleware.py index 495bea8..9b1ec58 100644 --- a/swauth/middleware.py +++ b/swauth/middleware.py @@ -425,7 +425,9 @@ class Swauth(object): Returns a standard WSGI response callable with the status of 403 or 401 depending on whether the REMOTE_USER is set or not. """ - if req.remote_user: + if not hasattr(req, 'credentials_valid'): + req.credentials_valid = None + if req.remote_user or req.credentials_valid: return HTTPForbidden(request=req) else: return HTTPUnauthorized(request=req) @@ -540,7 +542,7 @@ class Swauth(object): :returns: swob.Response, 204 on success """ if not self.is_super_admin(req): - return HTTPForbidden(request=req) + return self.denied_response(req) path = quote('/v1/%s' % self.auth_account) resp = self.make_pre_authed_request( req.environ, 'PUT', path).get_response(self.app) @@ -580,7 +582,7 @@ class Swauth(object): explained above. """ if not self.is_reseller_admin(req): - return HTTPForbidden(request=req) + return self.denied_response(req) listing = [] marker = '' while True: @@ -625,7 +627,7 @@ class Swauth(object): if req.path_info or not account or account[0] == '.': return HTTPBadRequest(request=req) if not self.is_account_admin(req, account): - return HTTPForbidden(request=req) + return self.denied_response(req) path = quote('/v1/%s/%s/.services' % (self.auth_account, account)) resp = self.make_pre_authed_request( req.environ, 'GET', path).get_response(self.app) @@ -696,7 +698,7 @@ class Swauth(object): dict as described above """ if not self.is_reseller_admin(req): - return HTTPForbidden(request=req) + return self.denied_response(req) account = req.path_info_pop() if req.path_info != '/.services' or not account or account[0] == '.': return HTTPBadRequest(request=req) @@ -742,7 +744,7 @@ class Swauth(object): :returns: swob.Response, 2xx on success. """ if not self.is_reseller_admin(req): - return HTTPForbidden(request=req) + return self.denied_response(req) account = req.path_info_pop() if req.path_info or not account or account[0] == '.': return HTTPBadRequest(request=req) @@ -827,7 +829,7 @@ class Swauth(object): :returns: swob.Response, 2xx on success. """ if not self.is_reseller_admin(req): - return HTTPForbidden(request=req) + return self.denied_response(req) account = req.path_info_pop() if req.path_info or not account or account[0] == '.': return HTTPBadRequest(request=req) @@ -953,7 +955,7 @@ class Swauth(object): (user[0] == '.' and user != '.groups'): return HTTPBadRequest(request=req) if not self.is_account_admin(req, account): - return HTTPForbidden(request=req) + return self.denied_response(req) if user == '.groups': # TODO: This could be very slow for accounts with a really large # number of users. Speed could be improved by concurrently @@ -1003,7 +1005,7 @@ class Swauth(object): not self.is_reseller_admin(req)) or \ ('.reseller_admin' in display_groups and not self.is_super_admin(req)): - return HTTPForbidden(request=req) + return self.denied_response(req) return Response(body=body) def handle_put_user(self, req): @@ -1036,9 +1038,9 @@ class Swauth(object): return HTTPBadRequest(request=req) if reseller_admin: if not self.is_super_admin(req): - return HTTPForbidden(request=req) + return self.denied_response(req) elif not self.is_account_admin(req, account): - return HTTPForbidden(request=req) + return self.denied_response(req) path = quote('/v1/%s/%s' % (self.auth_account, account)) resp = self.make_pre_authed_request( @@ -1086,7 +1088,7 @@ class Swauth(object): user[0] == '.': return HTTPBadRequest(request=req) if not self.is_account_admin(req, account): - return HTTPForbidden(request=req) + return self.denied_response(req) # Delete the user's existing token, if any. path = quote('/v1/%s/%s/%s' % (self.auth_account, account, user)) resp = self.make_pre_authed_request( @@ -1469,6 +1471,7 @@ class Swauth(object): to retrieve the admin_detail itself. :param returns: True if .reseller_admin. """ + req.credentials_valid = False if self.is_super_admin(req): return True if not admin_detail: @@ -1476,6 +1479,7 @@ class Swauth(object): if not self.credentials_match(admin_detail, req.headers.get('x-auth-admin-key')): return False + req.credentials_valid = True return '.reseller_admin' in (g['name'] for g in admin_detail['groups']) def is_account_admin(self, req, account): @@ -1487,6 +1491,7 @@ class Swauth(object): :param account: The account to check for .admin against. :param returns: True if .admin. """ + req.credentials_valid = False if self.is_super_admin(req): return True admin_detail = self.get_admin_detail(req) @@ -1496,6 +1501,7 @@ class Swauth(object): if not self.credentials_match(admin_detail, req.headers.get('x-auth-admin-key')): return False + req.credentials_valid = True return admin_detail and admin_detail['account'] == account and \ '.admin' in (g['name'] for g in admin_detail['groups']) return False diff --git a/test_swauth/unit/test_middleware.py b/test_swauth/unit/test_middleware.py index 9029f89..960b4b3 100644 --- a/test_swauth/unit/test_middleware.py +++ b/test_swauth/unit/test_middleware.py @@ -1061,26 +1061,26 @@ class TestAuth(unittest.TestCase): headers={'X-Auth-Admin-User': 'super_admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) resp = Request.blank('/auth/v2/.prep', environ={'REQUEST_METHOD': 'POST'}, headers={'X-Auth-Admin-User': '.super_admin', 'X-Auth-Admin-Key': 'upertest'} ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) resp = Request.blank('/auth/v2/.prep', environ={'REQUEST_METHOD': 'POST'}, headers={'X-Auth-Admin-User': '.super_admin'} ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) resp = Request.blank('/auth/v2/.prep', environ={'REQUEST_METHOD': 'POST'}, headers={'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) resp = Request.blank('/auth/v2/.prep', environ={'REQUEST_METHOD': 'POST'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) def test_prep_fail_account_create(self): self.test_auth.app = FakeApp(iter([ @@ -1171,7 +1171,7 @@ class TestAuth(unittest.TestCase): headers={'X-Auth-Admin-User': 'super:admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ @@ -1310,7 +1310,7 @@ class TestAuth(unittest.TestCase): headers={'X-Auth-Admin-User': 'super:admin', 'X-Auth-Admin-Key': 'supertest'} ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ @@ -1478,7 +1478,7 @@ class TestAuth(unittest.TestCase): 'X-Auth-Admin-Key': 'supertest'}, body=json.dumps({'storage': {'local': 'new_value'}}) ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ @@ -1674,7 +1674,7 @@ class TestAuth(unittest.TestCase): headers={'X-Auth-Admin-User': 'super:admin', 'X-Auth-Admin-Key': 'supertest'}, ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ @@ -1991,7 +1991,7 @@ class TestAuth(unittest.TestCase): headers={'X-Auth-Admin-User': 'super:admin', 'X-Auth-Admin-Key': 'supertest'}, ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ @@ -2491,7 +2491,7 @@ class TestAuth(unittest.TestCase): headers={'X-Auth-Admin-User': 'super:admin', 'X-Auth-Admin-Key': 'supertest'}, ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ @@ -2709,7 +2709,7 @@ class TestAuth(unittest.TestCase): 'X-Auth-User-Key': 'key', 'X-Auth-User-Reseller-Admin': 'true'} ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 0) self.test_auth.app = FakeApp(iter([ @@ -2725,7 +2725,7 @@ class TestAuth(unittest.TestCase): 'X-Auth-User-Key': 'key', 'X-Auth-User-Reseller-Admin': 'true'} ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 0) self.test_auth.app = FakeApp(iter([ @@ -2740,7 +2740,7 @@ class TestAuth(unittest.TestCase): 'X-Auth-User-Key': 'key', 'X-Auth-User-Reseller-Admin': 'true'} ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) + self.assertEquals(resp.status_int, 401) self.assertEquals(self.test_auth.app.calls, 0) def test_put_user_account_admin_fail_bad_creds(self):