Added tests for resource Post.

This commit is contained in:
Pino de Candia 2017-10-27 23:20:51 +00:00
parent e35a3064ba
commit 13e5e63b81
5 changed files with 186 additions and 87 deletions

View File

@ -6,11 +6,11 @@ def create_app(sa):
api = falcon.API(middleware=[sa])
api.add_route('/authorities', models.Authorities())
api.add_route('/authorities/{ca_id}', models.Authority())
api.add_route('/user_certs', models.UserCerts())
api.add_route('/users/{user_id}/certs/{fingerprint}', models.UserCert())
api.add_route('/host_certs', models.HostCerts())
api.add_route('/hosts/{host_id}/certs/{fingerprint}', models.HostCert())
api.add_route('/host_cert_tokens', models.Token())
api.add_route('/usercerts', models.UserCerts())
api.add_route('/usercerts/{user_id}/{fingerprint}', models.UserCert())
api.add_route('/hostcerts', models.HostCerts())
api.add_route('/hostcerts/{host_id}/{fingerprint}', models.HostCert())
api.add_route('/hosttokens', models.Token())
return api

View File

@ -6,7 +6,6 @@ from tatu.db import models as db
class Authorities(object):
def on_post(self, req, resp):
print 'in Authorities on_post'
body = None
if req.content_length:
body = json.load(req.stream)
@ -29,20 +28,18 @@ class Authority(object):
class UserCerts(object):
def on_post(self, req, resp):
print 'in UserCerts on_post'
body = None
if req.content_length:
body = json.load(req.stream)
# TODO: validation, e.g. of UUIDs
# TODO: validation
user = db.createUserCert(
self.session,
body['user_id'],
body['auth_id'],
body['pub_key'],
body['priv_key']
body['pub_key']
)
resp.status = falcon.HTTP_201
resp.location = '/users/' + body['user_id'] + '/certs/' + user.fingerprint
resp.location = '/usercerts/' + user.user_id + '/' + user.fingerprint
class UserCert(object):
@ -52,28 +49,26 @@ class UserCert(object):
class HostCerts(object):
def on_post(self, req, resp):
print 'in HostCerts on_post'
body = None
if req.content_length:
body = json.load(req.stream)
host = db.createHostCert(
self.session,
body['token_id'],
body['instance_id'],
body['pub_key']
)
resp.status = falcon.HTTP_201
resp.location = '/hosts/' + host_cert.instance_id + '/certs/' + host_cert.fingerprint
resp.location = '/hostcerts/' + host.instance_id + '/' + host.fingerprint
class HostCert(object):
def on_get(self, req, resp, host_id, fingerprint):
print 'in HostCert on_post'
resp.status = falcon.HTTP_400
class Token(object):
def on_post(self, req, resp):
print 'in Token on_post'
body = None
if req.content_length:
body = json.load(req.stream)
@ -84,4 +79,4 @@ class Token(object):
body['hostname']
)
resp.status = falcon.HTTP_201
resp.body = json.dumps({'token_id': token.id})
resp.location = '/hosttokens/' + token.token_id

View File

@ -1,5 +1,7 @@
from datetime import datetime
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
import falcon
import sshpubkeys
import uuid
import os
@ -13,15 +15,14 @@ def generate_uuid():
class Authority(Base):
__tablename__ = 'authorities'
id = sa.Column(sa.String(36), primary_key=True)
auth_id = sa.Column(sa.String(36), primary_key=True)
user_pubkey = sa.Column(sa.Text)
user_privkey = sa.Column(sa.Text)
host_pubkey = sa.Column(sa.Text)
host_privkey = sa.Column(sa.Text)
def createAuthority(session, id, user_pub, user_priv, host_pub, host_priv):
print 'in createAuthority'
auth = Authority(id=id,
def createAuthority(session, auth_id, user_pub, user_priv, host_pub, host_priv):
auth = Authority(auth_id=auth_id,
user_pubkey=user_pub,
user_privkey=user_priv,
host_pubkey=host_pub,
@ -34,27 +35,26 @@ class UserCert(Base):
__tablename__ = 'user_certs'
user_id = sa.Column(sa.String(36), primary_key=True)
auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.id'))
fingerprint = sa.Column(sa.String(36), primary_key=True)
privkey = sa.Column(sa.Text)
auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.auth_id'))
pubkey = sa.Column(sa.Text)
cert = sa.Column(sa.Text)
def createUserCert(session, id, auth_id, pub, priv):
print 'in createUserCert'
def createUserCert(session, user_id, auth_id, pub):
user = UserCert(
user_id=id,
user_id=user_id,
auth_id=auth_id,
pubkey=pub,
privkey=priv
)
# Generate the fingerprint from the public key
user.fingerprint = sshpubkeys.SSHKey(pub).hash()
# Retrieve the authority's private key and generate the certificate
auth = session.query(Authority).get(auth_id)
if auth is None:
raise falcon.HTTPNotFound("Unrecognized certificate authority")
raise falcon.HTTPNotFound()
user.cert = generateCert(auth.user_privkey, pub)
if user.cert == '':
raise falcon.HTTPInternalServerError("Failed to generate the certificate")
session.add(user)
session.commit()
return user
@ -62,20 +62,20 @@ def createUserCert(session, id, auth_id, pub, priv):
class Token(Base):
__tablename__ = 'tokens'
id = sa.Column(sa.String(36), primary_key=True,
token_id = sa.Column(sa.String(36), primary_key=True,
default=generate_uuid)
hostname = sa.Column(sa.String(36))
auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.auth_id'))
instance_id = sa.Column(sa.String(36))
auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.id'))
hostname = sa.Column(sa.String(36))
used = sa.Column(sa.Boolean, default=False)
date_used = sa.Column(sa.Date)
date_used = sa.Column(sa.DateTime, default=datetime.min)
fingerprint_used = sa.Column(sa.String(36))
def createToken(session, instance_id, auth_id, hostname):
# Validate the certificate authority
auth = session.query(Authority).get(auth_id)
if auth is None:
raise falcon.HTTPNotFound("Unrecognized certificate authority")
raise falcon.HTTPNotFound()
token = Token(instance_id=instance_id,
auth_id=auth_id,
hostname=hostname)
@ -86,32 +86,37 @@ def createToken(session, instance_id, auth_id, hostname):
class HostCert(Base):
__tablename__ = 'host_certs'
id = sa.Column(sa.String(36), primary_key=True)
instance_id = sa.Column(sa.String(36), primary_key=True)
fingerprint = sa.Column(sa.String(36), primary_key=True)
token_id = sa.Column(sa.String(36), sa.ForeignKey('tokens.id'))
token_id = sa.Column(sa.String(36), sa.ForeignKey('tokens.token_id'))
pubkey = sa.Column(sa.Text)
cert = sa.Column(sa.Text)
hostname = sa.Column(sa.String(36))
def createHostCert(session, token_id, pub):
print 'in createHostCert'
def createHostCert(session, token_id, instance_id, pub):
token = session.query(Token).get(token_id)
if token is None:
raise falcon.HTTPNotFound("Unrecognized token")
raise falcon.HTTPNotFound()
if token.used:
raise falcon.HTTPForbidden(description='The presented token was previously used')
if token.instance_id != instance_id:
raise falcon.HTTPForbidden(description='The token is not valid for this instance ID')
auth = session.query(Authority).get(token.auth_id)
if auth is None:
raise falcon.HTTPNotFound("Unrecognized certificate authority")
host = HostCert(id=token.instance_id,
raise falcon.HTTPNotFound()
cert = generateCert(auth.host_privkey, pub, token.hostname)
if cert == '':
raise falcon.HTTPInternalServerError("Failed to generate the certificate")
host = HostCert(instance_id=instance_id,
fingerprint=sshpubkeys.SSHKey(pub).hash(),
token_id=token_id,
pubkey=pub,
cert=generateCert(auth.host_privkey, pub))
cert=cert,
hostname=token.hostname)
session.add(host)
print host
# Update the token
token.used = true
token.date_used = now
token.used = True
token.date_used = datetime.utcnow()
token.fingerprint_used = host.fingerprint
session.add(token)
session.commit()

View File

@ -2,7 +2,6 @@
import json
import falcon
from falcon import testing
import msgpack
import pytest
import uuid
from tatu.api.app import create_app
@ -44,56 +43,146 @@ def test_post_authority(client, db):
auth = session.query(Authority).get(auth_id)
assert auth is not None
@pytest.mark.dependency(depends=['test_post_authority'])
def test_post_user(client, db):
user_id = str(uuid.uuid4())
def user_request(auth=auth_id, user_id=None):
if user_id is None:
user_id = str(uuid.uuid4())
user_key = RSA.generate(2048)
pub_key = user_key.publickey().exportKey('OpenSSH')
body = {
return {
'user_id': user_id,
'auth_id': auth_id,
'priv_key': user_key.exportKey('PEM'),
'auth_id': auth,
'pub_key': pub_key
}
@pytest.mark.dependency(depends=['test_post_authority'])
def test_post_user(client, db):
body = user_request()
response = client.simulate_post(
'/user_certs',
'/usercerts',
body=json.dumps(body)
)
assert response.status == falcon.HTTP_CREATED
assert 'location' in response.headers
location = response.headers['location'].split('/')
assert location[1] == 'users'
assert location[2] == user_id
assert location[3] == 'certs'
assert location[4] == sshpubkeys.SSHKey(pub_key).hash()
assert location[1] == 'usercerts'
assert location[2] == body['user_id']
assert location[3] == sshpubkeys.SSHKey(body['pub_key']).hash()
@pytest.mark.dependency(depends=['test_post_authority'])
@pytest.mark.skip(reason="not working yet")
def test_host_cert_workflow(client, db):
instance_id = str(uuid.uuid4())
token = {
def test_post_user_bad_auth(client, db):
body = user_request(str(uuid.uuid4()))
response = client.simulate_post(
'/usercerts',
body=json.dumps(body)
)
assert response.status == falcon.HTTP_NOT_FOUND
def token_request(auth=auth_id, instance_id=None):
if instance_id is None:
instance_id = str(uuid.uuid4())
return {
'instance_id': instance_id,
'auth_id': auth_id,
'auth_id': auth,
'hostname': 'testname.local'
}
def host_request(token_id, instance_id=None):
if instance_id is None:
instance_id = str(uuid.uuid4())
host_key = RSA.generate(2048)
pub_key = str(host_key.publickey().exportKey('OpenSSH'))
return {
'token_id': token_id,
'instance_id': instance_id,
'pub_key': pub_key
}
@pytest.mark.dependency(depends=['test_post_authority'])
def test_host_cert_workflow(client, db):
token = token_request()
response = client.simulate_post(
'/host_cert_tokens',
'/hosttokens',
body=json.dumps(token)
)
assert response.status == falcon.HTTP_CREATED
assert 'location' in response.headers
location_path = response.headers['location'].split('/')
assert location_path[1] == 'host_cert_tokens'
host_key = RSA.generate(2048)
pub_key = str(host_key.publickey().exportKey('OpenSSH'))
host = {
'token_id': location_path[-1],
'pub_key': pub_key
}
assert location_path[1] == 'hosttokens'
host = host_request(location_path[-1], token['instance_id'])
response = client.simulate_post(
'/host_certs',
'/hostcerts',
body=json.dumps(host)
)
assert response.status == falcon.HTTP_CREATED
assert 'location' in response.headers
location = response.headers['location'].split('/')
assert location[1] == 'hostcerts'
assert location[2] == host['instance_id']
assert location[3] == sshpubkeys.SSHKey(host['pub_key']).hash()
@pytest.mark.dependency(depends=['test_post_authority'])
def test_post_token_bad_auth(client, db):
token = token_request(str(uuid.uuid4()))
response = client.simulate_post(
'/hosttokens',
body=json.dumps(token)
)
assert response.status == falcon.HTTP_NOT_FOUND
@pytest.mark.dependency(depends=['test_post_authority'])
def test_post_host_with_bogus_token(client, db):
token = token_request(str(uuid.uuid4()))
response = client.simulate_post(
'/hosttokens',
body=json.dumps(token)
)
assert response.status == falcon.HTTP_NOT_FOUND
@pytest.mark.dependency(depends=['test_post_authority'])
def test_post_host_with_wrong_instance_id(client, db):
token = token_request()
response = client.simulate_post(
'/hosttokens',
body=json.dumps(token)
)
assert response.status == falcon.HTTP_CREATED
cert = json.loads(response.body)
assert 'location' in response.headers
location_path = response.headers['location'].split('/')
assert location_path[1] == 'hosttokens'
# Use the token with a different instance_id
host = host_request(location_path[-1], str(uuid.uuid4()))
response = client.simulate_post(
'/hostcerts',
body=json.dumps(host)
)
@pytest.mark.dependency(depends=['test_post_authority'])
def test_post_host_with_used_token(client, db):
token = token_request()
response = client.simulate_post(
'/hosttokens',
body=json.dumps(token)
)
assert response.status == falcon.HTTP_CREATED
assert 'location' in response.headers
location_path = response.headers['location'].split('/')
assert location_path[1] == 'hosttokens'
# First, use the token to sign a host public key
host = host_request(location_path[-1], token['instance_id'])
response = client.simulate_post(
'/hostcerts',
body=json.dumps(host)
)
assert response.status == falcon.HTTP_CREATED
assert 'location' in response.headers
location = response.headers['location'].split('/')
assert location[1] == 'hostcerts'
assert location[2] == host['instance_id']
assert location[3] == sshpubkeys.SSHKey(host['pub_key']).hash()
# Now try using the token a sceond time, same instance_id, different pub key
host = host_request(location_path[-1], token['instance_id'])
response = client.simulate_post(
'/hostcerts',
body=json.dumps(host)
)
assert response.status == falcon.HTTP_FORBIDDEN

View File

@ -2,30 +2,40 @@ import os
import subprocess
import uuid
def generateCert(auth_key, entity_key, host_name=None):
# TODO: must clean up key files regardless of outcome
# Temporarily write the authority private key and entity public key to /tmp
ca_file = ''.join(['/tmp/', uuid.uuid4().hex])
pub_prefix = uuid.uuid4().hex
pub_file = ''.join(['/tmp/', pub_prefix, '.pub'])
def generateCert(auth_key, entity_key, hostname=None):
# Temporarily write the authority private key and entity public key to files
prefix = uuid.uuid4().hex
# Todo: make the temporary directory configurable or secure it.
dir = '/tmp/sshaas'
ca_file = ''.join([dir, prefix])
pub_file = ''.join([dir, prefix, '.pub'])
cert_file = ''.join([dir, prefix, '-cert.pub'])
cert = ''
try:
os.open(ca_file, os.O_WRONLY | os.O_CREAT, 0o600)
with open(ca_file, "w") as text_file:
text_file.write(auth_key)
with open(pub_file, "w") as text_file:
with open(pub_file, "w", 0o644) as text_file:
text_file.write(entity_key)
cert_file = ''.join(['/tmp/', pub_prefix, '-cert.pub'])
args = []
if host_name is None:
args = ['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n "myRoot,yourRoot"', pub_file]
args = ['ssh-keygen', '-P "pinot"', '-s', ca_file, '-I testID', '-V',
'-1d:+365d', '-n']
if hostname is None:
args.extend(['"myRoot,yourRoot"', pub_file])
else:
args = ['ssh-keygen', '-P "pino"', '-s', ca_file, '-I testID', '-V -1d:+365d', '-n', host_name, '-h', pub_file]
print args
subprocess.call(args, shell=True)
args.extend([hostname, '-h', pub_file])
print subprocess.check_output(args, stderr=subprocess.STDOUT)
# Read the contents of the certificate file
cert = ''
with open(cert_file, 'r') as text_file:
cert = text_file.read()
except Exception as e:
print e
finally:
# Delete temporary files
for file in [ca_file, pub_file, cert_file]:
os.remove(file)
try:
os.remove(file)
pass
except:
pass
return cert