Merge pull request #57 from prashanthpai/401-403
Distinguish between 401(Unauthorized) and 403(Forbidden)
This commit is contained in:
commit
8c31e80639
@ -425,7 +425,9 @@ class Swauth(object):
|
||||
Returns a standard WSGI response callable with the status of 403 or 401
|
||||
depending on whether the REMOTE_USER is set or not.
|
||||
"""
|
||||
if req.remote_user:
|
||||
if not hasattr(req, 'credentials_valid'):
|
||||
req.credentials_valid = None
|
||||
if req.remote_user or req.credentials_valid:
|
||||
return HTTPForbidden(request=req)
|
||||
else:
|
||||
return HTTPUnauthorized(request=req)
|
||||
@ -540,7 +542,7 @@ class Swauth(object):
|
||||
:returns: swob.Response, 204 on success
|
||||
"""
|
||||
if not self.is_super_admin(req):
|
||||
return HTTPForbidden(request=req)
|
||||
return self.denied_response(req)
|
||||
path = quote('/v1/%s' % self.auth_account)
|
||||
resp = self.make_pre_authed_request(
|
||||
req.environ, 'PUT', path).get_response(self.app)
|
||||
@ -580,7 +582,7 @@ class Swauth(object):
|
||||
explained above.
|
||||
"""
|
||||
if not self.is_reseller_admin(req):
|
||||
return HTTPForbidden(request=req)
|
||||
return self.denied_response(req)
|
||||
listing = []
|
||||
marker = ''
|
||||
while True:
|
||||
@ -625,7 +627,7 @@ class Swauth(object):
|
||||
if req.path_info or not account or account[0] == '.':
|
||||
return HTTPBadRequest(request=req)
|
||||
if not self.is_account_admin(req, account):
|
||||
return HTTPForbidden(request=req)
|
||||
return self.denied_response(req)
|
||||
path = quote('/v1/%s/%s/.services' % (self.auth_account, account))
|
||||
resp = self.make_pre_authed_request(
|
||||
req.environ, 'GET', path).get_response(self.app)
|
||||
@ -696,7 +698,7 @@ class Swauth(object):
|
||||
dict as described above
|
||||
"""
|
||||
if not self.is_reseller_admin(req):
|
||||
return HTTPForbidden(request=req)
|
||||
return self.denied_response(req)
|
||||
account = req.path_info_pop()
|
||||
if req.path_info != '/.services' or not account or account[0] == '.':
|
||||
return HTTPBadRequest(request=req)
|
||||
@ -742,7 +744,7 @@ class Swauth(object):
|
||||
:returns: swob.Response, 2xx on success.
|
||||
"""
|
||||
if not self.is_reseller_admin(req):
|
||||
return HTTPForbidden(request=req)
|
||||
return self.denied_response(req)
|
||||
account = req.path_info_pop()
|
||||
if req.path_info or not account or account[0] == '.':
|
||||
return HTTPBadRequest(request=req)
|
||||
@ -827,7 +829,7 @@ class Swauth(object):
|
||||
:returns: swob.Response, 2xx on success.
|
||||
"""
|
||||
if not self.is_reseller_admin(req):
|
||||
return HTTPForbidden(request=req)
|
||||
return self.denied_response(req)
|
||||
account = req.path_info_pop()
|
||||
if req.path_info or not account or account[0] == '.':
|
||||
return HTTPBadRequest(request=req)
|
||||
@ -953,7 +955,7 @@ class Swauth(object):
|
||||
(user[0] == '.' and user != '.groups'):
|
||||
return HTTPBadRequest(request=req)
|
||||
if not self.is_account_admin(req, account):
|
||||
return HTTPForbidden(request=req)
|
||||
return self.denied_response(req)
|
||||
if user == '.groups':
|
||||
# TODO: This could be very slow for accounts with a really large
|
||||
# number of users. Speed could be improved by concurrently
|
||||
@ -1003,7 +1005,7 @@ class Swauth(object):
|
||||
not self.is_reseller_admin(req)) or \
|
||||
('.reseller_admin' in display_groups and
|
||||
not self.is_super_admin(req)):
|
||||
return HTTPForbidden(request=req)
|
||||
return self.denied_response(req)
|
||||
return Response(body=body)
|
||||
|
||||
def handle_put_user(self, req):
|
||||
@ -1036,9 +1038,9 @@ class Swauth(object):
|
||||
return HTTPBadRequest(request=req)
|
||||
if reseller_admin:
|
||||
if not self.is_super_admin(req):
|
||||
return HTTPForbidden(request=req)
|
||||
return self.denied_response(req)
|
||||
elif not self.is_account_admin(req, account):
|
||||
return HTTPForbidden(request=req)
|
||||
return self.denied_response(req)
|
||||
|
||||
path = quote('/v1/%s/%s' % (self.auth_account, account))
|
||||
resp = self.make_pre_authed_request(
|
||||
@ -1086,7 +1088,7 @@ class Swauth(object):
|
||||
user[0] == '.':
|
||||
return HTTPBadRequest(request=req)
|
||||
if not self.is_account_admin(req, account):
|
||||
return HTTPForbidden(request=req)
|
||||
return self.denied_response(req)
|
||||
# Delete the user's existing token, if any.
|
||||
path = quote('/v1/%s/%s/%s' % (self.auth_account, account, user))
|
||||
resp = self.make_pre_authed_request(
|
||||
@ -1469,6 +1471,7 @@ class Swauth(object):
|
||||
to retrieve the admin_detail itself.
|
||||
:param returns: True if .reseller_admin.
|
||||
"""
|
||||
req.credentials_valid = False
|
||||
if self.is_super_admin(req):
|
||||
return True
|
||||
if not admin_detail:
|
||||
@ -1476,6 +1479,7 @@ class Swauth(object):
|
||||
if not self.credentials_match(admin_detail,
|
||||
req.headers.get('x-auth-admin-key')):
|
||||
return False
|
||||
req.credentials_valid = True
|
||||
return '.reseller_admin' in (g['name'] for g in admin_detail['groups'])
|
||||
|
||||
def is_account_admin(self, req, account):
|
||||
@ -1487,6 +1491,7 @@ class Swauth(object):
|
||||
:param account: The account to check for .admin against.
|
||||
:param returns: True if .admin.
|
||||
"""
|
||||
req.credentials_valid = False
|
||||
if self.is_super_admin(req):
|
||||
return True
|
||||
admin_detail = self.get_admin_detail(req)
|
||||
@ -1496,6 +1501,7 @@ class Swauth(object):
|
||||
if not self.credentials_match(admin_detail,
|
||||
req.headers.get('x-auth-admin-key')):
|
||||
return False
|
||||
req.credentials_valid = True
|
||||
return admin_detail and admin_detail['account'] == account and \
|
||||
'.admin' in (g['name'] for g in admin_detail['groups'])
|
||||
return False
|
||||
|
@ -1061,26 +1061,26 @@ class TestAuth(unittest.TestCase):
|
||||
headers={'X-Auth-Admin-User': 'super_admin',
|
||||
'X-Auth-Admin-Key': 'supertest'}
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
resp = Request.blank('/auth/v2/.prep',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Auth-Admin-User': '.super_admin',
|
||||
'X-Auth-Admin-Key': 'upertest'}
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
resp = Request.blank('/auth/v2/.prep',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Auth-Admin-User': '.super_admin'}
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
resp = Request.blank('/auth/v2/.prep',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Auth-Admin-Key': 'supertest'}
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
resp = Request.blank('/auth/v2/.prep',
|
||||
environ={'REQUEST_METHOD': 'POST'}).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
|
||||
def test_prep_fail_account_create(self):
|
||||
self.test_auth.app = FakeApp(iter([
|
||||
@ -1171,7 +1171,7 @@ class TestAuth(unittest.TestCase):
|
||||
headers={'X-Auth-Admin-User': 'super:admin',
|
||||
'X-Auth-Admin-Key': 'supertest'}
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
self.assertEquals(self.test_auth.app.calls, 1)
|
||||
|
||||
self.test_auth.app = FakeApp(iter([
|
||||
@ -1310,7 +1310,7 @@ class TestAuth(unittest.TestCase):
|
||||
headers={'X-Auth-Admin-User': 'super:admin',
|
||||
'X-Auth-Admin-Key': 'supertest'}
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
self.assertEquals(self.test_auth.app.calls, 1)
|
||||
|
||||
self.test_auth.app = FakeApp(iter([
|
||||
@ -1478,7 +1478,7 @@ class TestAuth(unittest.TestCase):
|
||||
'X-Auth-Admin-Key': 'supertest'},
|
||||
body=json.dumps({'storage': {'local': 'new_value'}})
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
self.assertEquals(self.test_auth.app.calls, 1)
|
||||
|
||||
self.test_auth.app = FakeApp(iter([
|
||||
@ -1674,7 +1674,7 @@ class TestAuth(unittest.TestCase):
|
||||
headers={'X-Auth-Admin-User': 'super:admin',
|
||||
'X-Auth-Admin-Key': 'supertest'},
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
self.assertEquals(self.test_auth.app.calls, 1)
|
||||
|
||||
self.test_auth.app = FakeApp(iter([
|
||||
@ -1991,7 +1991,7 @@ class TestAuth(unittest.TestCase):
|
||||
headers={'X-Auth-Admin-User': 'super:admin',
|
||||
'X-Auth-Admin-Key': 'supertest'},
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
self.assertEquals(self.test_auth.app.calls, 1)
|
||||
|
||||
self.test_auth.app = FakeApp(iter([
|
||||
@ -2491,7 +2491,7 @@ class TestAuth(unittest.TestCase):
|
||||
headers={'X-Auth-Admin-User': 'super:admin',
|
||||
'X-Auth-Admin-Key': 'supertest'},
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
self.assertEquals(self.test_auth.app.calls, 1)
|
||||
|
||||
self.test_auth.app = FakeApp(iter([
|
||||
@ -2709,7 +2709,7 @@ class TestAuth(unittest.TestCase):
|
||||
'X-Auth-User-Key': 'key',
|
||||
'X-Auth-User-Reseller-Admin': 'true'}
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
self.assertEquals(self.test_auth.app.calls, 0)
|
||||
|
||||
self.test_auth.app = FakeApp(iter([
|
||||
@ -2725,7 +2725,7 @@ class TestAuth(unittest.TestCase):
|
||||
'X-Auth-User-Key': 'key',
|
||||
'X-Auth-User-Reseller-Admin': 'true'}
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
self.assertEquals(self.test_auth.app.calls, 0)
|
||||
|
||||
self.test_auth.app = FakeApp(iter([
|
||||
@ -2740,7 +2740,7 @@ class TestAuth(unittest.TestCase):
|
||||
'X-Auth-User-Key': 'key',
|
||||
'X-Auth-User-Reseller-Admin': 'true'}
|
||||
).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 403)
|
||||
self.assertEquals(resp.status_int, 401)
|
||||
self.assertEquals(self.test_auth.app.calls, 0)
|
||||
|
||||
def test_put_user_account_admin_fail_bad_creds(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user