Added tests for creating User and Host Certs.
This commit is contained in:
parent
93782dc2b8
commit
e35a3064ba
@ -5,11 +5,11 @@ from tatu.db.persistence import SQLAlchemySessionManager
|
|||||||
def create_app(sa):
|
def create_app(sa):
|
||||||
api = falcon.API(middleware=[sa])
|
api = falcon.API(middleware=[sa])
|
||||||
api.add_route('/authorities', models.Authorities())
|
api.add_route('/authorities', models.Authorities())
|
||||||
api.add_route('/authorities/{uuid}', models.Authority())
|
api.add_route('/authorities/{ca_id}', models.Authority())
|
||||||
api.add_route('/users/{uuid}/certs', models.UserCerts())
|
api.add_route('/user_certs', models.UserCerts())
|
||||||
api.add_route('/users/{uuid}/certs/{fingerprint}', models.UserCert())
|
api.add_route('/users/{user_id}/certs/{fingerprint}', models.UserCert())
|
||||||
api.add_route('/hosts/{uuid}/certs', models.HostCerts())
|
api.add_route('/host_certs', models.HostCerts())
|
||||||
api.add_route('/hosts/{uuid}/certs/{fingerprint}', models.HostCert())
|
api.add_route('/hosts/{host_id}/certs/{fingerprint}', models.HostCert())
|
||||||
api.add_route('/host_cert_tokens', models.Token())
|
api.add_route('/host_cert_tokens', models.Token())
|
||||||
return api
|
return api
|
||||||
|
|
||||||
|
@ -1,43 +1,87 @@
|
|||||||
import falcon
|
import falcon
|
||||||
|
import json
|
||||||
|
from tatu.db import models as db
|
||||||
|
|
||||||
|
|
||||||
class Authorities(object):
|
class Authorities(object):
|
||||||
|
|
||||||
def on_post(self, req, resp):
|
def on_post(self, req, resp):
|
||||||
resp.status = falcon.HTTP_400
|
print 'in Authorities on_post'
|
||||||
|
body = None
|
||||||
|
if req.content_length:
|
||||||
|
body = json.load(req.stream)
|
||||||
|
db.createAuthority(
|
||||||
|
self.session,
|
||||||
|
body['auth_id'],
|
||||||
|
body['user_pubkey'],
|
||||||
|
body['user_privkey'],
|
||||||
|
body['host_pubkey'],
|
||||||
|
body['host_privkey']
|
||||||
|
)
|
||||||
|
resp.status = falcon.HTTP_201
|
||||||
|
resp.location = '/authorities/' + body['auth_id']
|
||||||
|
|
||||||
class Authority(object):
|
class Authority(object):
|
||||||
|
|
||||||
def on_get(self, req, resp):
|
def on_get(self, req, resp, ca_id):
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_400
|
||||||
|
|
||||||
class UserCerts(object):
|
class UserCerts(object):
|
||||||
|
|
||||||
def on_get(self, req, resp):
|
def on_post(self, req, resp):
|
||||||
resp.status = falcon.HTTP_400
|
print 'in UserCerts on_post'
|
||||||
|
body = None
|
||||||
|
if req.content_length:
|
||||||
|
body = json.load(req.stream)
|
||||||
|
# TODO: validation, e.g. of UUIDs
|
||||||
|
user = db.createUserCert(
|
||||||
|
self.session,
|
||||||
|
body['user_id'],
|
||||||
|
body['auth_id'],
|
||||||
|
body['pub_key'],
|
||||||
|
body['priv_key']
|
||||||
|
)
|
||||||
|
resp.status = falcon.HTTP_201
|
||||||
|
resp.location = '/users/' + body['user_id'] + '/certs/' + user.fingerprint
|
||||||
|
|
||||||
class UserCert(object):
|
class UserCert(object):
|
||||||
|
|
||||||
def on_get(self, req, resp):
|
def on_get(self, req, resp, user_id, fingerprint):
|
||||||
resp.status = falcon.HTTP_400
|
|
||||||
|
|
||||||
def on_post(self, req, resp):
|
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_400
|
||||||
|
|
||||||
class HostCerts(object):
|
class HostCerts(object):
|
||||||
|
|
||||||
def on_get(self, req, resp):
|
def on_post(self, req, resp):
|
||||||
resp.status = falcon.HTTP_400
|
print 'in HostCerts on_post'
|
||||||
|
body = None
|
||||||
|
if req.content_length:
|
||||||
|
body = json.load(req.stream)
|
||||||
|
host = db.createHostCert(
|
||||||
|
self.session,
|
||||||
|
body['token_id'],
|
||||||
|
body['pub_key']
|
||||||
|
)
|
||||||
|
resp.status = falcon.HTTP_201
|
||||||
|
resp.location = '/hosts/' + host_cert.instance_id + '/certs/' + host_cert.fingerprint
|
||||||
|
|
||||||
class HostCert(object):
|
class HostCert(object):
|
||||||
|
|
||||||
def on_get(self, req, resp):
|
def on_get(self, req, resp, host_id, fingerprint):
|
||||||
resp.status = falcon.HTTP_400
|
print 'in HostCert on_post'
|
||||||
|
|
||||||
def on_post(self, req, resp):
|
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_400
|
||||||
|
|
||||||
class Token(object):
|
class Token(object):
|
||||||
|
|
||||||
def on_post(self, req, resp):
|
def on_post(self, req, resp):
|
||||||
resp.status = falcon.HTTP_400
|
print 'in Token on_post'
|
||||||
|
body = None
|
||||||
|
if req.content_length:
|
||||||
|
body = json.load(req.stream)
|
||||||
|
token = db.createToken(
|
||||||
|
self.session,
|
||||||
|
body['instance_id'],
|
||||||
|
body['auth_id'],
|
||||||
|
body['hostname']
|
||||||
|
)
|
||||||
|
resp.status = falcon.HTTP_201
|
||||||
|
resp.body = json.dumps({'token_id': token.id})
|
||||||
|
@ -2,7 +2,6 @@ import sqlalchemy as sa
|
|||||||
from sqlalchemy.ext.declarative import declarative_base
|
from sqlalchemy.ext.declarative import declarative_base
|
||||||
import sshpubkeys
|
import sshpubkeys
|
||||||
import uuid
|
import uuid
|
||||||
import subprocess
|
|
||||||
import os
|
import os
|
||||||
from tatu.utils import generateCert
|
from tatu.utils import generateCert
|
||||||
|
|
||||||
@ -21,7 +20,7 @@ class Authority(Base):
|
|||||||
host_privkey = sa.Column(sa.Text)
|
host_privkey = sa.Column(sa.Text)
|
||||||
|
|
||||||
def createAuthority(session, id, user_pub, user_priv, host_pub, host_priv):
|
def createAuthority(session, id, user_pub, user_priv, host_pub, host_priv):
|
||||||
with session:
|
print 'in createAuthority'
|
||||||
auth = Authority(id=id,
|
auth = Authority(id=id,
|
||||||
user_pubkey=user_pub,
|
user_pubkey=user_pub,
|
||||||
user_privkey=user_priv,
|
user_privkey=user_priv,
|
||||||
@ -42,11 +41,13 @@ class UserCert(Base):
|
|||||||
cert = sa.Column(sa.Text)
|
cert = sa.Column(sa.Text)
|
||||||
|
|
||||||
def createUserCert(session, id, auth_id, pub, priv):
|
def createUserCert(session, id, auth_id, pub, priv):
|
||||||
with session:
|
print 'in createUserCert'
|
||||||
user = User(id=id,
|
user = UserCert(
|
||||||
auth_id=auth_id,
|
user_id=id,
|
||||||
pubkey=pub,
|
auth_id=auth_id,
|
||||||
privkey=priv)
|
pubkey=pub,
|
||||||
|
privkey=priv
|
||||||
|
)
|
||||||
# Generate the fingerprint from the public key
|
# Generate the fingerprint from the public key
|
||||||
user.fingerprint = sshpubkeys.SSHKey(pub).hash()
|
user.fingerprint = sshpubkeys.SSHKey(pub).hash()
|
||||||
# Retrieve the authority's private key and generate the certificate
|
# Retrieve the authority's private key and generate the certificate
|
||||||
@ -66,20 +67,18 @@ class Token(Base):
|
|||||||
hostname = sa.Column(sa.String(36))
|
hostname = sa.Column(sa.String(36))
|
||||||
instance_id = sa.Column(sa.String(36))
|
instance_id = sa.Column(sa.String(36))
|
||||||
auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.id'))
|
auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.id'))
|
||||||
used = sa.Column(sa.Boolean)
|
used = sa.Column(sa.Boolean, default=False)
|
||||||
date_used = sa.Column(sa.Date)
|
date_used = sa.Column(sa.Date)
|
||||||
fingerprint_used = sa.Column(sa.String(36), default=False)
|
fingerprint_used = sa.Column(sa.String(36))
|
||||||
|
|
||||||
def createToken(session, instance_id, auth_id, hostname):
|
def createToken(session, instance_id, auth_id, hostname):
|
||||||
with session:
|
|
||||||
# Validate the certificate authority
|
# Validate the certificate authority
|
||||||
auth = session.query(Authority).get(auth_id)
|
auth = session.query(Authority).get(auth_id)
|
||||||
if auth is None:
|
if auth is None:
|
||||||
raise falcon.HTTPNotFound("Unrecognized certificate authority")
|
raise falcon.HTTPNotFound("Unrecognized certificate authority")
|
||||||
token = Token(instance_id=instance_id,
|
token = Token(instance_id=instance_id,
|
||||||
auth_id=auth_id,
|
auth_id=auth_id,
|
||||||
hostname=hostname,
|
hostname=hostname)
|
||||||
used=false)
|
|
||||||
session.add(token)
|
session.add(token)
|
||||||
session.commit()
|
session.commit()
|
||||||
return token
|
return token
|
||||||
@ -94,7 +93,7 @@ class HostCert(Base):
|
|||||||
cert = sa.Column(sa.Text)
|
cert = sa.Column(sa.Text)
|
||||||
|
|
||||||
def createHostCert(session, token_id, pub):
|
def createHostCert(session, token_id, pub):
|
||||||
with session:
|
print 'in createHostCert'
|
||||||
token = session.query(Token).get(token_id)
|
token = session.query(Token).get(token_id)
|
||||||
if token is None:
|
if token is None:
|
||||||
raise falcon.HTTPNotFound("Unrecognized token")
|
raise falcon.HTTPNotFound("Unrecognized token")
|
||||||
@ -109,6 +108,7 @@ def createHostCert(session, token_id, pub):
|
|||||||
pubkey=pub,
|
pubkey=pub,
|
||||||
cert=generateCert(auth.host_privkey, pub))
|
cert=generateCert(auth.host_privkey, pub))
|
||||||
session.add(host)
|
session.add(host)
|
||||||
|
print host
|
||||||
# Update the token
|
# Update the token
|
||||||
token.used = true
|
token.used = true
|
||||||
token.date_used = now
|
token.date_used = now
|
||||||
|
@ -2,6 +2,7 @@ import os
|
|||||||
|
|
||||||
from sqlalchemy import create_engine
|
from sqlalchemy import create_engine
|
||||||
from sqlalchemy.orm import sessionmaker, scoped_session
|
from sqlalchemy.orm import sessionmaker, scoped_session
|
||||||
|
from tatu.db.models import Base
|
||||||
|
|
||||||
|
|
||||||
def get_url():
|
def get_url():
|
||||||
@ -15,6 +16,7 @@ class SQLAlchemySessionManager:
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.engine = create_engine(get_url())
|
self.engine = create_engine(get_url())
|
||||||
|
Base.metadata.create_all(self.engine)
|
||||||
self.Session = scoped_session(sessionmaker(self.engine))
|
self.Session = scoped_session(sessionmaker(self.engine))
|
||||||
|
|
||||||
def process_resource(self, req, resp, resource, params):
|
def process_resource(self, req, resp, resource, params):
|
||||||
|
@ -9,6 +9,7 @@ from tatu.api.app import create_app
|
|||||||
from tatu.db.persistence import SQLAlchemySessionManager
|
from tatu.db.persistence import SQLAlchemySessionManager
|
||||||
from tatu.db.models import Authority
|
from tatu.db.models import Authority
|
||||||
from Crypto.PublicKey import RSA
|
from Crypto.PublicKey import RSA
|
||||||
|
import sshpubkeys
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def db():
|
def db():
|
||||||
@ -19,12 +20,14 @@ def client(db):
|
|||||||
api = create_app(db)
|
api = create_app(db)
|
||||||
return testing.TestClient(api)
|
return testing.TestClient(api)
|
||||||
|
|
||||||
|
auth_id = str(uuid.uuid4())
|
||||||
|
|
||||||
|
@pytest.mark.dependency()
|
||||||
def test_post_authority(client, db):
|
def test_post_authority(client, db):
|
||||||
auth_id = str(uuid.uuid4())
|
|
||||||
user_ca = RSA.generate(2048)
|
user_ca = RSA.generate(2048)
|
||||||
host_ca = RSA.generate(2048)
|
host_ca = RSA.generate(2048)
|
||||||
body = {
|
body = {
|
||||||
'íd': auth_id,
|
'auth_id': auth_id,
|
||||||
'user_privkey': user_ca.exportKey('PEM'),
|
'user_privkey': user_ca.exportKey('PEM'),
|
||||||
'user_pubkey': user_ca.publickey().exportKey('OpenSSH'),
|
'user_pubkey': user_ca.publickey().exportKey('OpenSSH'),
|
||||||
'host_privkey': host_ca.exportKey('PEM'),
|
'host_privkey': host_ca.exportKey('PEM'),
|
||||||
@ -32,10 +35,65 @@ def test_post_authority(client, db):
|
|||||||
}
|
}
|
||||||
response = client.simulate_post(
|
response = client.simulate_post(
|
||||||
'/authorities',
|
'/authorities',
|
||||||
body=json.dumps(body),
|
body=json.dumps(body)
|
||||||
)
|
)
|
||||||
|
assert response.status == falcon.HTTP_CREATED
|
||||||
|
assert response.headers['location'] == '/authorities/' + auth_id
|
||||||
#with db.Session() as session:
|
#with db.Session() as session:
|
||||||
session = db.Session()
|
session = db.Session()
|
||||||
auth = session.query(Authority).get(auth_id)
|
auth = session.query(Authority).get(auth_id)
|
||||||
assert auth is not None
|
assert auth is not None
|
||||||
|
|
||||||
|
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||||
|
def test_post_user(client, db):
|
||||||
|
user_id = str(uuid.uuid4())
|
||||||
|
user_key = RSA.generate(2048)
|
||||||
|
pub_key = user_key.publickey().exportKey('OpenSSH')
|
||||||
|
body = {
|
||||||
|
'user_id': user_id,
|
||||||
|
'auth_id': auth_id,
|
||||||
|
'priv_key': user_key.exportKey('PEM'),
|
||||||
|
'pub_key': pub_key
|
||||||
|
}
|
||||||
|
response = client.simulate_post(
|
||||||
|
'/user_certs',
|
||||||
|
body=json.dumps(body)
|
||||||
|
)
|
||||||
|
assert response.status == falcon.HTTP_CREATED
|
||||||
|
assert 'location' in response.headers
|
||||||
|
location = response.headers['location'].split('/')
|
||||||
|
assert location[1] == 'users'
|
||||||
|
assert location[2] == user_id
|
||||||
|
assert location[3] == 'certs'
|
||||||
|
assert location[4] == sshpubkeys.SSHKey(pub_key).hash()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||||
|
@pytest.mark.skip(reason="not working yet")
|
||||||
|
def test_host_cert_workflow(client, db):
|
||||||
|
instance_id = str(uuid.uuid4())
|
||||||
|
token = {
|
||||||
|
'instance_id': instance_id,
|
||||||
|
'auth_id': auth_id,
|
||||||
|
'hostname': 'testname.local'
|
||||||
|
}
|
||||||
|
response = client.simulate_post(
|
||||||
|
'/host_cert_tokens',
|
||||||
|
body=json.dumps(token)
|
||||||
|
)
|
||||||
|
assert response.status == falcon.HTTP_CREATED
|
||||||
|
assert 'location' in response.headers
|
||||||
|
location_path = response.headers['location'].split('/')
|
||||||
|
assert location_path[1] == 'host_cert_tokens'
|
||||||
|
host_key = RSA.generate(2048)
|
||||||
|
pub_key = str(host_key.publickey().exportKey('OpenSSH'))
|
||||||
|
host = {
|
||||||
|
'token_id': location_path[-1],
|
||||||
|
'pub_key': pub_key
|
||||||
|
}
|
||||||
|
response = client.simulate_post(
|
||||||
|
'/host_certs',
|
||||||
|
body=json.dumps(token)
|
||||||
|
)
|
||||||
|
assert response.status == falcon.HTTP_CREATED
|
||||||
|
cert = json.loads(response.body)
|
||||||
|
@ -1,19 +1,26 @@
|
|||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import uuid
|
||||||
|
|
||||||
def generateCert(auth_key, entity_key, host_name=None):
|
def generateCert(auth_key, entity_key, host_name=None):
|
||||||
|
# TODO: must clean up key files regardless of outcome
|
||||||
# Temporarily write the authority private key and entity public key to /tmp
|
# Temporarily write the authority private key and entity public key to /tmp
|
||||||
ca_file = '/tmp'.join(uuid.uuid4().hex)
|
ca_file = ''.join(['/tmp/', uuid.uuid4().hex])
|
||||||
pub_prefix = uuid.uuid4().hex
|
pub_prefix = uuid.uuid4().hex
|
||||||
pub_file = ''.join('/tmp/', pub_prefix, '.pub')
|
pub_file = ''.join(['/tmp/', pub_prefix, '.pub'])
|
||||||
with open(ca_file, "w") as text_file:
|
with open(ca_file, "w") as text_file:
|
||||||
text_file.write(auth_key)
|
text_file.write(auth_key)
|
||||||
with open(pub_file, "w") as text_file:
|
with open(pub_file, "w") as text_file:
|
||||||
text_file.write(entity_key)
|
text_file.write(entity_key)
|
||||||
# Call keygen
|
cert_file = ''.join(['/tmp/', pub_prefix, '-cert.pub'])
|
||||||
|
args = []
|
||||||
if host_name is None:
|
if host_name is None:
|
||||||
subprocess.call(['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n "myRoot,yourRoot"', pub_file], shell=True)
|
args = ['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n "myRoot,yourRoot"', pub_file]
|
||||||
else:
|
else:
|
||||||
subprocess.call(['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n', host_name, '-h', pub_file], shell=True)
|
args = ['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n', host_name, '-h', pub_file]
|
||||||
|
print args
|
||||||
|
subprocess.call(args, shell=True)
|
||||||
# Read the contents of the certificate file
|
# Read the contents of the certificate file
|
||||||
cert_file = ''.join('/tmp/', pub_prefix, '-cert.pub')
|
|
||||||
cert = ''
|
cert = ''
|
||||||
with open(cert_file, 'r') as text_file:
|
with open(cert_file, 'r') as text_file:
|
||||||
cert = text_file.read()
|
cert = text_file.read()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user