Fixes #35 X-Auth-New-Token support
When requesting an auth token, you can include an "X-Auth-New-Token: true" header to have a new token generated and the old one revoked.
This commit is contained in:
parent
a0aae0e20f
commit
c1bcd46e9f
@ -1118,6 +1118,10 @@ class Swauth(object):
|
||||
# Possibly other service dicts, not implemented yet.
|
||||
}
|
||||
|
||||
One can also include an "X-Auth-New-Token: true" header to
|
||||
force issuing a new token and revoking any old token, even if
|
||||
it hasn't expired yet.
|
||||
|
||||
:param req: The swob.Request to process.
|
||||
:returns: swob.Response, 2xx on success with data set as explained
|
||||
above.
|
||||
@ -1181,18 +1185,28 @@ class Swauth(object):
|
||||
if candidate_token:
|
||||
path = quote('/v1/%s/.token_%s/%s' %
|
||||
(self.auth_account, candidate_token[-1], candidate_token))
|
||||
resp = make_pre_authed_request(req.environ, 'GET', path,
|
||||
agent=self.agent).get_response(self.app)
|
||||
if resp.status_int // 100 == 2:
|
||||
token_detail = json.loads(resp.body)
|
||||
if token_detail['expires'] > time():
|
||||
token = candidate_token
|
||||
delete_token = False
|
||||
try:
|
||||
if req.headers.get('x-auth-new-token', 'false').lower() in \
|
||||
TRUE_VALUES:
|
||||
delete_token = True
|
||||
else:
|
||||
resp = make_pre_authed_request(req.environ, 'GET', path,
|
||||
agent=self.agent).get_response(self.app)
|
||||
if resp.status_int // 100 == 2:
|
||||
token_detail = json.loads(resp.body)
|
||||
if token_detail['expires'] > time():
|
||||
token = candidate_token
|
||||
else:
|
||||
delete_token = True
|
||||
elif resp.status_int != 404:
|
||||
raise Exception(
|
||||
'Could not detect whether a token already exists: '
|
||||
'%s %s' % (path, resp.status))
|
||||
finally:
|
||||
if delete_token:
|
||||
make_pre_authed_request(req.environ, 'DELETE', path,
|
||||
agent=self.agent).get_response(self.app)
|
||||
elif resp.status_int != 404:
|
||||
raise Exception('Could not detect whether a token already '
|
||||
'exists: %s %s' % (path, resp.status))
|
||||
# Create a new token if one didn't exist
|
||||
if not token:
|
||||
# Retrieve account id, we'll save this in the token
|
||||
@ -1316,7 +1330,9 @@ class Swauth(object):
|
||||
the token may not be the same as the auth process that created the
|
||||
token.
|
||||
"""
|
||||
if not self.itoken or self.itoken_expires < time():
|
||||
if not self.itoken or self.itoken_expires < time() or \
|
||||
env.get('HTTP_X_AUTH_NEW_TOKEN', 'false').lower() in \
|
||||
TRUE_VALUES:
|
||||
self.itoken = '%sitk%s' % (self.reseller_prefix, uuid4().hex)
|
||||
memcache_key = '%s/auth/%s' % (self.reseller_prefix, self.itoken)
|
||||
self.itoken_expires = time() + self.token_life - 60
|
||||
|
@ -821,6 +821,39 @@ class TestAuth(unittest.TestCase):
|
||||
"local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
|
||||
self.assertEquals(self.test_auth.app.calls, 3)
|
||||
|
||||
def test_get_token_success_existing_token_but_request_new_one(self):
|
||||
self.test_auth.app = FakeApp(iter([
|
||||
# GET of user object
|
||||
('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
|
||||
json.dumps({"auth": "plaintext:key",
|
||||
"groups": [{'name': "act:usr"}, {'name': "act"},
|
||||
{'name': ".admin"}]})),
|
||||
# DELETE of expired token
|
||||
('204 No Content', {}, ''),
|
||||
# GET of account
|
||||
('204 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
|
||||
# PUT of new token
|
||||
('201 Created', {}, ''),
|
||||
# POST of token to user object
|
||||
('204 No Content', {}, ''),
|
||||
# GET of services object
|
||||
('200 Ok', {}, json.dumps({"storage": {"default": "local",
|
||||
"local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
|
||||
resp = Request.blank('/auth/v1.0',
|
||||
headers={'X-Auth-User': 'act:usr',
|
||||
'X-Auth-Key': 'key',
|
||||
'X-Auth-New-Token': 'true'}).get_response(self.test_auth)
|
||||
self.assertEquals(resp.status_int, 200)
|
||||
self.assertNotEquals(resp.headers.get('x-auth-token'), 'AUTH_tktest')
|
||||
self.assertEquals(resp.headers.get('x-auth-token'),
|
||||
resp.headers.get('x-storage-token'))
|
||||
self.assertEquals(resp.headers.get('x-storage-url'),
|
||||
'http://127.0.0.1:8080/v1/AUTH_cfa')
|
||||
self.assertEquals(json.loads(resp.body),
|
||||
{"storage": {"default": "local",
|
||||
"local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
|
||||
self.assertEquals(self.test_auth.app.calls, 6)
|
||||
|
||||
def test_get_token_success_existing_token_expired(self):
|
||||
self.test_auth.app = FakeApp(iter([
|
||||
# GET of user object
|
||||
|
Loading…
x
Reference in New Issue
Block a user