From 93782dc2b8215188d44c0d60fc19fbfb3110ad8d Mon Sep 17 00:00:00 2001 From: Pino de Candia Date: Tue, 24 Oct 2017 17:58:57 +0000 Subject: [PATCH] Added first test. --- tatu/api/app.py | 10 +++++----- tatu/api/models.py | 4 ++++ tatu/db/models.py | 31 ++++--------------------------- tatu/db/persistence.py | 2 +- tatu/tests/__init__.py | 0 tatu/tests/test_app.py | 41 +++++++++++++++++++++++++++++++++++++++++ tatu/utils.py | 24 ++++++++++++++++++++++++ 7 files changed, 79 insertions(+), 33 deletions(-) create mode 100644 tatu/tests/__init__.py create mode 100644 tatu/tests/test_app.py create mode 100644 tatu/utils.py diff --git a/tatu/api/app.py b/tatu/api/app.py index c921725..802ef53 100644 --- a/tatu/api/app.py +++ b/tatu/api/app.py @@ -1,18 +1,18 @@ import falcon import models -from db.persistence import SQLAlchemySessionManager +from tatu.db.persistence import SQLAlchemySessionManager -def create_app(): - api = falcon.API(middleware=[SQLAlchemySessionManager()]) +def create_app(sa): + api = falcon.API(middleware=[sa]) api.add_route('/authorities', models.Authorities()) api.add_route('/authorities/{uuid}', models.Authority()) api.add_route('/users/{uuid}/certs', models.UserCerts()) api.add_route('/users/{uuid}/certs/{fingerprint}', models.UserCert()) api.add_route('/hosts/{uuid}/certs', models.HostCerts()) api.add_route('/hosts/{uuid}/certs/{fingerprint}', models.HostCert()) - api.add_route('/host_cert_token', models.Tokens()) + api.add_route('/host_cert_tokens', models.Token()) return api def get_app(): - return create_app() + return create_app(SQLAlchemySessionManager()) diff --git a/tatu/api/models.py b/tatu/api/models.py index e4bfcdb..27fe54e 100644 --- a/tatu/api/models.py +++ b/tatu/api/models.py @@ -37,3 +37,7 @@ class HostCert(object): def on_post(self, req, resp): resp.status = falcon.HTTP_400 +class Token(object): + + def on_post(self, req, resp): + resp.status = falcon.HTTP_400 diff --git a/tatu/db/models.py b/tatu/db/models.py index 525ae8d..590b52f 100644 --- a/tatu/db/models.py +++ b/tatu/db/models.py @@ -4,6 +4,7 @@ import sshpubkeys import uuid import subprocess import os +from tatu.utils import generateCert Base = declarative_base() @@ -40,30 +41,6 @@ class UserCert(Base): pubkey = sa.Column(sa.Text) cert = sa.Column(sa.Text) -def generateCert(auth_key, entity_key, host_name=None): - # Temporarily write the authority private key and entity public key to /tmp - ca_file = '/tmp'.join(uuid.uuid4().hex) - pub_prefix = uuid.uuid4().hex - pub_file = ''.join('/tmp/', pub_prefix, '.pub') - with open(ca_file, "w") as text_file: - text_file.write(auth_key) - with open(pub_file, "w") as text_file: - text_file.write(entity_key) - # Call keygen - if host_name is None: - subprocess.call(['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n "myRoot,yourRoot"', pub_file], shell=True) - else: - subprocess.call(['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n', host_name, '-h', pub_file], shell=True) - # Read the contents of the certificate file - cert_file = ''.join('/tmp/', pub_prefix, '-cert.pub') - cert = '' - with open(cert_file, 'r') as text_file: - cert = text_file.read() - # Delete temporary files - for file in [ca_file, pub_file, cert_file]: - os.remove(file) - return cert - def createUserCert(session, id, auth_id, pub, priv): with session: user = User(id=id, @@ -88,10 +65,10 @@ class Token(Base): default=generate_uuid) hostname = sa.Column(sa.String(36)) instance_id = sa.Column(sa.String(36)) - auth_id = sa.Column(sa.String(36), ForeignKey('authorities.id')) + auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.id')) used = sa.Column(sa.Boolean) date_used = sa.Column(sa.Date) - fingerprint_used = sa.Column(sa.String(36), optional) + fingerprint_used = sa.Column(sa.String(36), default=False) def createToken(session, instance_id, auth_id, hostname): with session: @@ -127,7 +104,7 @@ def createHostCert(session, token_id, pub): if auth is None: raise falcon.HTTPNotFound("Unrecognized certificate authority") host = HostCert(id=token.instance_id, - fingerprint=sshpubkeys.SSHKey(pub).hash() + fingerprint=sshpubkeys.SSHKey(pub).hash(), token_id=token_id, pubkey=pub, cert=generateCert(auth.host_privkey, pub)) diff --git a/tatu/db/persistence.py b/tatu/db/persistence.py index ed52ca7..3b6fdce 100644 --- a/tatu/db/persistence.py +++ b/tatu/db/persistence.py @@ -15,7 +15,7 @@ class SQLAlchemySessionManager: def __init__(self): self.engine = create_engine(get_url()) - self.Session = scoped_session(sessionmaker(self._engine)) + self.Session = scoped_session(sessionmaker(self.engine)) def process_resource(self, req, resp, resource, params): resource.session = self.Session() diff --git a/tatu/tests/__init__.py b/tatu/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tatu/tests/test_app.py b/tatu/tests/test_app.py new file mode 100644 index 0000000..f8e1354 --- /dev/null +++ b/tatu/tests/test_app.py @@ -0,0 +1,41 @@ +# coding=utf-8 +import json +import falcon +from falcon import testing +import msgpack +import pytest +import uuid +from tatu.api.app import create_app +from tatu.db.persistence import SQLAlchemySessionManager +from tatu.db.models import Authority +from Crypto.PublicKey import RSA + +@pytest.fixture +def db(): + return SQLAlchemySessionManager() + +@pytest.fixture +def client(db): + api = create_app(db) + return testing.TestClient(api) + +def test_post_authority(client, db): + auth_id = str(uuid.uuid4()) + user_ca = RSA.generate(2048) + host_ca = RSA.generate(2048) + body = { + 'íd': auth_id, + 'user_privkey': user_ca.exportKey('PEM'), + 'user_pubkey': user_ca.publickey().exportKey('OpenSSH'), + 'host_privkey': host_ca.exportKey('PEM'), + 'host_pubkey': host_ca.publickey().exportKey('OpenSSH') + } + response = client.simulate_post( + '/authorities', + body=json.dumps(body), + ) + #with db.Session() as session: + session = db.Session() + auth = session.query(Authority).get(auth_id) + assert auth is not None + diff --git a/tatu/utils.py b/tatu/utils.py new file mode 100644 index 0000000..15b926e --- /dev/null +++ b/tatu/utils.py @@ -0,0 +1,24 @@ +def generateCert(auth_key, entity_key, host_name=None): + # Temporarily write the authority private key and entity public key to /tmp + ca_file = '/tmp'.join(uuid.uuid4().hex) + pub_prefix = uuid.uuid4().hex + pub_file = ''.join('/tmp/', pub_prefix, '.pub') + with open(ca_file, "w") as text_file: + text_file.write(auth_key) + with open(pub_file, "w") as text_file: + text_file.write(entity_key) + # Call keygen + if host_name is None: + subprocess.call(['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n "myRoot,yourRoot"', pub_file], shell=True) + else: + subprocess.call(['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n', host_name, '-h', pub_file], shell=True) + # Read the contents of the certificate file + cert_file = ''.join('/tmp/', pub_prefix, '-cert.pub') + cert = '' + with open(cert_file, 'r') as text_file: + cert = text_file.read() + # Delete temporary files + for file in [ca_file, pub_file, cert_file]: + os.remove(file) + return cert +