From c81af303b01ecb02e9fb05b6efca1431b3b77116 Mon Sep 17 00:00:00 2001 From: Pino de Candia Date: Mon, 30 Oct 2017 23:10:11 +0000 Subject: [PATCH] Added validation for API objects on POST. --- tatu/api/models.py | 125 +++++++++++++++++++++------------ tatu/tests/test_app.py | 156 +++++++++++++++++++++++++++++++---------- 2 files changed, 198 insertions(+), 83 deletions(-) diff --git a/tatu/api/models.py b/tatu/api/models.py index c87be4f..802708e 100644 --- a/tatu/api/models.py +++ b/tatu/api/models.py @@ -1,24 +1,51 @@ import falcon import json +import uuid from tatu.db import models as db from Crypto.PublicKey import RSA +def validate_uuid(string): + try: + val = uuid.UUID(string, version=4) + except ValueError: + msg = '{} is not a valid UUID'.format(string) + raise falcon.HTTPBadRequest('Bad request', msg) + +def validate_uuids(req, params): + id_keys = ['token_id', 'auth_id', 'host_id', 'user_id', 'project-id', 'instance-id'] + if req.method in ('POST', 'PUT'): + for key in id_keys: + if key in req.body: + validate_uuid(req.body[key]) + for key in id_keys: + if key in params: + validate_uuid(params[key]) + +def validate(req, resp, resource, params): + if req.content_length: + # Store the body since we cannot read the stream again later + req.body = json.load(req.stream) + elif req.method in ('POST', 'PUT'): + raise falcon.HTTPBadRequest('The POST/PUT request is missing a body.') + validate_uuids(req, params) class Authorities(object): + @falcon.before(validate) def on_post(self, req, resp): - body = None - if req.content_length: - body = json.load(req.stream) - db.createAuthority( - self.session, - body['auth_id'], - ) + try: + db.createAuthority( + self.session, + req.body['auth_id'], + ) + except KeyError as e: + raise falcon.HTTPBadRequest(str(e)) resp.status = falcon.HTTP_201 - resp.location = '/authorities/' + body['auth_id'] + resp.location = '/authorities/' + req.body['auth_id'] class Authority(object): + @falcon.before(validate) def on_get(self, req, resp, auth_id): auth = db.getAuthority(self.session, auth_id) if auth is None: @@ -38,22 +65,24 @@ class Authority(object): class UserCerts(object): + @falcon.before(validate) def on_post(self, req, resp): - body = None - if req.content_length: - body = json.load(req.stream) # TODO: validation - user = db.createUserCert( - self.session, - body['user_id'], - body['auth_id'], - body['key.pub'] - ) + try: + user = db.createUserCert( + self.session, + req.body['user_id'], + req.body['auth_id'], + req.body['key.pub'] + ) + except KeyError as e: + raise falcon.HTTPBadRequest(str(e)) resp.status = falcon.HTTP_201 resp.location = '/usercerts/' + user.user_id + '/' + user.fingerprint class UserCert(object): + @falcon.before(validate) def on_get(self, req, resp, user_id, fingerprint): user = db.getUserCert(self.session, user_id, fingerprint) if user is None: @@ -70,21 +99,25 @@ class UserCert(object): class HostCerts(object): + @falcon.before(validate) def on_post(self, req, resp): - body = None - if req.content_length: - body = json.load(req.stream) - host = db.createHostCert( - self.session, - body['token_id'], - body['host_id'], - body['key.pub'] - ) + # Note that we could have found the host_id using the token_id. + # But requiring the host_id makes it a little harder to steal the token. + try: + host = db.createHostCert( + self.session, + req.body['token_id'], + req.body['host_id'], + req.body['key.pub'] + ) + except KeyError as e: + raise falcon.HTTPBadRequest(str(e)) resp.status = falcon.HTTP_201 resp.location = '/hostcerts/' + host.host_id + '/' + host.fingerprint class HostCert(object): + @falcon.before(validate) def on_get(self, req, resp, host_id, fingerprint): host = db.getHostCert(self.session, host_id, fingerprint) if host is None: @@ -101,21 +134,23 @@ class HostCert(object): class Tokens(object): + @falcon.before(validate) def on_post(self, req, resp): - body = None - if req.content_length: - body = json.load(req.stream) - token = db.createToken( - self.session, - body['host_id'], - body['auth_id'], - body['hostname'] - ) + try: + token = db.createToken( + self.session, + req.body['host_id'], + req.body['auth_id'], + req.body['hostname'] + ) + except KeyError as e: + raise falcon.HTTPBadRequest(str(e)) resp.status = falcon.HTTP_201 resp.location = '/hosttokens/' + token.token_id class NovaVendorData(object): + @falcon.before(validate) def on_post(self, req, resp): # An example of the data nova sends to vendordata services: # { @@ -126,16 +161,16 @@ class NovaVendorData(object): # "project-id": "039d104b7a5c4631b4ba6524d0b9e981", # "user-data": null # } - body = None - if req.content_length: - body = json.load(req.stream) - token = db.createToken( - self.session, - body['instance-id'], - body['project-id'], - body['hostname'] - ) - auth = db.getAuthority(self.session, body['project-id']) + try: + token = db.createToken( + self.session, + req.body['instance-id'], + req.body['project-id'], + req.body['hostname'] + ) + except KeyError as e: + raise falcon.HTTPBadRequest(str(e)) + auth = db.getAuthority(self.session, req.body['project-id']) if auth is None: resp.status = falcon.HTTP_NOT_FOUND return diff --git a/tatu/tests/test_app.py b/tatu/tests/test_app.py index 27e4f7d..8887ee6 100644 --- a/tatu/tests/test_app.py +++ b/tatu/tests/test_app.py @@ -19,26 +19,29 @@ def client(db): api = create_app(db) return testing.TestClient(api) +def random_uuid(): + return str(uuid.uuid4()) + token_id = '' -host_id = str(uuid.uuid4()) +host_id = random_uuid() host_key = RSA.generate(2048) -host_pub_key = host_key.publickey().exportKey('OpenSSH') +host_pub_key = host_key.publickey().exportKey('OpenSSH') host_fingerprint = sshpubkeys.SSHKey(host_pub_key).hash() -user_id = str(uuid.uuid4()) +user_id = random_uuid() user_key = RSA.generate(2048) -user_pub_key = user_key.publickey().exportKey('OpenSSH') +user_pub_key = user_key.publickey().exportKey('OpenSSH') user_fingerprint = sshpubkeys.SSHKey(user_pub_key).hash() -auth_id = str(uuid.uuid4()) +auth_id = random_uuid() auth_user_pub_key = None @pytest.mark.dependency() -def test_post_authority(client, db): +def test_post_authority(client): body = { 'auth_id': auth_id, - } + } response = client.simulate_post( '/authorities', body=json.dumps(body) @@ -49,9 +52,32 @@ def test_post_authority(client, db): #auth = session.query(Authority).get(auth_id) #assert auth is not None +def test_post_no_body(client): + for path in ['/authorities', '/usercerts', '/hosttokens', + '/hostcerts', '/novavendordata']: + response = client.simulate_post(path) + assert response.status == falcon.HTTP_BAD_REQUEST + +def test_post_empty_body(client): + bodystr = json.dumps({}) + for path in ['/authorities', '/usercerts', '/hosttokens', + '/hostcerts', '/novavendordata']: + response = client.simulate_post(path, body=bodystr) + assert response.status == falcon.HTTP_BAD_REQUEST + +def test_post_authority_bad_uuid(client): + body = { + 'auth_id': 'foobar', + } + response = client.simulate_post( + '/authorities', + body=json.dumps(body) + ) + assert response.status == falcon.HTTP_BAD_REQUEST + @pytest.mark.dependency(depends=['test_post_authority']) def test_get_authority(client): - response = client.simulate_get('/authorities/' + auth_id) + response = client.simulate_get('/authorities/' + auth_id) assert response.status == falcon.HTTP_OK body = json.loads(response.content) assert 'auth_id' in body @@ -62,19 +88,33 @@ def test_get_authority(client): assert 'user_key' not in body assert 'host_key' not in body -def test_get_authority_fails(client): - response = client.simulate_get('/authorities/' + str(uuid.uuid4())) +def test_get_authority_doesnt_exist(client): + response = client.simulate_get('/authorities/' + random_uuid()) assert response.status == falcon.HTTP_NOT_FOUND +def test_get_authority_with_bad_uuid(client): + response = client.simulate_get('/authorities/foobar') + assert response.status == falcon.HTTP_BAD_REQUEST + def user_request(auth=auth_id, user_id=user_id, pub_key=user_pub_key): return { 'user_id': user_id, 'auth_id': auth, 'key.pub': pub_key - } + } + +def test_post_user_bad_uuid(client): + for key in ['user_id', 'auth_id']: + body = user_request() + body[key] = 'foobar' + response = client.simulate_post( + '/usercerts', + body=json.dumps(body) + ) + assert response.status == falcon.HTTP_BAD_REQUEST @pytest.mark.dependency(depends=['test_post_authority']) -def test_post_user(client, db): +def test_post_user(client): body = user_request() response = client.simulate_post( '/usercerts', @@ -89,7 +129,7 @@ def test_post_user(client, db): @pytest.mark.dependency(depends=['test_post_user']) def test_get_user(client): - response = client.simulate_get('/usercerts/' + user_id + '/' + user_fingerprint) + response = client.simulate_get('/usercerts/' + user_id + '/' + user_fingerprint) assert response.status == falcon.HTTP_OK body = json.loads(response.content) assert 'user_id' in body @@ -98,14 +138,18 @@ def test_get_user(client): assert 'key-cert.pub' in body assert body['auth_id'] == auth_id -def test_get_user_fails(client): - response = client.simulate_get('/usercerts/' + str(uuid.uuid4()) + '/' + user_fingerprint) +def test_get_user_doesnt_exist(client): + response = client.simulate_get('/usercerts/' + random_uuid() + '/' + user_fingerprint) assert response.status == falcon.HTTP_NOT_FOUND +def test_get_user_with_bad_uuid(client): + response = client.simulate_get('/usercerts/foobar/' + user_fingerprint) + assert response.status == falcon.HTTP_BAD_REQUEST + @pytest.mark.dependency(depends=['test_post_user']) def test_post_second_cert_same_user(client): key = RSA.generate(2048) - pub_key = key.publickey().exportKey('OpenSSH') + pub_key = key.publickey().exportKey('OpenSSH') body = user_request(pub_key=pub_key) response = client.simulate_post( '/usercerts', @@ -118,8 +162,8 @@ def test_post_second_cert_same_user(client): assert location[2] == user_id assert location[3] == sshpubkeys.SSHKey(pub_key).hash() -def test_post_user_unknown_auth(client, db): - body = user_request(auth=str(uuid.uuid4())) +def test_post_user_unknown_auth(client): + body = user_request(auth=random_uuid()) response = client.simulate_post( '/usercerts', body=json.dumps(body) @@ -141,7 +185,7 @@ def token_request(auth=auth_id, host=host_id): 'host_id': host, 'auth_id': auth, 'hostname': 'testname.local' - } + } def host_request(token, host=host_id, pub_key=host_pub_key): return { @@ -150,14 +194,26 @@ def host_request(token, host=host_id, pub_key=host_pub_key): 'key.pub': pub_key } -@pytest.mark.dependency(depends=['test_post_authority']) -def test_post_novavendordata(client, db): - host = str(uuid.uuid4()) - req = { +def vendordata_request(auth, host): + return { 'instance-id': host, - 'project-id': auth_id, + 'project-id': auth, 'hostname': 'mytest.testing' } + +def test_post_vendordata_bad_uuid(client): + for key in ['instance-id', 'project-id']: + body = vendordata_request(auth_id, host_id) + body[key] = 'foobar' + response = client.simulate_post( + '/novavendordata', + body=json.dumps(body) + ) + assert response.status == falcon.HTTP_BAD_REQUEST + +@pytest.mark.dependency(depends=['test_post_authority']) +def test_post_novavendordata(client): + req = vendordata_request(auth_id, random_uuid()) response = client.simulate_post( '/novavendordata', body=json.dumps(req) @@ -174,8 +230,28 @@ def test_post_novavendordata(client, db): assert 'principals' in vendordata assert vendordata['principals'] == 'admin' +def test_post_token_bad_uuid(client): + for key in ['auth_id', 'host_id']: + body = token_request() + body[key] = 'foobar' + response = client.simulate_post( + '/hosttokens', + body=json.dumps(body) + ) + assert response.status == falcon.HTTP_BAD_REQUEST + +def test_post_host_bad_uuid(client): + for key in ['token_id', 'host_id']: + body = host_request(random_uuid()) + body[key] = 'foobar' + response = client.simulate_post( + '/hosttokens', + body=json.dumps(body) + ) + assert response.status == falcon.HTTP_BAD_REQUEST + @pytest.mark.dependency(depends=['test_post_authority']) -def test_post_token_and_host(client, db): +def test_post_token_and_host(client): token = token_request() response = client.simulate_post( '/hosttokens', @@ -189,7 +265,7 @@ def test_post_token_and_host(client, db): global token_id token_id = location_path[-1] # Verify that it's a valid UUID - uuid.UUID(token_id) + uuid.UUID(token_id, version=4) host = host_request(token_id) response = client.simulate_post( '/hostcerts', @@ -204,7 +280,7 @@ def test_post_token_and_host(client, db): @pytest.mark.dependency(depends=['test_post_token_and_host']) def test_get_host(client): - response = client.simulate_get('/hostcerts/' + host_id + '/' + host_fingerprint) + response = client.simulate_get('/hostcerts/' + host_id + '/' + host_fingerprint) assert response.status == falcon.HTTP_OK body = json.loads(response.content) assert 'host_id' in body @@ -215,12 +291,16 @@ def test_get_host(client): assert body['fingerprint'] == host_fingerprint assert body['auth_id'] == auth_id -def test_get_host_fails(client): - response = client.simulate_get('/hostcerts/' + str(uuid.uuid4()) + '/' + host_fingerprint) +def test_get_host_doesnt_exist(client): + response = client.simulate_get('/hostcerts/' + random_uuid() + '/' + host_fingerprint) assert response.status == falcon.HTTP_NOT_FOUND -def test_post_token_unknown_auth(client, db): - token = token_request(str(uuid.uuid4())) +def test_get_host_with_bad_uuid(client): + response = client.simulate_get('/hostcerts/foobar/' + host_fingerprint) + assert response.status == falcon.HTTP_BAD_REQUEST + +def test_post_token_unknown_auth(client): + token = token_request(random_uuid()) response = client.simulate_post( '/hosttokens', body=json.dumps(token) @@ -228,8 +308,8 @@ def test_post_token_unknown_auth(client, db): assert response.status == falcon.HTTP_NOT_FOUND @pytest.mark.dependency(depends=['test_post_authority']) -def test_post_host_with_bogus_token(client, db): - host = host_request(str(uuid.uuid4()), str(uuid.uuid4())) +def test_post_host_with_bogus_token(client): + host = host_request(random_uuid(), random_uuid()) response = client.simulate_post( '/hostcerts', body=json.dumps(host) @@ -237,7 +317,7 @@ def test_post_host_with_bogus_token(client, db): assert response.status == falcon.HTTP_NOT_FOUND @pytest.mark.dependency(depends=['test_post_token_and_host']) -def test_post_host_with_wrong_host_id(client, db): +def test_post_host_with_wrong_host_id(client): # Get a new token for the same host_id as the base test. token = token_request() response = client.simulate_post( @@ -251,8 +331,8 @@ def test_post_host_with_wrong_host_id(client, db): # Use the token with a different host_id than it was created for. # Use a different public key to avoid other error conditions. key = RSA.generate(2048) - pub_key = key.publickey().exportKey('OpenSSH') - host = host_request(location_path[-1], str(uuid.uuid4()), pub_key) + pub_key = key.publickey().exportKey('OpenSSH') + host = host_request(location_path[-1], random_uuid(), pub_key) response = client.simulate_post( '/hostcerts', body=json.dumps(host) @@ -279,7 +359,7 @@ def test_post_host_same_public_key_fails(client): assert response.status == falcon.HTTP_CONFLICT @pytest.mark.dependency(depends=['test_post_token_and_host']) -def test_post_host_with_used_token(client, db): +def test_post_host_with_used_token(client): # Re-use the token from the test this depends on. # Use the same host_id and different public key to avoid other errors. key = RSA.generate(2048) @@ -289,4 +369,4 @@ def test_post_host_with_used_token(client, db): '/hostcerts', body=json.dumps(host) ) - assert response.status == falcon.HTTP_FORBIDDEN + assert response.status == falcon.HTTP_FORBIDDEN