From c2bb81a60edcf8befe2712c2e798d389a167b192 Mon Sep 17 00:00:00 2001 From: Prashanth Pai Date: Mon, 20 Jan 2014 09:53:40 +0530 Subject: [PATCH] Allow users to change their own password/key Users were not able to update their own password/key with the update operation resulting in 403 (HTTPForbidden). EXAMPLES: Command to update password/key of regular user: gswauth-add-user -U account1:user1 -K old_pass account1 user1 new_pass Command to update password/key of account admin: gswauth-add-user -U account1:admin -K old_pass -a account1 admin new_pass Command to update password/key of reseller_admin: gswauth-add-user -U account1:radmin -K old_pass -r account1 radmin new_pass Signed-off-by: Prashanth Pai --- bin/swauth-add-user | 27 ++-- swauth/middleware.py | 51 +++++++- test_swauth/unit/test_middleware.py | 189 ++++++++++++++++++++++------ 3 files changed, 209 insertions(+), 58 deletions(-) diff --git a/bin/swauth-add-user b/bin/swauth-add-user index 81eeac7..82bf073 100755 --- a/bin/swauth-add-user +++ b/bin/swauth-add-user @@ -66,22 +66,23 @@ if __name__ == '__main__': parsed_path = '/' elif parsed_path[-1] != '/': parsed_path += '/' - # Ensure the account exists - path = '%sv2/%s' % (parsed_path, account) - headers = {'X-Auth-Admin-User': options.admin_user, - 'X-Auth-Admin-Key': options.admin_key} - if options.suffix: - headers['X-Account-Suffix'] = options.suffix - conn = http_connect(parsed.hostname, parsed.port, 'GET', path, headers, - ssl=(parsed.scheme == 'https')) - resp = conn.getresponse() - if resp.status // 100 != 2: - headers['Content-Length'] = '0' - conn = http_connect(parsed.hostname, parsed.port, 'PUT', path, headers, + # Ensure the account exists if user is NOT trying to change his password + if not options.admin_user == (account + ':' + user): + path = '%sv2/%s' % (parsed_path, account) + headers = {'X-Auth-Admin-User': options.admin_user, + 'X-Auth-Admin-Key': options.admin_key} + if options.suffix: + headers['X-Account-Suffix'] = options.suffix + conn = http_connect(parsed.hostname, parsed.port, 'GET', path, headers, ssl=(parsed.scheme == 'https')) resp = conn.getresponse() if resp.status // 100 != 2: - print 'Account creation failed: %s %s' % (resp.status, resp.reason) + headers['Content-Length'] = '0' + conn = http_connect(parsed.hostname, parsed.port, 'PUT', path, headers, + ssl=(parsed.scheme == 'https')) + resp = conn.getresponse() + if resp.status // 100 != 2: + print 'Account creation failed: %s %s' % (resp.status, resp.reason) # Add the user path = '%sv2/%s/%s' % (parsed_path, account, user) headers = {'X-Auth-Admin-User': options.admin_user, diff --git a/swauth/middleware.py b/swauth/middleware.py index 9b1ec58..94450f0 100644 --- a/swauth/middleware.py +++ b/swauth/middleware.py @@ -1018,9 +1018,19 @@ class Swauth(object): X-Auth-User-Reseller-Admin may be set to `true` to create a .reseller_admin. + Creating users + ************** Can only be called by an account .admin unless the user is to be a .reseller_admin, in which case the request must be by .super_admin. + Changing password/key + ********************* + 1) reseller_admin key can be changed by super_admin and by himself. + 2) admin key can be changed by any admin in same account, + reseller_admin, super_admin and himself. + 3) Regular user key can be changed by any admin in his account, + reseller_admin, super_admin and himself. + :param req: The swob.Request to process. :returns: swob.Response, 2xx on success. """ @@ -1036,11 +1046,14 @@ class Swauth(object): if req.path_info or not account or account[0] == '.' or not user or \ user[0] == '.' or not key: return HTTPBadRequest(request=req) + user_arg = account + ':' + user if reseller_admin: - if not self.is_super_admin(req): - return self.denied_response(req) - elif not self.is_account_admin(req, account): - return self.denied_response(req) + if not self.is_super_admin(req) and\ + not self.is_user_changing_own_key(req, user_arg): + return self.denied_response(req) + elif not self.is_account_admin(req, account) and\ + not self.is_user_changing_own_key(req, user_arg): + return self.denied_response(req) path = quote('/v1/%s/%s' % (self.auth_account, account)) resp = self.make_pre_authed_request( @@ -1448,6 +1461,36 @@ class Swauth(object): return user_detail and self.auth_encoder().match( key, user_detail.get('auth')) + def is_user_changing_own_key(self, req, user): + """ + Check if the user is changing his own key. + :param req: The swob.Request to check. This contains x-auth-admin-user + and x-auth-admin-key headers which are credentials of the + user sending the request. + :param user: User whose password is to be changed. + :returns True if user is changing his own key, False if not. + """ + admin_detail = self.get_admin_detail(req) + if not admin_detail: + # The user does not exist + return False + + # If user is not admin/reseller_admin and x-auth-user-admin or + # x-auth-user-reseller-admin headers are present in request, he may be + # attempting to escalate himself as admin/reseller_admin! + if '.admin' not in (g['name'] for g in admin_detail['groups']): + if req.headers.get('x-auth-user-admin') == 'true' or \ + req.headers.get('x-auth-user-reseller-admin') == 'true': + return False + if '.reseller_admin' not in \ + (g['name'] for g in admin_detail['groups']) and \ + req.headers.get('x-auth-user-reseller-admin') == 'true': + return False + + return req.headers.get('x-auth-admin-user') == user and \ + self.credentials_match(admin_detail, + req.headers.get('x-auth-admin-key')) + def is_super_admin(self, req): """ Returns True if the admin specified in the request represents the diff --git a/test_swauth/unit/test_middleware.py b/test_swauth/unit/test_middleware.py index 960b4b3..b244174 100644 --- a/test_swauth/unit/test_middleware.py +++ b/test_swauth/unit/test_middleware.py @@ -2697,8 +2697,13 @@ class TestAuth(unittest.TestCase): def test_put_user_reseller_admin_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ + # Checking if user is changing his own key. This is called. + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:rdm"}, + {"name": "test"}, {"name": ".admin"}, + {"name": ".reseller_admin"}], "auth": "plaintext:key"})), # GET of user object (reseller admin) - # This shouldn't actually get called, checked below + # This shouldn't actually get called, checked + # below ('200 Ok', {}, json.dumps({"groups": [{"name": "act:rdm"}, {"name": "test"}, {"name": ".admin"}, {"name": ".reseller_admin"}], "auth": "plaintext:key"}))])) @@ -2710,14 +2715,19 @@ class TestAuth(unittest.TestCase): 'X-Auth-User-Reseller-Admin': 'true'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) - self.assertEquals(self.test_auth.app.calls, 0) + self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller admin) - # This shouldn't actually get called, checked below + # Checking if user is changing his own key. This is called. ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) + "auth": "plaintext:key"})), + # GET of user object (account admin, but not reseller admin) + # This shouldn't actually get called, checked + # below + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Auth-Admin-User': 'act:adm', @@ -2726,12 +2736,16 @@ class TestAuth(unittest.TestCase): 'X-Auth-User-Reseller-Admin': 'true'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) - self.assertEquals(self.test_auth.app.calls, 0) + self.assertEquals(self.test_auth.app.calls, 1) self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - # This shouldn't actually get called, checked below + # Checking if user is changing his own key. This is called. ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"})), + # GET of user object (regular user) + # This shouldn't actually get called, checked + # below + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={'REQUEST_METHOD': 'PUT'}, @@ -2741,65 +2755,80 @@ class TestAuth(unittest.TestCase): 'X-Auth-User-Reseller-Admin': 'true'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) - self.assertEquals(self.test_auth.app.calls, 0) + self.assertEquals(self.test_auth.app.calls, 1) def test_put_user_account_admin_fail_bad_creds(self): self.test_auth.app = FakeApp(iter([ # GET of user object (account admin, but wrong account) ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': 'act2:adm', - 'X-Auth-Admin-Key': 'key', - 'X-Auth-User-Key': 'key', - 'X-Auth-User-Admin': 'true'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, - {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key', - 'X-Auth-User-Key': 'key', - 'X-Auth-User-Admin': 'true'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_put_user_regular_fail_bad_creds(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but wrong account) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, + "auth": "plaintext:key"})), + # Checking if user is changing his own key. + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, {"name": "test"}, {"name": ".admin"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Auth-Admin-User': 'act2:adm', 'X-Auth-Admin-Key': 'key', - 'X-Auth-User-Key': 'key'} + 'X-Auth-User-Key': 'key', + 'X-Auth-User-Admin': 'true'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) + self.assertEquals(self.test_auth.app.calls, 2) self.test_auth.app = FakeApp(iter([ # GET of user object (regular user) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"})), + # Checking if user is changing his own key. ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, {"name": "test"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Auth-Admin-User': 'act:usr', + 'X-Auth-Admin-Key': 'key', + 'X-Auth-User-Key': 'key', + 'X-Auth-User-Admin': 'true'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_put_user_regular_fail_bad_creds(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin, but wrong + # account) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"})), + # Checking if user is changing his own key. + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Auth-Admin-User': 'act2:adm', 'X-Auth-Admin-Key': 'key', 'X-Auth-User-Key': 'key'} ).get_response(self.test_auth) self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) + self.assertEquals(self.test_auth.app.calls, 2) + + self.test_auth.app = FakeApp(iter([ + # GET of user object (regular user) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"})), + # Checking if user is changing his own key. + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act2/usr', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Auth-Admin-User': 'act:usr', + 'X-Auth-Admin-Key': 'key', + 'X-Auth-User-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 2) def test_put_user_regular_success(self): self.test_auth.app = FakeApp(iter([ @@ -3239,6 +3268,84 @@ class TestAuth(unittest.TestCase): self.assert_(not self.test_auth.credentials_match( {'auth': 'plaintext:key'}, 'notkey')) + def test_is_user_changing_own_key_err(self): + # User does not exist + self.test_auth.app = FakeApp( + iter([('404 Not Found', {}, '')])) + req = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': 'act:usr', + 'X-Auth-Admin-Key': 'key', + 'X-Auth-User-Key': 'key'}) + self.assert_( + not self.test_auth.is_user_changing_own_key(req, 'act:usr')) + self.assertEquals(self.test_auth.app.calls, 1) + + # user attempting to escalate himself as admin + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + req = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': 'act:usr', + 'X-Auth-Admin-Key': 'key', + 'X-Auth-User-Key': 'key', + 'X-Auth-User-Admin': 'true'}) + self.assert_( + not self.test_auth.is_user_changing_own_key(req, 'act:usr')) + self.assertEquals(self.test_auth.app.calls, 1) + + # admin attempting to escalate himself as reseller_admin + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + req = Request.blank('/auth/v2/act/adm', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': 'act:adm', + 'X-Auth-Admin-Key': 'key', + 'X-Auth-User-Key': 'key', + 'X-Auth-User-Reseller-Admin': 'true'}) + self.assert_( + not self.test_auth.is_user_changing_own_key(req, 'act:adm')) + self.assertEquals(self.test_auth.app.calls, 1) + + # different user + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + req = Request.blank('/auth/v2/act/usr2', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': 'act:usr', + 'X-Auth-Admin-Key': 'key', + 'X-Auth-User-Key': 'key'}) + self.assert_( + not self.test_auth.is_user_changing_own_key(req, 'act:usr2')) + self.assertEquals(self.test_auth.app.calls, 1) + + # wrong key + self.test_auth.app = FakeApp(iter([ + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, + {"name": "test"}], "auth": "plaintext:key"}))])) + req = Request.blank('/auth/v2/act/usr', + environ={ + 'REQUEST_METHOD': 'PUT'}, + headers={ + 'X-Auth-Admin-User': 'act:usr', + 'X-Auth-Admin-Key': 'wrongkey', + 'X-Auth-User-Key': 'newkey'}) + self.assert_( + not self.test_auth.is_user_changing_own_key(req, 'act:usr')) + self.assertEquals(self.test_auth.app.calls, 1) + def test_is_super_admin_success(self): self.assert_(self.test_auth.is_super_admin(Request.blank('/', headers={'X-Auth-Admin-User': '.super_admin',