Fix bug where admin is able to delete reseller_admin
Changed the code to only allow the super_admin to delete a reseller_admin. This follows the same logic of user creation, where only the super_admin can create a reseller_admin. Also, took the opportunity to refactor some code and implemented get_user_detail method to remove duplicated code Bug 1260239: https://bugs.launchpad.net/gluster-swift/+bug/1260239 Change-Id: I9e4866cd7ad08698f427846be566ab2364ad4850 Signed-off-by: Thiago da Silva <thiago@redhat.com> Reviewed-on: http://review.gluster.org/6516 Reviewed-by: Luis Pabon <lpabon@redhat.com> Tested-by: Luis Pabon <lpabon@redhat.com>
This commit is contained in:
parent
ac55cfb30b
commit
205a6e4aa7
@ -908,6 +908,10 @@ class Swauth(object):
|
|||||||
return HTTPBadRequest(request=req)
|
return HTTPBadRequest(request=req)
|
||||||
if not self.is_account_admin(req, account):
|
if not self.is_account_admin(req, account):
|
||||||
return self.denied_response(req)
|
return self.denied_response(req)
|
||||||
|
|
||||||
|
# get information for each user for the specified
|
||||||
|
# account and create a list of all groups that the users
|
||||||
|
# are part of
|
||||||
if user == '.groups':
|
if user == '.groups':
|
||||||
# TODO: This could be very slow for accounts with a really large
|
# TODO: This could be very slow for accounts with a really large
|
||||||
# number of users. Speed could be improved by concurrently
|
# number of users. Speed could be improved by concurrently
|
||||||
@ -932,28 +936,25 @@ class Swauth(object):
|
|||||||
break
|
break
|
||||||
for obj in sublisting:
|
for obj in sublisting:
|
||||||
if obj['name'][0] != '.':
|
if obj['name'][0] != '.':
|
||||||
path = quote('/v1/%s/%s/%s' % (self.auth_account,
|
|
||||||
account, obj['name']))
|
# get list of groups for each user
|
||||||
resp = self.make_pre_authed_request(
|
user_json = self.get_user_detail(req, account,
|
||||||
req.environ, 'GET', path).get_response(self.app)
|
obj['name'])
|
||||||
if resp.status_int // 100 != 2:
|
if user_json is None:
|
||||||
raise Exception('Could not retrieve user object: '
|
raise Exception('Could not retrieve user object: '
|
||||||
'%s %s' % (path, resp.status))
|
'%s:%s %s' % (account, user, 404))
|
||||||
groups.update(
|
groups.update(
|
||||||
g['name'] for g in json.loads(resp.body)['groups'])
|
g['name'] for g in json.loads(user_json)['groups'])
|
||||||
marker = sublisting[-1]['name'].encode('utf-8')
|
marker = sublisting[-1]['name'].encode('utf-8')
|
||||||
body = json.dumps(
|
body = json.dumps(
|
||||||
{'groups': [{'name': g} for g in sorted(groups)]})
|
{'groups': [{'name': g} for g in sorted(groups)]})
|
||||||
else:
|
else:
|
||||||
path = quote('/v1/%s/%s/%s' % (self.auth_account, account, user))
|
# get information for specific user,
|
||||||
resp = self.make_pre_authed_request(
|
# if user doesn't exist, return HTTPNotFound
|
||||||
req.environ, 'GET', path).get_response(self.app)
|
body = self.get_user_detail(req, account, user)
|
||||||
if resp.status_int == 404:
|
if body is None:
|
||||||
return HTTPNotFound(request=req)
|
return HTTPNotFound(request=req)
|
||||||
if resp.status_int // 100 != 2:
|
|
||||||
raise Exception('Could not retrieve user object: %s %s' %
|
|
||||||
(path, resp.status))
|
|
||||||
body = resp.body
|
|
||||||
display_groups = [g['name'] for g in json.loads(body)['groups']]
|
display_groups = [g['name'] for g in json.loads(body)['groups']]
|
||||||
if ('.admin' in display_groups and
|
if ('.admin' in display_groups and
|
||||||
not self.is_reseller_admin(req)) or \
|
not self.is_reseller_admin(req)) or \
|
||||||
@ -1054,8 +1055,19 @@ class Swauth(object):
|
|||||||
if req.path_info or not account or account[0] == '.' or not user or \
|
if req.path_info or not account or account[0] == '.' or not user or \
|
||||||
user[0] == '.':
|
user[0] == '.':
|
||||||
return HTTPBadRequest(request=req)
|
return HTTPBadRequest(request=req)
|
||||||
|
|
||||||
|
# if user to be deleted is reseller_admin, then requesting
|
||||||
|
# user must also be a reseller_admin
|
||||||
|
is_reseller_admin = self.is_user_reseller_admin(req, account, user)
|
||||||
|
if not is_reseller_admin and not req.credentials_valid:
|
||||||
|
# if user to be deleted can't be found, return 404
|
||||||
|
return HTTPNotFound(request=req)
|
||||||
|
elif is_reseller_admin and not self.is_super_admin(req):
|
||||||
|
return HTTPForbidden(request=req)
|
||||||
|
|
||||||
if not self.is_account_admin(req, account):
|
if not self.is_account_admin(req, account):
|
||||||
return self.denied_response(req)
|
return self.denied_response(req)
|
||||||
|
|
||||||
# Delete the user's existing token, if any.
|
# Delete the user's existing token, if any.
|
||||||
path = quote('/v1/%s/%s/%s' % (self.auth_account, account, user))
|
path = quote('/v1/%s/%s/%s' % (self.auth_account, account, user))
|
||||||
resp = self.make_pre_authed_request(
|
resp = self.make_pre_authed_request(
|
||||||
@ -1084,6 +1096,26 @@ class Swauth(object):
|
|||||||
(path, resp.status))
|
(path, resp.status))
|
||||||
return HTTPNoContent(request=req)
|
return HTTPNoContent(request=req)
|
||||||
|
|
||||||
|
def is_user_reseller_admin(self, req, account, user):
|
||||||
|
"""
|
||||||
|
Returns True if the user is a .reseller_admin.
|
||||||
|
|
||||||
|
:param account: account user is part of
|
||||||
|
:param user: the user
|
||||||
|
:returns: True if user .reseller_admin, False
|
||||||
|
if user is not a reseller_admin and None if the user
|
||||||
|
doesn't exist.
|
||||||
|
"""
|
||||||
|
req.credentials_valid = True
|
||||||
|
user_json = self.get_user_detail(req, account, user)
|
||||||
|
if user_json is None:
|
||||||
|
req.credentials_valid = False
|
||||||
|
return False
|
||||||
|
|
||||||
|
user_detail = json.loads(user_json)
|
||||||
|
|
||||||
|
return '.reseller_admin' in (g['name'] for g in user_detail['groups'])
|
||||||
|
|
||||||
def handle_get_token(self, req):
|
def handle_get_token(self, req):
|
||||||
"""
|
"""
|
||||||
Handles the various `request for token and service end point(s)` calls.
|
Handles the various `request for token and service end point(s)` calls.
|
||||||
@ -1391,18 +1423,34 @@ class Swauth(object):
|
|||||||
return None
|
return None
|
||||||
admin_account, admin_user = \
|
admin_account, admin_user = \
|
||||||
req.headers.get('x-auth-admin-user').split(':', 1)
|
req.headers.get('x-auth-admin-user').split(':', 1)
|
||||||
path = quote('/v1/%s/%s/%s' % (self.auth_account, admin_account,
|
user_json = self.get_user_detail(req, admin_account, admin_user)
|
||||||
admin_user))
|
if user_json is None:
|
||||||
|
return None
|
||||||
|
admin_detail = json.loads(user_json)
|
||||||
|
admin_detail['account'] = admin_account
|
||||||
|
return admin_detail
|
||||||
|
|
||||||
|
def get_user_detail(self, req, account, user):
|
||||||
|
"""
|
||||||
|
Returns the response body of a GET request for the specified user
|
||||||
|
The body is in JSON format and contains all user information.
|
||||||
|
|
||||||
|
:param req: The swob request
|
||||||
|
:param account: the account the user is a member of
|
||||||
|
:param user: the user
|
||||||
|
|
||||||
|
:returns: A JSON response with the user detail information, None
|
||||||
|
if the user doesn't exist
|
||||||
|
"""
|
||||||
|
path = quote('/v1/%s/%s/%s' % (self.auth_account, account, user))
|
||||||
resp = self.make_pre_authed_request(
|
resp = self.make_pre_authed_request(
|
||||||
req.environ, 'GET', path).get_response(self.app)
|
req.environ, 'GET', path).get_response(self.app)
|
||||||
if resp.status_int == 404:
|
if resp.status_int == 404:
|
||||||
return None
|
return None
|
||||||
if resp.status_int // 100 != 2:
|
if resp.status_int // 100 != 2:
|
||||||
raise Exception('Could not get admin user object: %s %s' %
|
raise Exception('Could not get user object: %s %s' %
|
||||||
(path, resp.status))
|
(path, resp.status))
|
||||||
admin_detail = json.loads(resp.body)
|
return resp.body
|
||||||
admin_detail['account'] = admin_account
|
|
||||||
return admin_detail
|
|
||||||
|
|
||||||
def credentials_match(self, user_detail, key):
|
def credentials_match(self, user_detail, key):
|
||||||
"""
|
"""
|
||||||
|
@ -492,12 +492,12 @@ class TestUser(unittest.TestCase):
|
|||||||
self.assertEqual('403 Forbidden' in output,True, 're_admin deletion succeeded with re_admin user of other account: '+output)
|
self.assertEqual('403 Forbidden' in output,True, 're_admin deletion succeeded with re_admin user of other account: '+output)
|
||||||
'''
|
'''
|
||||||
Utils.addResellerAdminUser('test2', 're_admintobedeletedbyotheraccountusers2', 'testing')
|
Utils.addResellerAdminUser('test2', 're_admintobedeletedbyotheraccountusers2', 'testing')
|
||||||
(status,output) = Utils.deleteUser('test2', 're_admintobedeletedbyotherusers2',user='test:admin',key='testing')
|
(status,output) = Utils.deleteUser('test2', 're_admintobedeletedbyotheraccountusers2',user='test:admin',key='testing')
|
||||||
self.assertNotEqual(status, 0, 're_admin deletion succeeded with admin user of other account: '+output)
|
self.assertNotEqual(status, 0, 're_admin deletion succeeded with admin user of other account: '+output)
|
||||||
self.assertEqual('403 Forbidden' in output,True, 're_admin deletion succeeded with admin user of other account: '+output)
|
self.assertEqual('403 Forbidden' in output,True, 're_admin deletion succeeded with admin user of other account: '+output)
|
||||||
|
|
||||||
Utils.addResellerAdminUser('test2', 're_admintobedeletedbyotheraccountusers3', 'testing')
|
Utils.addResellerAdminUser('test2', 're_admintobedeletedbyotheraccountusers3', 'testing')
|
||||||
(status,output) = Utils.deleteUser('test2', 're_admintobedeletedbyotherusers3',user='test:tester',key='testing')
|
(status,output) = Utils.deleteUser('test2', 're_admintobedeletedbyotheraccountusers3',user='test:tester',key='testing')
|
||||||
self.assertNotEqual(status, 0, 're_admin deletion succeeded with regular user of other account: '+output)
|
self.assertNotEqual(status, 0, 're_admin deletion succeeded with regular user of other account: '+output)
|
||||||
self.assertEqual('403 Forbidden' in output,True, 're_admin deletion succeeded with user of other account: '+output)
|
self.assertEqual('403 Forbidden' in output,True, 're_admin deletion succeeded with user of other account: '+output)
|
||||||
|
|
||||||
|
@ -3543,6 +3543,9 @@ class TestAuth(unittest.TestCase):
|
|||||||
|
|
||||||
def test_delete_user_bad_creds(self):
|
def test_delete_user_bad_creds(self):
|
||||||
self.test_auth.app = FakeApp(iter([
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
|
||||||
|
{"name": "test"}, {"name": ".admin"}],
|
||||||
|
"auth": "plaintext:key"})),
|
||||||
# GET of user object (account admin, but wrong
|
# GET of user object (account admin, but wrong
|
||||||
# account)
|
# account)
|
||||||
('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
|
||||||
@ -3557,9 +3560,11 @@ class TestAuth(unittest.TestCase):
|
|||||||
'X-Auth-Admin-Key': 'key'}
|
'X-Auth-Admin-Key': 'key'}
|
||||||
).get_response(self.test_auth)
|
).get_response(self.test_auth)
|
||||||
self.assertEquals(resp.status_int, 403)
|
self.assertEquals(resp.status_int, 403)
|
||||||
self.assertEquals(self.test_auth.app.calls, 1)
|
self.assertEquals(self.test_auth.app.calls, 2)
|
||||||
|
|
||||||
self.test_auth.app = FakeApp(iter([
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
|
||||||
|
{"name": "test"}], "auth": "plaintext:key"})),
|
||||||
# GET of user object (regular user)
|
# GET of user object (regular user)
|
||||||
('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
|
||||||
{"name": "test"}], "auth": "plaintext:key"}))]))
|
{"name": "test"}], "auth": "plaintext:key"}))]))
|
||||||
@ -3572,8 +3577,55 @@ class TestAuth(unittest.TestCase):
|
|||||||
'X-Auth-Admin-Key': 'key'}
|
'X-Auth-Admin-Key': 'key'}
|
||||||
).get_response(self.test_auth)
|
).get_response(self.test_auth)
|
||||||
self.assertEquals(resp.status_int, 403)
|
self.assertEquals(resp.status_int, 403)
|
||||||
|
self.assertEquals(self.test_auth.app.calls, 2)
|
||||||
|
|
||||||
|
def test_delete_reseller_admin_user_fail(self):
|
||||||
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
# is user being deleted a reseller_admin
|
||||||
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act2:re_adm"},
|
||||||
|
{"name": "act2"}, {"name": ".admin"},
|
||||||
|
{"name": ".reseller_admin"}], "auth": "plaintext:key"})),
|
||||||
|
# GET of user object
|
||||||
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
|
||||||
|
{"name": "act2"}, {"name": ".admin"}],
|
||||||
|
"auth": "plaintext:key"}))]))
|
||||||
|
|
||||||
|
resp = Request.blank('/auth/v2/act2/re_adm',
|
||||||
|
environ={
|
||||||
|
'REQUEST_METHOD': 'DELETE'},
|
||||||
|
headers={
|
||||||
|
'X-Auth-Admin-User':
|
||||||
|
'act2:adm',
|
||||||
|
'X-Auth-Admin-Key': 'key'}
|
||||||
|
).get_response(self.test_auth)
|
||||||
|
self.assertEquals(resp.status_int, 403)
|
||||||
self.assertEquals(self.test_auth.app.calls, 1)
|
self.assertEquals(self.test_auth.app.calls, 1)
|
||||||
|
|
||||||
|
def test_delete_reseller_admin_user_success(self):
|
||||||
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
# is user being deleted a reseller_admin
|
||||||
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act2:re_adm"},
|
||||||
|
{"name": "act2"}, {"name": ".admin"},
|
||||||
|
{"name": ".reseller_admin"}], "auth": "plaintext:key"})),
|
||||||
|
# HEAD of user object
|
||||||
|
('200 Ok',
|
||||||
|
{'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
|
||||||
|
# DELETE of token
|
||||||
|
('204 No Content', {}, ''),
|
||||||
|
# DELETE of user object
|
||||||
|
('204 No Content', {}, '')]))
|
||||||
|
|
||||||
|
resp = Request.blank('/auth/v2/act2/re_adm',
|
||||||
|
environ={
|
||||||
|
'REQUEST_METHOD': 'DELETE'},
|
||||||
|
headers={
|
||||||
|
'X-Auth-Admin-User':
|
||||||
|
'.super_admin',
|
||||||
|
'X-Auth-Admin-Key': 'supertest'}
|
||||||
|
).get_response(self.test_auth)
|
||||||
|
self.assertEquals(resp.status_int, 204)
|
||||||
|
self.assertEquals(self.test_auth.app.calls, 4)
|
||||||
|
|
||||||
def test_delete_user_invalid_account(self):
|
def test_delete_user_invalid_account(self):
|
||||||
resp = Request.blank('/auth/v2/.invalid/usr',
|
resp = Request.blank('/auth/v2/.invalid/usr',
|
||||||
environ={
|
environ={
|
||||||
@ -3628,6 +3680,9 @@ class TestAuth(unittest.TestCase):
|
|||||||
|
|
||||||
def test_delete_user_fail_delete_token(self):
|
def test_delete_user_fail_delete_token(self):
|
||||||
self.test_auth.app = FakeApp(iter([
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
# is user reseller_admin
|
||||||
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
|
||||||
|
{"name": "test"}], "auth": "plaintext:key"})),
|
||||||
# HEAD of user object
|
# HEAD of user object
|
||||||
('200 Ok',
|
('200 Ok',
|
||||||
{'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
|
{'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
|
||||||
@ -3642,10 +3697,13 @@ class TestAuth(unittest.TestCase):
|
|||||||
'X-Auth-Admin-Key': 'supertest'}
|
'X-Auth-Admin-Key': 'supertest'}
|
||||||
).get_response(self.test_auth)
|
).get_response(self.test_auth)
|
||||||
self.assertEquals(resp.status_int, 500)
|
self.assertEquals(resp.status_int, 500)
|
||||||
self.assertEquals(self.test_auth.app.calls, 2)
|
self.assertEquals(self.test_auth.app.calls, 3)
|
||||||
|
|
||||||
def test_delete_user_fail_delete_user(self):
|
def test_delete_user_fail_delete_user(self):
|
||||||
self.test_auth.app = FakeApp(iter([
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
# is user reseller_admin
|
||||||
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
|
||||||
|
{"name": "test"}], "auth": "plaintext:key"})),
|
||||||
# HEAD of user object
|
# HEAD of user object
|
||||||
('200 Ok',
|
('200 Ok',
|
||||||
{'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
|
{'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
|
||||||
@ -3662,10 +3720,13 @@ class TestAuth(unittest.TestCase):
|
|||||||
'X-Auth-Admin-Key': 'supertest'}
|
'X-Auth-Admin-Key': 'supertest'}
|
||||||
).get_response(self.test_auth)
|
).get_response(self.test_auth)
|
||||||
self.assertEquals(resp.status_int, 500)
|
self.assertEquals(resp.status_int, 500)
|
||||||
self.assertEquals(self.test_auth.app.calls, 3)
|
self.assertEquals(self.test_auth.app.calls, 4)
|
||||||
|
|
||||||
def test_delete_user_success(self):
|
def test_delete_user_success(self):
|
||||||
self.test_auth.app = FakeApp(iter([
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
# is user reseller_admin
|
||||||
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
|
||||||
|
{"name": "test"}], "auth": "plaintext:key"})),
|
||||||
# HEAD of user object
|
# HEAD of user object
|
||||||
('200 Ok',
|
('200 Ok',
|
||||||
{'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
|
{'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
|
||||||
@ -3682,10 +3743,13 @@ class TestAuth(unittest.TestCase):
|
|||||||
'X-Auth-Admin-Key': 'supertest'}
|
'X-Auth-Admin-Key': 'supertest'}
|
||||||
).get_response(self.test_auth)
|
).get_response(self.test_auth)
|
||||||
self.assertEquals(resp.status_int, 204)
|
self.assertEquals(resp.status_int, 204)
|
||||||
self.assertEquals(self.test_auth.app.calls, 3)
|
self.assertEquals(self.test_auth.app.calls, 4)
|
||||||
|
|
||||||
def test_delete_user_success_missing_user_at_end(self):
|
def test_delete_user_success_missing_user_at_end(self):
|
||||||
self.test_auth.app = FakeApp(iter([
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
# is user reseller_admin
|
||||||
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
|
||||||
|
{"name": "test"}], "auth": "plaintext:key"})),
|
||||||
# HEAD of user object
|
# HEAD of user object
|
||||||
('200 Ok',
|
('200 Ok',
|
||||||
{'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
|
{'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
|
||||||
@ -3702,10 +3766,13 @@ class TestAuth(unittest.TestCase):
|
|||||||
'X-Auth-Admin-Key': 'supertest'}
|
'X-Auth-Admin-Key': 'supertest'}
|
||||||
).get_response(self.test_auth)
|
).get_response(self.test_auth)
|
||||||
self.assertEquals(resp.status_int, 204)
|
self.assertEquals(resp.status_int, 204)
|
||||||
self.assertEquals(self.test_auth.app.calls, 3)
|
self.assertEquals(self.test_auth.app.calls, 4)
|
||||||
|
|
||||||
def test_delete_user_success_missing_token(self):
|
def test_delete_user_success_missing_token(self):
|
||||||
self.test_auth.app = FakeApp(iter([
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
# is user reseller_admin
|
||||||
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
|
||||||
|
{"name": "test"}], "auth": "plaintext:key"})),
|
||||||
# HEAD of user object
|
# HEAD of user object
|
||||||
('200 Ok',
|
('200 Ok',
|
||||||
{'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
|
{'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
|
||||||
@ -3722,10 +3789,13 @@ class TestAuth(unittest.TestCase):
|
|||||||
'X-Auth-Admin-Key': 'supertest'}
|
'X-Auth-Admin-Key': 'supertest'}
|
||||||
).get_response(self.test_auth)
|
).get_response(self.test_auth)
|
||||||
self.assertEquals(resp.status_int, 204)
|
self.assertEquals(resp.status_int, 204)
|
||||||
self.assertEquals(self.test_auth.app.calls, 3)
|
self.assertEquals(self.test_auth.app.calls, 4)
|
||||||
|
|
||||||
def test_delete_user_success_no_token(self):
|
def test_delete_user_success_no_token(self):
|
||||||
self.test_auth.app = FakeApp(iter([
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
# is user reseller_admin
|
||||||
|
('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
|
||||||
|
{"name": "test"}], "auth": "plaintext:key"})),
|
||||||
# HEAD of user object
|
# HEAD of user object
|
||||||
('200 Ok', {}, ''),
|
('200 Ok', {}, ''),
|
||||||
# DELETE of user object
|
# DELETE of user object
|
||||||
@ -3739,7 +3809,7 @@ class TestAuth(unittest.TestCase):
|
|||||||
'X-Auth-Admin-Key': 'supertest'}
|
'X-Auth-Admin-Key': 'supertest'}
|
||||||
).get_response(self.test_auth)
|
).get_response(self.test_auth)
|
||||||
self.assertEquals(resp.status_int, 204)
|
self.assertEquals(resp.status_int, 204)
|
||||||
self.assertEquals(self.test_auth.app.calls, 2)
|
self.assertEquals(self.test_auth.app.calls, 3)
|
||||||
|
|
||||||
def test_validate_token_bad_prefix(self):
|
def test_validate_token_bad_prefix(self):
|
||||||
resp = Request.blank('/auth/v2/.token/BAD_token'
|
resp = Request.blank('/auth/v2/.token/BAD_token'
|
||||||
@ -3934,7 +4004,7 @@ class TestAuth(unittest.TestCase):
|
|||||||
headers={'X-Auth-Admin-User': 'act:usr'}))
|
headers={'X-Auth-Admin-User': 'act:usr'}))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
exc = err
|
exc = err
|
||||||
self.assertEquals(str(exc), 'Could not get admin user object: '
|
self.assertEquals(str(exc), 'Could not get user object: '
|
||||||
'/v1/AUTH_gsmetadata/act/usr 503 Service Unavailable')
|
'/v1/AUTH_gsmetadata/act/usr 503 Service Unavailable')
|
||||||
self.assertEquals(self.test_auth.app.calls, 1)
|
self.assertEquals(self.test_auth.app.calls, 1)
|
||||||
|
|
||||||
@ -3954,6 +4024,80 @@ class TestAuth(unittest.TestCase):
|
|||||||
'groups': [{'name': 'act:usr'}, {'name': 'act'},
|
'groups': [{'name': 'act:usr'}, {'name': 'act'},
|
||||||
{'name': '.admin'}]})
|
{'name': '.admin'}]})
|
||||||
|
|
||||||
|
def test_get_user_detail_success(self):
|
||||||
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
('200 Ok', {},
|
||||||
|
json.dumps({"auth": "plaintext:key",
|
||||||
|
"groups": [{'name': "act:usr"}, {'name': "act"},
|
||||||
|
{'name': ".admin"}]}))]))
|
||||||
|
detail = self.test_auth.get_user_detail(
|
||||||
|
Request.blank('/',
|
||||||
|
headers={'X-Auth-Admin-User': 'act:usr'}),
|
||||||
|
'act', 'usr')
|
||||||
|
self.assertEquals(self.test_auth.app.calls, 1)
|
||||||
|
detail_json = json.loads(detail)
|
||||||
|
self.assertEquals("plaintext:key", detail_json['auth'])
|
||||||
|
|
||||||
|
def test_get_user_detail_fail_user_doesnt_exist(self):
|
||||||
|
self.test_auth.app = FakeApp(
|
||||||
|
iter([('404 Not Found', {}, '')]))
|
||||||
|
detail = self.test_auth.get_user_detail(
|
||||||
|
Request.blank('/',
|
||||||
|
headers={'X-Auth-Admin-User': 'act:usr'}),
|
||||||
|
'act', 'usr')
|
||||||
|
self.assertEquals(self.test_auth.app.calls, 1)
|
||||||
|
self.assertEquals(detail, None)
|
||||||
|
|
||||||
|
def test_get_user_detail_fail_exception(self):
|
||||||
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
('503 Service Unavailable', {}, '')]))
|
||||||
|
exc = None
|
||||||
|
try:
|
||||||
|
detail = self.test_auth.get_user_detail(
|
||||||
|
Request.blank('/',
|
||||||
|
headers={'X-Auth-Admin-User': 'act:usr'}),
|
||||||
|
'act', 'usr')
|
||||||
|
except Exception as err:
|
||||||
|
exc = err
|
||||||
|
self.assertEquals(str(exc), 'Could not get user object: '
|
||||||
|
'/v1/AUTH_gsmetadata/act/usr 503 Service Unavailable')
|
||||||
|
self.assertEquals(self.test_auth.app.calls, 1)
|
||||||
|
|
||||||
|
def test_is_user_reseller_admin_success(self):
|
||||||
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
('200 Ok', {},
|
||||||
|
json.dumps({"auth": "plaintext:key",
|
||||||
|
"groups": [{'name': "act:usr"}, {'name': "act"},
|
||||||
|
{'name': ".reseller_admin"}]}))]))
|
||||||
|
result = self.test_auth.is_user_reseller_admin(
|
||||||
|
Request.blank('/',
|
||||||
|
headers={'X-Auth-Admin-User': 'act:usr'}),
|
||||||
|
'act', 'usr')
|
||||||
|
self.assertEquals(self.test_auth.app.calls, 1)
|
||||||
|
self.assertTrue(result)
|
||||||
|
|
||||||
|
def test_is_user_reseller_admin_fail(self):
|
||||||
|
self.test_auth.app = FakeApp(iter([
|
||||||
|
('200 Ok', {},
|
||||||
|
json.dumps({"auth": "plaintext:key",
|
||||||
|
"groups": [{'name': "act:usr"}, {'name': "act"},
|
||||||
|
{'name': ".admin"}]}))]))
|
||||||
|
result = self.test_auth.is_user_reseller_admin(
|
||||||
|
Request.blank('/',
|
||||||
|
headers={'X-Auth-Admin-User': 'act:usr'}),
|
||||||
|
'act', 'usr')
|
||||||
|
self.assertEquals(self.test_auth.app.calls, 1)
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
def test_is_user_reseller_admin_fail_user_doesnt_exist(self):
|
||||||
|
self.test_auth.app = FakeApp(
|
||||||
|
iter([('404 Not Found', {}, '')]))
|
||||||
|
req = Request.blank('/', headers={'X-Auth-Admin-User': 'act:usr'})
|
||||||
|
result = self.test_auth.is_user_reseller_admin(req, 'act', 'usr')
|
||||||
|
self.assertEquals(self.test_auth.app.calls, 1)
|
||||||
|
self.assertFalse(result)
|
||||||
|
self.assertFalse(req.credentials_valid)
|
||||||
|
|
||||||
def test_credentials_match_success(self):
|
def test_credentials_match_success(self):
|
||||||
self.assert_(self.test_auth.credentials_match(
|
self.assert_(self.test_auth.credentials_match(
|
||||||
{'auth': 'plaintext:key'}, 'key'))
|
{'auth': 'plaintext:key'}, 'key'))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user