Added validation for API objects on POST.
This commit is contained in:
parent
95a183a301
commit
c81af303b0
@ -1,24 +1,51 @@
|
|||||||
import falcon
|
import falcon
|
||||||
import json
|
import json
|
||||||
|
import uuid
|
||||||
from tatu.db import models as db
|
from tatu.db import models as db
|
||||||
from Crypto.PublicKey import RSA
|
from Crypto.PublicKey import RSA
|
||||||
|
|
||||||
|
def validate_uuid(string):
|
||||||
|
try:
|
||||||
|
val = uuid.UUID(string, version=4)
|
||||||
|
except ValueError:
|
||||||
|
msg = '{} is not a valid UUID'.format(string)
|
||||||
|
raise falcon.HTTPBadRequest('Bad request', msg)
|
||||||
|
|
||||||
|
def validate_uuids(req, params):
|
||||||
|
id_keys = ['token_id', 'auth_id', 'host_id', 'user_id', 'project-id', 'instance-id']
|
||||||
|
if req.method in ('POST', 'PUT'):
|
||||||
|
for key in id_keys:
|
||||||
|
if key in req.body:
|
||||||
|
validate_uuid(req.body[key])
|
||||||
|
for key in id_keys:
|
||||||
|
if key in params:
|
||||||
|
validate_uuid(params[key])
|
||||||
|
|
||||||
|
def validate(req, resp, resource, params):
|
||||||
|
if req.content_length:
|
||||||
|
# Store the body since we cannot read the stream again later
|
||||||
|
req.body = json.load(req.stream)
|
||||||
|
elif req.method in ('POST', 'PUT'):
|
||||||
|
raise falcon.HTTPBadRequest('The POST/PUT request is missing a body.')
|
||||||
|
validate_uuids(req, params)
|
||||||
|
|
||||||
class Authorities(object):
|
class Authorities(object):
|
||||||
|
|
||||||
|
@falcon.before(validate)
|
||||||
def on_post(self, req, resp):
|
def on_post(self, req, resp):
|
||||||
body = None
|
try:
|
||||||
if req.content_length:
|
db.createAuthority(
|
||||||
body = json.load(req.stream)
|
self.session,
|
||||||
db.createAuthority(
|
req.body['auth_id'],
|
||||||
self.session,
|
)
|
||||||
body['auth_id'],
|
except KeyError as e:
|
||||||
)
|
raise falcon.HTTPBadRequest(str(e))
|
||||||
resp.status = falcon.HTTP_201
|
resp.status = falcon.HTTP_201
|
||||||
resp.location = '/authorities/' + body['auth_id']
|
resp.location = '/authorities/' + req.body['auth_id']
|
||||||
|
|
||||||
class Authority(object):
|
class Authority(object):
|
||||||
|
|
||||||
|
@falcon.before(validate)
|
||||||
def on_get(self, req, resp, auth_id):
|
def on_get(self, req, resp, auth_id):
|
||||||
auth = db.getAuthority(self.session, auth_id)
|
auth = db.getAuthority(self.session, auth_id)
|
||||||
if auth is None:
|
if auth is None:
|
||||||
@ -38,22 +65,24 @@ class Authority(object):
|
|||||||
|
|
||||||
class UserCerts(object):
|
class UserCerts(object):
|
||||||
|
|
||||||
|
@falcon.before(validate)
|
||||||
def on_post(self, req, resp):
|
def on_post(self, req, resp):
|
||||||
body = None
|
|
||||||
if req.content_length:
|
|
||||||
body = json.load(req.stream)
|
|
||||||
# TODO: validation
|
# TODO: validation
|
||||||
user = db.createUserCert(
|
try:
|
||||||
self.session,
|
user = db.createUserCert(
|
||||||
body['user_id'],
|
self.session,
|
||||||
body['auth_id'],
|
req.body['user_id'],
|
||||||
body['key.pub']
|
req.body['auth_id'],
|
||||||
)
|
req.body['key.pub']
|
||||||
|
)
|
||||||
|
except KeyError as e:
|
||||||
|
raise falcon.HTTPBadRequest(str(e))
|
||||||
resp.status = falcon.HTTP_201
|
resp.status = falcon.HTTP_201
|
||||||
resp.location = '/usercerts/' + user.user_id + '/' + user.fingerprint
|
resp.location = '/usercerts/' + user.user_id + '/' + user.fingerprint
|
||||||
|
|
||||||
class UserCert(object):
|
class UserCert(object):
|
||||||
|
|
||||||
|
@falcon.before(validate)
|
||||||
def on_get(self, req, resp, user_id, fingerprint):
|
def on_get(self, req, resp, user_id, fingerprint):
|
||||||
user = db.getUserCert(self.session, user_id, fingerprint)
|
user = db.getUserCert(self.session, user_id, fingerprint)
|
||||||
if user is None:
|
if user is None:
|
||||||
@ -70,21 +99,25 @@ class UserCert(object):
|
|||||||
|
|
||||||
class HostCerts(object):
|
class HostCerts(object):
|
||||||
|
|
||||||
|
@falcon.before(validate)
|
||||||
def on_post(self, req, resp):
|
def on_post(self, req, resp):
|
||||||
body = None
|
# Note that we could have found the host_id using the token_id.
|
||||||
if req.content_length:
|
# But requiring the host_id makes it a little harder to steal the token.
|
||||||
body = json.load(req.stream)
|
try:
|
||||||
host = db.createHostCert(
|
host = db.createHostCert(
|
||||||
self.session,
|
self.session,
|
||||||
body['token_id'],
|
req.body['token_id'],
|
||||||
body['host_id'],
|
req.body['host_id'],
|
||||||
body['key.pub']
|
req.body['key.pub']
|
||||||
)
|
)
|
||||||
|
except KeyError as e:
|
||||||
|
raise falcon.HTTPBadRequest(str(e))
|
||||||
resp.status = falcon.HTTP_201
|
resp.status = falcon.HTTP_201
|
||||||
resp.location = '/hostcerts/' + host.host_id + '/' + host.fingerprint
|
resp.location = '/hostcerts/' + host.host_id + '/' + host.fingerprint
|
||||||
|
|
||||||
class HostCert(object):
|
class HostCert(object):
|
||||||
|
|
||||||
|
@falcon.before(validate)
|
||||||
def on_get(self, req, resp, host_id, fingerprint):
|
def on_get(self, req, resp, host_id, fingerprint):
|
||||||
host = db.getHostCert(self.session, host_id, fingerprint)
|
host = db.getHostCert(self.session, host_id, fingerprint)
|
||||||
if host is None:
|
if host is None:
|
||||||
@ -101,21 +134,23 @@ class HostCert(object):
|
|||||||
|
|
||||||
class Tokens(object):
|
class Tokens(object):
|
||||||
|
|
||||||
|
@falcon.before(validate)
|
||||||
def on_post(self, req, resp):
|
def on_post(self, req, resp):
|
||||||
body = None
|
try:
|
||||||
if req.content_length:
|
token = db.createToken(
|
||||||
body = json.load(req.stream)
|
self.session,
|
||||||
token = db.createToken(
|
req.body['host_id'],
|
||||||
self.session,
|
req.body['auth_id'],
|
||||||
body['host_id'],
|
req.body['hostname']
|
||||||
body['auth_id'],
|
)
|
||||||
body['hostname']
|
except KeyError as e:
|
||||||
)
|
raise falcon.HTTPBadRequest(str(e))
|
||||||
resp.status = falcon.HTTP_201
|
resp.status = falcon.HTTP_201
|
||||||
resp.location = '/hosttokens/' + token.token_id
|
resp.location = '/hosttokens/' + token.token_id
|
||||||
|
|
||||||
class NovaVendorData(object):
|
class NovaVendorData(object):
|
||||||
|
|
||||||
|
@falcon.before(validate)
|
||||||
def on_post(self, req, resp):
|
def on_post(self, req, resp):
|
||||||
# An example of the data nova sends to vendordata services:
|
# An example of the data nova sends to vendordata services:
|
||||||
# {
|
# {
|
||||||
@ -126,16 +161,16 @@ class NovaVendorData(object):
|
|||||||
# "project-id": "039d104b7a5c4631b4ba6524d0b9e981",
|
# "project-id": "039d104b7a5c4631b4ba6524d0b9e981",
|
||||||
# "user-data": null
|
# "user-data": null
|
||||||
# }
|
# }
|
||||||
body = None
|
try:
|
||||||
if req.content_length:
|
token = db.createToken(
|
||||||
body = json.load(req.stream)
|
self.session,
|
||||||
token = db.createToken(
|
req.body['instance-id'],
|
||||||
self.session,
|
req.body['project-id'],
|
||||||
body['instance-id'],
|
req.body['hostname']
|
||||||
body['project-id'],
|
)
|
||||||
body['hostname']
|
except KeyError as e:
|
||||||
)
|
raise falcon.HTTPBadRequest(str(e))
|
||||||
auth = db.getAuthority(self.session, body['project-id'])
|
auth = db.getAuthority(self.session, req.body['project-id'])
|
||||||
if auth is None:
|
if auth is None:
|
||||||
resp.status = falcon.HTTP_NOT_FOUND
|
resp.status = falcon.HTTP_NOT_FOUND
|
||||||
return
|
return
|
||||||
|
@ -19,26 +19,29 @@ def client(db):
|
|||||||
api = create_app(db)
|
api = create_app(db)
|
||||||
return testing.TestClient(api)
|
return testing.TestClient(api)
|
||||||
|
|
||||||
|
def random_uuid():
|
||||||
|
return str(uuid.uuid4())
|
||||||
|
|
||||||
token_id = ''
|
token_id = ''
|
||||||
|
|
||||||
host_id = str(uuid.uuid4())
|
host_id = random_uuid()
|
||||||
host_key = RSA.generate(2048)
|
host_key = RSA.generate(2048)
|
||||||
host_pub_key = host_key.publickey().exportKey('OpenSSH')
|
host_pub_key = host_key.publickey().exportKey('OpenSSH')
|
||||||
host_fingerprint = sshpubkeys.SSHKey(host_pub_key).hash()
|
host_fingerprint = sshpubkeys.SSHKey(host_pub_key).hash()
|
||||||
|
|
||||||
user_id = str(uuid.uuid4())
|
user_id = random_uuid()
|
||||||
user_key = RSA.generate(2048)
|
user_key = RSA.generate(2048)
|
||||||
user_pub_key = user_key.publickey().exportKey('OpenSSH')
|
user_pub_key = user_key.publickey().exportKey('OpenSSH')
|
||||||
user_fingerprint = sshpubkeys.SSHKey(user_pub_key).hash()
|
user_fingerprint = sshpubkeys.SSHKey(user_pub_key).hash()
|
||||||
|
|
||||||
auth_id = str(uuid.uuid4())
|
auth_id = random_uuid()
|
||||||
auth_user_pub_key = None
|
auth_user_pub_key = None
|
||||||
|
|
||||||
@pytest.mark.dependency()
|
@pytest.mark.dependency()
|
||||||
def test_post_authority(client, db):
|
def test_post_authority(client):
|
||||||
body = {
|
body = {
|
||||||
'auth_id': auth_id,
|
'auth_id': auth_id,
|
||||||
}
|
}
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
'/authorities',
|
'/authorities',
|
||||||
body=json.dumps(body)
|
body=json.dumps(body)
|
||||||
@ -49,9 +52,32 @@ def test_post_authority(client, db):
|
|||||||
#auth = session.query(Authority).get(auth_id)
|
#auth = session.query(Authority).get(auth_id)
|
||||||
#assert auth is not None
|
#assert auth is not None
|
||||||
|
|
||||||
|
def test_post_no_body(client):
|
||||||
|
for path in ['/authorities', '/usercerts', '/hosttokens',
|
||||||
|
'/hostcerts', '/novavendordata']:
|
||||||
|
response = client.simulate_post(path)
|
||||||
|
assert response.status == falcon.HTTP_BAD_REQUEST
|
||||||
|
|
||||||
|
def test_post_empty_body(client):
|
||||||
|
bodystr = json.dumps({})
|
||||||
|
for path in ['/authorities', '/usercerts', '/hosttokens',
|
||||||
|
'/hostcerts', '/novavendordata']:
|
||||||
|
response = client.simulate_post(path, body=bodystr)
|
||||||
|
assert response.status == falcon.HTTP_BAD_REQUEST
|
||||||
|
|
||||||
|
def test_post_authority_bad_uuid(client):
|
||||||
|
body = {
|
||||||
|
'auth_id': 'foobar',
|
||||||
|
}
|
||||||
|
response = client.simulate_post(
|
||||||
|
'/authorities',
|
||||||
|
body=json.dumps(body)
|
||||||
|
)
|
||||||
|
assert response.status == falcon.HTTP_BAD_REQUEST
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||||
def test_get_authority(client):
|
def test_get_authority(client):
|
||||||
response = client.simulate_get('/authorities/' + auth_id)
|
response = client.simulate_get('/authorities/' + auth_id)
|
||||||
assert response.status == falcon.HTTP_OK
|
assert response.status == falcon.HTTP_OK
|
||||||
body = json.loads(response.content)
|
body = json.loads(response.content)
|
||||||
assert 'auth_id' in body
|
assert 'auth_id' in body
|
||||||
@ -62,19 +88,33 @@ def test_get_authority(client):
|
|||||||
assert 'user_key' not in body
|
assert 'user_key' not in body
|
||||||
assert 'host_key' not in body
|
assert 'host_key' not in body
|
||||||
|
|
||||||
def test_get_authority_fails(client):
|
def test_get_authority_doesnt_exist(client):
|
||||||
response = client.simulate_get('/authorities/' + str(uuid.uuid4()))
|
response = client.simulate_get('/authorities/' + random_uuid())
|
||||||
assert response.status == falcon.HTTP_NOT_FOUND
|
assert response.status == falcon.HTTP_NOT_FOUND
|
||||||
|
|
||||||
|
def test_get_authority_with_bad_uuid(client):
|
||||||
|
response = client.simulate_get('/authorities/foobar')
|
||||||
|
assert response.status == falcon.HTTP_BAD_REQUEST
|
||||||
|
|
||||||
def user_request(auth=auth_id, user_id=user_id, pub_key=user_pub_key):
|
def user_request(auth=auth_id, user_id=user_id, pub_key=user_pub_key):
|
||||||
return {
|
return {
|
||||||
'user_id': user_id,
|
'user_id': user_id,
|
||||||
'auth_id': auth,
|
'auth_id': auth,
|
||||||
'key.pub': pub_key
|
'key.pub': pub_key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def test_post_user_bad_uuid(client):
|
||||||
|
for key in ['user_id', 'auth_id']:
|
||||||
|
body = user_request()
|
||||||
|
body[key] = 'foobar'
|
||||||
|
response = client.simulate_post(
|
||||||
|
'/usercerts',
|
||||||
|
body=json.dumps(body)
|
||||||
|
)
|
||||||
|
assert response.status == falcon.HTTP_BAD_REQUEST
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||||
def test_post_user(client, db):
|
def test_post_user(client):
|
||||||
body = user_request()
|
body = user_request()
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
'/usercerts',
|
'/usercerts',
|
||||||
@ -89,7 +129,7 @@ def test_post_user(client, db):
|
|||||||
|
|
||||||
@pytest.mark.dependency(depends=['test_post_user'])
|
@pytest.mark.dependency(depends=['test_post_user'])
|
||||||
def test_get_user(client):
|
def test_get_user(client):
|
||||||
response = client.simulate_get('/usercerts/' + user_id + '/' + user_fingerprint)
|
response = client.simulate_get('/usercerts/' + user_id + '/' + user_fingerprint)
|
||||||
assert response.status == falcon.HTTP_OK
|
assert response.status == falcon.HTTP_OK
|
||||||
body = json.loads(response.content)
|
body = json.loads(response.content)
|
||||||
assert 'user_id' in body
|
assert 'user_id' in body
|
||||||
@ -98,14 +138,18 @@ def test_get_user(client):
|
|||||||
assert 'key-cert.pub' in body
|
assert 'key-cert.pub' in body
|
||||||
assert body['auth_id'] == auth_id
|
assert body['auth_id'] == auth_id
|
||||||
|
|
||||||
def test_get_user_fails(client):
|
def test_get_user_doesnt_exist(client):
|
||||||
response = client.simulate_get('/usercerts/' + str(uuid.uuid4()) + '/' + user_fingerprint)
|
response = client.simulate_get('/usercerts/' + random_uuid() + '/' + user_fingerprint)
|
||||||
assert response.status == falcon.HTTP_NOT_FOUND
|
assert response.status == falcon.HTTP_NOT_FOUND
|
||||||
|
|
||||||
|
def test_get_user_with_bad_uuid(client):
|
||||||
|
response = client.simulate_get('/usercerts/foobar/' + user_fingerprint)
|
||||||
|
assert response.status == falcon.HTTP_BAD_REQUEST
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=['test_post_user'])
|
@pytest.mark.dependency(depends=['test_post_user'])
|
||||||
def test_post_second_cert_same_user(client):
|
def test_post_second_cert_same_user(client):
|
||||||
key = RSA.generate(2048)
|
key = RSA.generate(2048)
|
||||||
pub_key = key.publickey().exportKey('OpenSSH')
|
pub_key = key.publickey().exportKey('OpenSSH')
|
||||||
body = user_request(pub_key=pub_key)
|
body = user_request(pub_key=pub_key)
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
'/usercerts',
|
'/usercerts',
|
||||||
@ -118,8 +162,8 @@ def test_post_second_cert_same_user(client):
|
|||||||
assert location[2] == user_id
|
assert location[2] == user_id
|
||||||
assert location[3] == sshpubkeys.SSHKey(pub_key).hash()
|
assert location[3] == sshpubkeys.SSHKey(pub_key).hash()
|
||||||
|
|
||||||
def test_post_user_unknown_auth(client, db):
|
def test_post_user_unknown_auth(client):
|
||||||
body = user_request(auth=str(uuid.uuid4()))
|
body = user_request(auth=random_uuid())
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
'/usercerts',
|
'/usercerts',
|
||||||
body=json.dumps(body)
|
body=json.dumps(body)
|
||||||
@ -141,7 +185,7 @@ def token_request(auth=auth_id, host=host_id):
|
|||||||
'host_id': host,
|
'host_id': host,
|
||||||
'auth_id': auth,
|
'auth_id': auth,
|
||||||
'hostname': 'testname.local'
|
'hostname': 'testname.local'
|
||||||
}
|
}
|
||||||
|
|
||||||
def host_request(token, host=host_id, pub_key=host_pub_key):
|
def host_request(token, host=host_id, pub_key=host_pub_key):
|
||||||
return {
|
return {
|
||||||
@ -150,14 +194,26 @@ def host_request(token, host=host_id, pub_key=host_pub_key):
|
|||||||
'key.pub': pub_key
|
'key.pub': pub_key
|
||||||
}
|
}
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
def vendordata_request(auth, host):
|
||||||
def test_post_novavendordata(client, db):
|
return {
|
||||||
host = str(uuid.uuid4())
|
|
||||||
req = {
|
|
||||||
'instance-id': host,
|
'instance-id': host,
|
||||||
'project-id': auth_id,
|
'project-id': auth,
|
||||||
'hostname': 'mytest.testing'
|
'hostname': 'mytest.testing'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def test_post_vendordata_bad_uuid(client):
|
||||||
|
for key in ['instance-id', 'project-id']:
|
||||||
|
body = vendordata_request(auth_id, host_id)
|
||||||
|
body[key] = 'foobar'
|
||||||
|
response = client.simulate_post(
|
||||||
|
'/novavendordata',
|
||||||
|
body=json.dumps(body)
|
||||||
|
)
|
||||||
|
assert response.status == falcon.HTTP_BAD_REQUEST
|
||||||
|
|
||||||
|
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||||
|
def test_post_novavendordata(client):
|
||||||
|
req = vendordata_request(auth_id, random_uuid())
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
'/novavendordata',
|
'/novavendordata',
|
||||||
body=json.dumps(req)
|
body=json.dumps(req)
|
||||||
@ -174,8 +230,28 @@ def test_post_novavendordata(client, db):
|
|||||||
assert 'principals' in vendordata
|
assert 'principals' in vendordata
|
||||||
assert vendordata['principals'] == 'admin'
|
assert vendordata['principals'] == 'admin'
|
||||||
|
|
||||||
|
def test_post_token_bad_uuid(client):
|
||||||
|
for key in ['auth_id', 'host_id']:
|
||||||
|
body = token_request()
|
||||||
|
body[key] = 'foobar'
|
||||||
|
response = client.simulate_post(
|
||||||
|
'/hosttokens',
|
||||||
|
body=json.dumps(body)
|
||||||
|
)
|
||||||
|
assert response.status == falcon.HTTP_BAD_REQUEST
|
||||||
|
|
||||||
|
def test_post_host_bad_uuid(client):
|
||||||
|
for key in ['token_id', 'host_id']:
|
||||||
|
body = host_request(random_uuid())
|
||||||
|
body[key] = 'foobar'
|
||||||
|
response = client.simulate_post(
|
||||||
|
'/hosttokens',
|
||||||
|
body=json.dumps(body)
|
||||||
|
)
|
||||||
|
assert response.status == falcon.HTTP_BAD_REQUEST
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||||
def test_post_token_and_host(client, db):
|
def test_post_token_and_host(client):
|
||||||
token = token_request()
|
token = token_request()
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
'/hosttokens',
|
'/hosttokens',
|
||||||
@ -189,7 +265,7 @@ def test_post_token_and_host(client, db):
|
|||||||
global token_id
|
global token_id
|
||||||
token_id = location_path[-1]
|
token_id = location_path[-1]
|
||||||
# Verify that it's a valid UUID
|
# Verify that it's a valid UUID
|
||||||
uuid.UUID(token_id)
|
uuid.UUID(token_id, version=4)
|
||||||
host = host_request(token_id)
|
host = host_request(token_id)
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
'/hostcerts',
|
'/hostcerts',
|
||||||
@ -204,7 +280,7 @@ def test_post_token_and_host(client, db):
|
|||||||
|
|
||||||
@pytest.mark.dependency(depends=['test_post_token_and_host'])
|
@pytest.mark.dependency(depends=['test_post_token_and_host'])
|
||||||
def test_get_host(client):
|
def test_get_host(client):
|
||||||
response = client.simulate_get('/hostcerts/' + host_id + '/' + host_fingerprint)
|
response = client.simulate_get('/hostcerts/' + host_id + '/' + host_fingerprint)
|
||||||
assert response.status == falcon.HTTP_OK
|
assert response.status == falcon.HTTP_OK
|
||||||
body = json.loads(response.content)
|
body = json.loads(response.content)
|
||||||
assert 'host_id' in body
|
assert 'host_id' in body
|
||||||
@ -215,12 +291,16 @@ def test_get_host(client):
|
|||||||
assert body['fingerprint'] == host_fingerprint
|
assert body['fingerprint'] == host_fingerprint
|
||||||
assert body['auth_id'] == auth_id
|
assert body['auth_id'] == auth_id
|
||||||
|
|
||||||
def test_get_host_fails(client):
|
def test_get_host_doesnt_exist(client):
|
||||||
response = client.simulate_get('/hostcerts/' + str(uuid.uuid4()) + '/' + host_fingerprint)
|
response = client.simulate_get('/hostcerts/' + random_uuid() + '/' + host_fingerprint)
|
||||||
assert response.status == falcon.HTTP_NOT_FOUND
|
assert response.status == falcon.HTTP_NOT_FOUND
|
||||||
|
|
||||||
def test_post_token_unknown_auth(client, db):
|
def test_get_host_with_bad_uuid(client):
|
||||||
token = token_request(str(uuid.uuid4()))
|
response = client.simulate_get('/hostcerts/foobar/' + host_fingerprint)
|
||||||
|
assert response.status == falcon.HTTP_BAD_REQUEST
|
||||||
|
|
||||||
|
def test_post_token_unknown_auth(client):
|
||||||
|
token = token_request(random_uuid())
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
'/hosttokens',
|
'/hosttokens',
|
||||||
body=json.dumps(token)
|
body=json.dumps(token)
|
||||||
@ -228,8 +308,8 @@ def test_post_token_unknown_auth(client, db):
|
|||||||
assert response.status == falcon.HTTP_NOT_FOUND
|
assert response.status == falcon.HTTP_NOT_FOUND
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||||
def test_post_host_with_bogus_token(client, db):
|
def test_post_host_with_bogus_token(client):
|
||||||
host = host_request(str(uuid.uuid4()), str(uuid.uuid4()))
|
host = host_request(random_uuid(), random_uuid())
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
'/hostcerts',
|
'/hostcerts',
|
||||||
body=json.dumps(host)
|
body=json.dumps(host)
|
||||||
@ -237,7 +317,7 @@ def test_post_host_with_bogus_token(client, db):
|
|||||||
assert response.status == falcon.HTTP_NOT_FOUND
|
assert response.status == falcon.HTTP_NOT_FOUND
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=['test_post_token_and_host'])
|
@pytest.mark.dependency(depends=['test_post_token_and_host'])
|
||||||
def test_post_host_with_wrong_host_id(client, db):
|
def test_post_host_with_wrong_host_id(client):
|
||||||
# Get a new token for the same host_id as the base test.
|
# Get a new token for the same host_id as the base test.
|
||||||
token = token_request()
|
token = token_request()
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
@ -251,8 +331,8 @@ def test_post_host_with_wrong_host_id(client, db):
|
|||||||
# Use the token with a different host_id than it was created for.
|
# Use the token with a different host_id than it was created for.
|
||||||
# Use a different public key to avoid other error conditions.
|
# Use a different public key to avoid other error conditions.
|
||||||
key = RSA.generate(2048)
|
key = RSA.generate(2048)
|
||||||
pub_key = key.publickey().exportKey('OpenSSH')
|
pub_key = key.publickey().exportKey('OpenSSH')
|
||||||
host = host_request(location_path[-1], str(uuid.uuid4()), pub_key)
|
host = host_request(location_path[-1], random_uuid(), pub_key)
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
'/hostcerts',
|
'/hostcerts',
|
||||||
body=json.dumps(host)
|
body=json.dumps(host)
|
||||||
@ -279,7 +359,7 @@ def test_post_host_same_public_key_fails(client):
|
|||||||
assert response.status == falcon.HTTP_CONFLICT
|
assert response.status == falcon.HTTP_CONFLICT
|
||||||
|
|
||||||
@pytest.mark.dependency(depends=['test_post_token_and_host'])
|
@pytest.mark.dependency(depends=['test_post_token_and_host'])
|
||||||
def test_post_host_with_used_token(client, db):
|
def test_post_host_with_used_token(client):
|
||||||
# Re-use the token from the test this depends on.
|
# Re-use the token from the test this depends on.
|
||||||
# Use the same host_id and different public key to avoid other errors.
|
# Use the same host_id and different public key to avoid other errors.
|
||||||
key = RSA.generate(2048)
|
key = RSA.generate(2048)
|
||||||
@ -289,4 +369,4 @@ def test_post_host_with_used_token(client, db):
|
|||||||
'/hostcerts',
|
'/hostcerts',
|
||||||
body=json.dumps(host)
|
body=json.dumps(host)
|
||||||
)
|
)
|
||||||
assert response.status == falcon.HTTP_FORBIDDEN
|
assert response.status == falcon.HTTP_FORBIDDEN
|
||||||
|
Loading…
x
Reference in New Issue
Block a user