From aae01194663b7dccc659c810f313f69129f74cc7 Mon Sep 17 00:00:00 2001 From: Pino de Candia Date: Sat, 28 Oct 2017 08:13:54 +0000 Subject: [PATCH] Added tests for GET and error cases. --- tatu/api/app.py | 2 +- tatu/api/models.py | 61 +++++++++--- tatu/db/models.py | 85 ++++++++++------- tatu/tests/test_app.py | 207 +++++++++++++++++++++++++++++------------ 4 files changed, 244 insertions(+), 111 deletions(-) diff --git a/tatu/api/app.py b/tatu/api/app.py index 6f21f81..4d48861 100644 --- a/tatu/api/app.py +++ b/tatu/api/app.py @@ -5,7 +5,7 @@ from tatu.db.persistence import SQLAlchemySessionManager def create_app(sa): api = falcon.API(middleware=[sa]) api.add_route('/authorities', models.Authorities()) - api.add_route('/authorities/{ca_id}', models.Authority()) + api.add_route('/authorities/{auth_id}', models.Authority()) api.add_route('/usercerts', models.UserCerts()) api.add_route('/usercerts/{user_id}/{fingerprint}', models.UserCert()) api.add_route('/hostcerts', models.HostCerts()) diff --git a/tatu/api/models.py b/tatu/api/models.py index f300613..883d7c4 100644 --- a/tatu/api/models.py +++ b/tatu/api/models.py @@ -1,6 +1,7 @@ import falcon import json from tatu.db import models as db +from Crypto.PublicKey import RSA class Authorities(object): @@ -12,18 +13,30 @@ class Authorities(object): db.createAuthority( self.session, body['auth_id'], - body['user_pubkey'], - body['user_privkey'], - body['host_pubkey'], - body['host_privkey'] + user_key=body['user_key'], + host_key=body['host_key'], ) resp.status = falcon.HTTP_201 resp.location = '/authorities/' + body['auth_id'] class Authority(object): - def on_get(self, req, resp, ca_id): - resp.status = falcon.HTTP_400 + def on_get(self, req, resp, auth_id): + auth = db.getAuthority(self.session, auth_id) + if auth is None: + resp.status = falcon.HTTP_NOT_FOUND + return + user_key = RSA.importKey(auth.user_key) + user_pub_key = user_key.publickey().exportKey('OpenSSH') + host_key = RSA.importKey(auth.host_key) + host_pub_key = host_key.publickey().exportKey('OpenSSH') + body = { + 'auth_id': auth_id, + 'user_key.pub': user_pub_key, + 'host_key.pub': host_pub_key + } + resp.body = json.dumps(body) + resp.status = falcon.HTTP_OK class UserCerts(object): @@ -36,7 +49,7 @@ class UserCerts(object): self.session, body['user_id'], body['auth_id'], - body['pub_key'] + body['key.pub'] ) resp.status = falcon.HTTP_201 resp.location = '/usercerts/' + user.user_id + '/' + user.fingerprint @@ -44,7 +57,18 @@ class UserCerts(object): class UserCert(object): def on_get(self, req, resp, user_id, fingerprint): - resp.status = falcon.HTTP_400 + user = db.getUserCert(self.session, user_id, fingerprint) + if user is None: + resp.status = falcon.HTTP_NOT_FOUND + return + body = { + 'user_id': user.user_id, + 'fingerprint': user.fingerprint, + 'auth_id': user.auth_id, + 'key-cert.pub': user.cert + } + resp.body = json.dumps(body) + resp.status = falcon.HTTP_OK class HostCerts(object): @@ -55,16 +79,27 @@ class HostCerts(object): host = db.createHostCert( self.session, body['token_id'], - body['instance_id'], - body['pub_key'] + body['host_id'], + body['key.pub'] ) resp.status = falcon.HTTP_201 - resp.location = '/hostcerts/' + host.instance_id + '/' + host.fingerprint + resp.location = '/hostcerts/' + host.host_id + '/' + host.fingerprint class HostCert(object): def on_get(self, req, resp, host_id, fingerprint): - resp.status = falcon.HTTP_400 + host = db.getHostCert(self.session, host_id, fingerprint) + if host is None: + resp.status = falcon.HTTP_NOT_FOUND + return + body = { + 'host_id': host.host_id, + 'fingerprint': host.fingerprint, + 'auth_id': host.auth_id, + 'key-cert.pub': host.pubkey, + } + resp.body = json.dumps(body) + resp.status = falcon.HTTP_OK class Token(object): @@ -74,7 +109,7 @@ class Token(object): body = json.load(req.stream) token = db.createToken( self.session, - body['instance_id'], + body['host_id'], body['auth_id'], body['hostname'] ) diff --git a/tatu/db/models.py b/tatu/db/models.py index 38ef3f2..e9e0537 100644 --- a/tatu/db/models.py +++ b/tatu/db/models.py @@ -6,6 +6,7 @@ import sshpubkeys import uuid import os from tatu.utils import generateCert +from Crypto.PublicKey import RSA Base = declarative_base() @@ -16,20 +17,19 @@ class Authority(Base): __tablename__ = 'authorities' auth_id = sa.Column(sa.String(36), primary_key=True) - user_pubkey = sa.Column(sa.Text) - user_privkey = sa.Column(sa.Text) - host_pubkey = sa.Column(sa.Text) - host_privkey = sa.Column(sa.Text) + user_key = sa.Column(sa.Text) + host_key = sa.Column(sa.Text) -def createAuthority(session, auth_id, user_pub, user_priv, host_pub, host_priv): - auth = Authority(auth_id=auth_id, - user_pubkey=user_pub, - user_privkey=user_priv, - host_pubkey=host_pub, - host_privkey=host_priv) - session.add(auth) - session.commit() - return auth +def getAuthority(session, auth_id): + return session.query(Authority).get(auth_id) + +def createAuthority(session, auth_id, user_key, host_key): + auth = Authority(auth_id=auth_id, + user_key=user_key, + host_key=host_key) + session.add(auth) + session.commit() + return auth class UserCert(Base): __tablename__ = 'user_certs' @@ -37,24 +37,29 @@ class UserCert(Base): user_id = sa.Column(sa.String(36), primary_key=True) fingerprint = sa.Column(sa.String(36), primary_key=True) auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.auth_id')) - pubkey = sa.Column(sa.Text) cert = sa.Column(sa.Text) +def getUserCert(session, user_id, fingerprint): + return session.query(UserCert).get([user_id, fingerprint]) + def createUserCert(session, user_id, auth_id, pub): - user = UserCert( - user_id=user_id, - auth_id=auth_id, - pubkey=pub, - ) - # Generate the fingerprint from the public key - user.fingerprint = sshpubkeys.SSHKey(pub).hash() # Retrieve the authority's private key and generate the certificate - auth = session.query(Authority).get(auth_id) + auth = getAuthority(session, auth_id) if auth is None: raise falcon.HTTPNotFound() - user.cert = generateCert(auth.user_privkey, pub) - if user.cert == '': + fingerprint = sshpubkeys.SSHKey(pub).hash() + certRecord = session.query(UserCert).get([user_id, fingerprint]) + if certRecord is not None: + raise falcon.HTTPConflict('This public key is already signed.') + cert = generateCert(auth.user_key, pub) + if cert is None: raise falcon.HTTPInternalServerError("Failed to generate the certificate") + user = UserCert( + user_id=user_id, + fingerprint=fingerprint, + auth_id=auth_id, + cert=cert + ) session.add(user) session.commit() return user @@ -65,18 +70,18 @@ class Token(Base): token_id = sa.Column(sa.String(36), primary_key=True, default=generate_uuid) auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.auth_id')) - instance_id = sa.Column(sa.String(36)) + host_id = sa.Column(sa.String(36)) hostname = sa.Column(sa.String(36)) used = sa.Column(sa.Boolean, default=False) date_used = sa.Column(sa.DateTime, default=datetime.min) fingerprint_used = sa.Column(sa.String(36)) -def createToken(session, instance_id, auth_id, hostname): +def createToken(session, host_id, auth_id, hostname): # Validate the certificate authority - auth = session.query(Authority).get(auth_id) + auth = getAuthority(session, auth_id) if auth is None: raise falcon.HTTPNotFound() - token = Token(instance_id=instance_id, + token = Token(host_id=host_id, auth_id=auth_id, hostname=hostname) session.add(token) @@ -86,31 +91,39 @@ def createToken(session, instance_id, auth_id, hostname): class HostCert(Base): __tablename__ = 'host_certs' - instance_id = sa.Column(sa.String(36), primary_key=True) + host_id = sa.Column(sa.String(36), primary_key=True) fingerprint = sa.Column(sa.String(36), primary_key=True) + auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.auth_id')) token_id = sa.Column(sa.String(36), sa.ForeignKey('tokens.token_id')) pubkey = sa.Column(sa.Text) cert = sa.Column(sa.Text) hostname = sa.Column(sa.String(36)) -def createHostCert(session, token_id, instance_id, pub): +def getHostCert(session, host_id, fingerprint): + return session.query(HostCert).get([host_id, fingerprint]) + +def createHostCert(session, token_id, host_id, pub): token = session.query(Token).get(token_id) if token is None: raise falcon.HTTPNotFound() if token.used: raise falcon.HTTPForbidden(description='The presented token was previously used') - if token.instance_id != instance_id: + if token.host_id != host_id: raise falcon.HTTPForbidden(description='The token is not valid for this instance ID') - auth = session.query(Authority).get(token.auth_id) + auth = getAuthority(session, token.auth_id) if auth is None: raise falcon.HTTPNotFound() - cert = generateCert(auth.host_privkey, pub, token.hostname) + fingerprint = sshpubkeys.SSHKey(pub).hash() + certRecord = session.query(HostCert).get([host_id, fingerprint]) + if certRecord is not None: + raise falcon.HTTPConflict('This public key is already signed.') + cert = generateCert(auth.host_key, pub, token.hostname) if cert == '': raise falcon.HTTPInternalServerError("Failed to generate the certificate") - host = HostCert(instance_id=instance_id, - fingerprint=sshpubkeys.SSHKey(pub).hash(), + host = HostCert(host_id=host_id, + fingerprint=fingerprint, + auth_id=token.auth_id, token_id=token_id, - pubkey=pub, cert=cert, hostname=token.hostname) session.add(host) diff --git a/tatu/tests/test_app.py b/tatu/tests/test_app.py index 628d1fd..2ebcb76 100644 --- a/tatu/tests/test_app.py +++ b/tatu/tests/test_app.py @@ -12,25 +12,37 @@ import sshpubkeys @pytest.fixture def db(): - return SQLAlchemySessionManager() + return SQLAlchemySessionManager() @pytest.fixture def client(db): - api = create_app(db) - return testing.TestClient(api) + api = create_app(db) + return testing.TestClient(api) + +token_id = '' + +host_id = str(uuid.uuid4()) +host_key = RSA.generate(2048) +host_pub_key = host_key.publickey().exportKey('OpenSSH') +host_fingerprint = sshpubkeys.SSHKey(host_pub_key).hash() + +user_id = str(uuid.uuid4()) +user_key = RSA.generate(2048) +user_pub_key = user_key.publickey().exportKey('OpenSSH') +user_fingerprint = sshpubkeys.SSHKey(user_pub_key).hash() auth_id = str(uuid.uuid4()) +auth_user_key = RSA.generate(2048) +auth_host_key = RSA.generate(2048) +auth_user_pub_key = auth_user_key.publickey().exportKey('OpenSSH') +auth_host_pub_key = auth_host_key.publickey().exportKey('OpenSSH') @pytest.mark.dependency() def test_post_authority(client, db): - user_ca = RSA.generate(2048) - host_ca = RSA.generate(2048) body = { 'auth_id': auth_id, - 'user_privkey': user_ca.exportKey('PEM'), - 'user_pubkey': user_ca.publickey().exportKey('OpenSSH'), - 'host_privkey': host_ca.exportKey('PEM'), - 'host_pubkey': host_ca.publickey().exportKey('OpenSSH') + 'user_key': auth_user_key.exportKey('PEM'), + 'host_key': auth_host_key.exportKey('PEM'), } response = client.simulate_post( '/authorities', @@ -38,21 +50,33 @@ def test_post_authority(client, db): ) assert response.status == falcon.HTTP_CREATED assert response.headers['location'] == '/authorities/' + auth_id - #with db.Session() as session: - session = db.Session() - auth = session.query(Authority).get(auth_id) - assert auth is not None + #session = db.Session() + #auth = session.query(Authority).get(auth_id) + #assert auth is not None -def user_request(auth=auth_id, user_id=None): - if user_id is None: - user_id = str(uuid.uuid4()) - user_key = RSA.generate(2048) - pub_key = user_key.publickey().exportKey('OpenSSH') +@pytest.mark.dependency(depends=['test_post_authority']) +def test_get_authority(client): + response = client.simulate_get('/authorities/' + auth_id) + assert response.status == falcon.HTTP_OK + body = json.loads(response.content) + assert 'auth_id' in body + assert 'user_key.pub' in body + assert body['user_key.pub'] == auth_user_pub_key + assert 'host_key.pub' in body + assert body['host_key.pub'] == auth_host_pub_key + assert 'user_key' not in body + assert 'host_key' not in body + +def test_get_authority_fails(client): + response = client.simulate_get('/authorities/' + str(uuid.uuid4())) + assert response.status == falcon.HTTP_NOT_FOUND + +def user_request(auth=auth_id, user_id=user_id, pub_key=user_pub_key): return { 'user_id': user_id, 'auth_id': auth, - 'pub_key': pub_key - } + 'key.pub': pub_key + } @pytest.mark.dependency(depends=['test_post_authority']) def test_post_user(client, db): @@ -66,39 +90,73 @@ def test_post_user(client, db): location = response.headers['location'].split('/') assert location[1] == 'usercerts' assert location[2] == body['user_id'] - assert location[3] == sshpubkeys.SSHKey(body['pub_key']).hash() + assert location[3] == sshpubkeys.SSHKey(body['key.pub']).hash() -@pytest.mark.dependency(depends=['test_post_authority']) -def test_post_user_bad_auth(client, db): - body = user_request(str(uuid.uuid4())) +@pytest.mark.dependency(depends=['test_post_user']) +def test_get_user(client): + response = client.simulate_get('/usercerts/' + user_id + '/' + user_fingerprint) + assert response.status == falcon.HTTP_OK + body = json.loads(response.content) + assert 'user_id' in body + assert 'fingerprint' in body + assert 'auth_id' in body + assert 'key-cert.pub' in body + assert body['auth_id'] == auth_id + +def test_get_user_fails(client): + response = client.simulate_get('/usercerts/' + str(uuid.uuid4()) + '/' + user_fingerprint) + assert response.status == falcon.HTTP_NOT_FOUND + +@pytest.mark.dependency(depends=['test_post_user']) +def test_post_second_cert_same_user(client): + key = RSA.generate(2048) + pub_key = key.publickey().exportKey('OpenSSH') + body = user_request(pub_key=pub_key) + response = client.simulate_post( + '/usercerts', + body=json.dumps(body) + ) + assert response.status == falcon.HTTP_CREATED + assert 'location' in response.headers + location = response.headers['location'].split('/') + assert location[1] == 'usercerts' + assert location[2] == user_id + assert location[3] == sshpubkeys.SSHKey(pub_key).hash() + +def test_post_user_unknown_auth(client, db): + body = user_request(auth=str(uuid.uuid4())) response = client.simulate_post( '/usercerts', body=json.dumps(body) ) assert response.status == falcon.HTTP_NOT_FOUND -def token_request(auth=auth_id, instance_id=None): - if instance_id is None: - instance_id = str(uuid.uuid4()) +@pytest.mark.dependency(depends=['test_post_user']) +def test_post_same_user_same_key_fails(client): + # Show that using the same user ID and public key fails. + body = user_request() + response = client.simulate_post( + '/usercerts', + body=json.dumps(body) + ) + assert response.status == falcon.HTTP_CONFLICT + +def token_request(auth=auth_id, host=host_id): return { - 'instance_id': instance_id, + 'host_id': host, 'auth_id': auth, 'hostname': 'testname.local' } -def host_request(token_id, instance_id=None): - if instance_id is None: - instance_id = str(uuid.uuid4()) - host_key = RSA.generate(2048) - pub_key = str(host_key.publickey().exportKey('OpenSSH')) +def host_request(token, host=host_id, pub_key=host_pub_key): return { - 'token_id': token_id, - 'instance_id': instance_id, - 'pub_key': pub_key + 'token_id': token, + 'host_id': host, + 'key.pub': pub_key } @pytest.mark.dependency(depends=['test_post_authority']) -def test_host_cert_workflow(client, db): +def test_post_token_and_host(client, db): token = token_request() response = client.simulate_post( '/hosttokens', @@ -108,7 +166,12 @@ def test_host_cert_workflow(client, db): assert 'location' in response.headers location_path = response.headers['location'].split('/') assert location_path[1] == 'hosttokens' - host = host_request(location_path[-1], token['instance_id']) + # Store the token ID for other tests + global token_id + token_id = location_path[-1] + # Verify that it's a valid UUID + uuid.UUID(token_id) + host = host_request(token_id) response = client.simulate_post( '/hostcerts', body=json.dumps(host) @@ -117,11 +180,27 @@ def test_host_cert_workflow(client, db): assert 'location' in response.headers location = response.headers['location'].split('/') assert location[1] == 'hostcerts' - assert location[2] == host['instance_id'] - assert location[3] == sshpubkeys.SSHKey(host['pub_key']).hash() + assert location[2] == host_id + assert location[3] == sshpubkeys.SSHKey(host_pub_key).hash() -@pytest.mark.dependency(depends=['test_post_authority']) -def test_post_token_bad_auth(client, db): +@pytest.mark.dependency(depends=['test_post_token_and_host']) +def test_get_host(client): + response = client.simulate_get('/hostcerts/' + host_id + '/' + host_fingerprint) + assert response.status == falcon.HTTP_OK + body = json.loads(response.content) + assert 'host_id' in body + assert 'fingerprint' in body + assert 'auth_id' in body + assert 'key-cert.pub' in body + assert body['host_id'] == host_id + assert body['fingerprint'] == host_fingerprint + assert body['auth_id'] == auth_id + +def test_get_host_fails(client): + response = client.simulate_get('/hostcerts/' + str(uuid.uuid4()) + '/' + host_fingerprint) + assert response.status == falcon.HTTP_NOT_FOUND + +def test_post_token_unknown_auth(client, db): token = token_request(str(uuid.uuid4())) response = client.simulate_post( '/hosttokens', @@ -131,15 +210,16 @@ def test_post_token_bad_auth(client, db): @pytest.mark.dependency(depends=['test_post_authority']) def test_post_host_with_bogus_token(client, db): - token = token_request(str(uuid.uuid4())) + host = host_request(str(uuid.uuid4()), str(uuid.uuid4())) response = client.simulate_post( - '/hosttokens', - body=json.dumps(token) + '/hostcerts', + body=json.dumps(host) ) assert response.status == falcon.HTTP_NOT_FOUND -@pytest.mark.dependency(depends=['test_post_authority']) -def test_post_host_with_wrong_instance_id(client, db): +@pytest.mark.dependency(depends=['test_post_token_and_host']) +def test_post_host_with_wrong_host_id(client, db): + # Get a new token for the same host_id as the base test. token = token_request() response = client.simulate_post( '/hosttokens', @@ -149,15 +229,20 @@ def test_post_host_with_wrong_instance_id(client, db): assert 'location' in response.headers location_path = response.headers['location'].split('/') assert location_path[1] == 'hosttokens' - # Use the token with a different instance_id - host = host_request(location_path[-1], str(uuid.uuid4())) + # Use the token with a different host_id than it was created for. + # Use a different public key to avoid other error conditions. + key = RSA.generate(2048) + pub_key = key.publickey().exportKey('OpenSSH') + host = host_request(location_path[-1], str(uuid.uuid4()), pub_key) response = client.simulate_post( '/hostcerts', body=json.dumps(host) ) -@pytest.mark.dependency(depends=['test_post_authority']) -def test_post_host_with_used_token(client, db): +@pytest.mark.dependency(depends=['test_post_token_and_host']) +def test_post_host_same_public_key_fails(client): + # Use a new token compared to the test this depends on. + # Show that using the same host ID and public key fails. token = token_request() response = client.simulate_post( '/hosttokens', @@ -167,20 +252,20 @@ def test_post_host_with_used_token(client, db): assert 'location' in response.headers location_path = response.headers['location'].split('/') assert location_path[1] == 'hosttokens' - # First, use the token to sign a host public key - host = host_request(location_path[-1], token['instance_id']) + host = host_request(location_path[-1]) response = client.simulate_post( '/hostcerts', body=json.dumps(host) ) - assert response.status == falcon.HTTP_CREATED - assert 'location' in response.headers - location = response.headers['location'].split('/') - assert location[1] == 'hostcerts' - assert location[2] == host['instance_id'] - assert location[3] == sshpubkeys.SSHKey(host['pub_key']).hash() - # Now try using the token a sceond time, same instance_id, different pub key - host = host_request(location_path[-1], token['instance_id']) + assert response.status == falcon.HTTP_CONFLICT + +@pytest.mark.dependency(depends=['test_post_token_and_host']) +def test_post_host_with_used_token(client, db): + # Re-use the token from the test this depends on. + # Use the same host_id and different public key to avoid other errors. + key = RSA.generate(2048) + pub_key = key.publickey().exportKey('OpenSSH') + host = host_request(token_id, host_id, pub_key) response = client.simulate_post( '/hostcerts', body=json.dumps(host)