Merge "Fix Keystone Auth and Tests"

This commit is contained in:
Jenkins 2015-07-31 01:30:53 +00:00 committed by Gerrit Code Review
commit 5c702be8fd
2 changed files with 109 additions and 56 deletions

View File

@ -13,7 +13,6 @@
from __future__ import absolute_import
import json
import logging
import requests
@ -31,16 +30,10 @@ def login(_, token):
:param token: A Keystone Token
:returns: AuthDetails -- Class used for authentication information
"""
data = json.dumps({"auth": {
"identity": {
"methods": ["token"],
"token": {
"id": token
}}}})
req = requests.post(jsonloader.conf.auth['keystone']['url'] +
'/v3/auth/tokens',
headers={'Content-Type': 'application/json'},
data=data)
req = requests.get(jsonloader.conf.auth['keystone']['url'] +
'/v3/auth/tokens',
headers={'X-Auth-Token': token,
'X-Subject-Token': token})
if req.status_code != 200:
logger.info("Authentication failed for token <%s>, status %s",
token, req.status_code)

View File

@ -21,60 +21,120 @@ import mock
import requests
import requests_mock
from anchor.auth import keystone
from anchor.auth import results
class AuthKeystoneTests(unittest.TestCase):
def setUp(self):
self.config = "anchor.jsonloader.conf._config"
self.data = {'auth': {'keystone': {'url': 'http://localhost:35357'}}}
self.json_response = {
"token": {
"audit_ids": [
"TPDsHuK_QCaKwvkVlAer8A"
],
"catalog": [
{
"endpoints": [
{
"id": "1390df96096d4bd19add44811db34397",
"interface": "public",
"region": "RegionOne",
"region_id": "RegionOne",
"url": "http://10.0.2.15:5000/v2.0"
},
{
"id": "534bcae735614781a03069d637b21570",
"interface": "internal",
"region": "RegionOne",
"region_id": "RegionOne",
"url": "http://10.0.2.15:5000/v2.0"
},
{
"id": "cc7e879d691e4e4b9f4afecb1a3ce8f0",
"interface": "admin",
"region": "RegionOne",
"region_id": "RegionOne",
"url": "http://10.0.2.15:35357/v2.0"
}
],
"id": "3010a0c9af684db28659f0e9e08ee863",
"name": "keystone",
"type": "identity"
}
],
"expires_at": "2015-07-27T02:38:09.000000Z",
"extras": {},
"issued_at": "2015-07-27T01:38:09.409616",
"methods": [
"password",
"token"
],
"project": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "5b2e7bd5d5954fdaa2d931285df8a132",
"name": "demo"
},
"roles": [
{
"id": "35a1d29b54f64c969aa9be288ec9d39a",
"name": "anotherrole"
},
{
"id": "9f64371fcbd64c669ab1a24686a1a367",
"name": "Member"
}
],
"user": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "b2016b9338214cda926d5631c1fbc40c",
"name": "demo"
}
}
}
self.user = self.json_response['token']['user']['name']
self.roles = [role['name']
for role in self.json_response['token']['roles']]
self.expected = results.AuthDetails(
username=self.user, groups=self.roles)
self.keystone_url = self.data['auth'][
'keystone']['url'] + '/v3/auth/tokens'
self.keystone_token = uuid.uuid4().hex
super(AuthKeystoneTests, self).setUp()
def tearDown(self):
pass
def test_auth_login(self):
config = "anchor.jsonloader.conf._config"
data = {'auth': {'keystone': {'url': 'http://localhost:35357'}}}
with mock.patch.dict(config, data):
from anchor.auth import keystone
from anchor.auth import results
keystone_url = data['auth']['keystone']['url'] + '/v3/auth/tokens'
keystone_token = uuid.uuid4().hex
json_response = {
"token": {
"roles": [
{
"name": "admin"
}
],
"user": {
"name": "priti"
},
}
}
user = json_response['token']['user']['name']
roles = [role['name'] for role in json_response['token']['roles']]
expected = results.AuthDetails(username=user, groups=roles)
def test_parse_keystone_valid_response(self):
with mock.patch.dict(self.config, self.data):
with requests_mock.mock() as m:
m.post(keystone_url, json=json_response, status_code=200)
requests.post(keystone_url)
# Check that it can parse Keystone response when
# response has valid json and status code of 200
self.assertEqual(keystone.login(None, keystone_token),
expected)
m.get(self.keystone_url, json=self.json_response,
status_code=200)
requests.get(self.keystone_url)
self.assertEqual(keystone.login(
None, self.keystone_token), self.expected)
# Check that it fails and returns appropriate auth
# failure when Keystone authentication fails
m.post(keystone_url, status_code=201)
self.assertEqual(keystone.login(None, keystone_token),
None)
def test_parse_keystone_auth_fail(self):
with mock.patch.dict(self.config, self.data):
with requests_mock.mock() as m:
m.get(self.keystone_url, status_code=401)
self.assertEqual(keystone.login(
None, self.keystone_token), None)
# Check that it fails and returns appropriate auth
# failure when Keystone response is corrupted
m.post(keystone_url, json={}, status_code=200)
self.assertEqual(keystone.login(None, keystone_token),
None)
def test_parse_keystone_ok_but_malformed_response(self):
with mock.patch.dict(self.config, self.data):
with requests_mock.mock() as m:
m.get(self.keystone_url, json={}, status_code=200)
self.assertEqual(keystone.login(
None, self.keystone_token), None)