Added tests for GET and error cases.
This commit is contained in:
parent
13e5e63b81
commit
aae0119466
@ -5,7 +5,7 @@ from tatu.db.persistence import SQLAlchemySessionManager
|
||||
def create_app(sa):
|
||||
api = falcon.API(middleware=[sa])
|
||||
api.add_route('/authorities', models.Authorities())
|
||||
api.add_route('/authorities/{ca_id}', models.Authority())
|
||||
api.add_route('/authorities/{auth_id}', models.Authority())
|
||||
api.add_route('/usercerts', models.UserCerts())
|
||||
api.add_route('/usercerts/{user_id}/{fingerprint}', models.UserCert())
|
||||
api.add_route('/hostcerts', models.HostCerts())
|
||||
|
@ -1,6 +1,7 @@
|
||||
import falcon
|
||||
import json
|
||||
from tatu.db import models as db
|
||||
from Crypto.PublicKey import RSA
|
||||
|
||||
|
||||
class Authorities(object):
|
||||
@ -12,18 +13,30 @@ class Authorities(object):
|
||||
db.createAuthority(
|
||||
self.session,
|
||||
body['auth_id'],
|
||||
body['user_pubkey'],
|
||||
body['user_privkey'],
|
||||
body['host_pubkey'],
|
||||
body['host_privkey']
|
||||
user_key=body['user_key'],
|
||||
host_key=body['host_key'],
|
||||
)
|
||||
resp.status = falcon.HTTP_201
|
||||
resp.location = '/authorities/' + body['auth_id']
|
||||
|
||||
class Authority(object):
|
||||
|
||||
def on_get(self, req, resp, ca_id):
|
||||
resp.status = falcon.HTTP_400
|
||||
def on_get(self, req, resp, auth_id):
|
||||
auth = db.getAuthority(self.session, auth_id)
|
||||
if auth is None:
|
||||
resp.status = falcon.HTTP_NOT_FOUND
|
||||
return
|
||||
user_key = RSA.importKey(auth.user_key)
|
||||
user_pub_key = user_key.publickey().exportKey('OpenSSH')
|
||||
host_key = RSA.importKey(auth.host_key)
|
||||
host_pub_key = host_key.publickey().exportKey('OpenSSH')
|
||||
body = {
|
||||
'auth_id': auth_id,
|
||||
'user_key.pub': user_pub_key,
|
||||
'host_key.pub': host_pub_key
|
||||
}
|
||||
resp.body = json.dumps(body)
|
||||
resp.status = falcon.HTTP_OK
|
||||
|
||||
class UserCerts(object):
|
||||
|
||||
@ -36,7 +49,7 @@ class UserCerts(object):
|
||||
self.session,
|
||||
body['user_id'],
|
||||
body['auth_id'],
|
||||
body['pub_key']
|
||||
body['key.pub']
|
||||
)
|
||||
resp.status = falcon.HTTP_201
|
||||
resp.location = '/usercerts/' + user.user_id + '/' + user.fingerprint
|
||||
@ -44,7 +57,18 @@ class UserCerts(object):
|
||||
class UserCert(object):
|
||||
|
||||
def on_get(self, req, resp, user_id, fingerprint):
|
||||
resp.status = falcon.HTTP_400
|
||||
user = db.getUserCert(self.session, user_id, fingerprint)
|
||||
if user is None:
|
||||
resp.status = falcon.HTTP_NOT_FOUND
|
||||
return
|
||||
body = {
|
||||
'user_id': user.user_id,
|
||||
'fingerprint': user.fingerprint,
|
||||
'auth_id': user.auth_id,
|
||||
'key-cert.pub': user.cert
|
||||
}
|
||||
resp.body = json.dumps(body)
|
||||
resp.status = falcon.HTTP_OK
|
||||
|
||||
class HostCerts(object):
|
||||
|
||||
@ -55,16 +79,27 @@ class HostCerts(object):
|
||||
host = db.createHostCert(
|
||||
self.session,
|
||||
body['token_id'],
|
||||
body['instance_id'],
|
||||
body['pub_key']
|
||||
body['host_id'],
|
||||
body['key.pub']
|
||||
)
|
||||
resp.status = falcon.HTTP_201
|
||||
resp.location = '/hostcerts/' + host.instance_id + '/' + host.fingerprint
|
||||
resp.location = '/hostcerts/' + host.host_id + '/' + host.fingerprint
|
||||
|
||||
class HostCert(object):
|
||||
|
||||
def on_get(self, req, resp, host_id, fingerprint):
|
||||
resp.status = falcon.HTTP_400
|
||||
host = db.getHostCert(self.session, host_id, fingerprint)
|
||||
if host is None:
|
||||
resp.status = falcon.HTTP_NOT_FOUND
|
||||
return
|
||||
body = {
|
||||
'host_id': host.host_id,
|
||||
'fingerprint': host.fingerprint,
|
||||
'auth_id': host.auth_id,
|
||||
'key-cert.pub': host.pubkey,
|
||||
}
|
||||
resp.body = json.dumps(body)
|
||||
resp.status = falcon.HTTP_OK
|
||||
|
||||
class Token(object):
|
||||
|
||||
@ -74,7 +109,7 @@ class Token(object):
|
||||
body = json.load(req.stream)
|
||||
token = db.createToken(
|
||||
self.session,
|
||||
body['instance_id'],
|
||||
body['host_id'],
|
||||
body['auth_id'],
|
||||
body['hostname']
|
||||
)
|
||||
|
@ -6,6 +6,7 @@ import sshpubkeys
|
||||
import uuid
|
||||
import os
|
||||
from tatu.utils import generateCert
|
||||
from Crypto.PublicKey import RSA
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
@ -16,20 +17,19 @@ class Authority(Base):
|
||||
__tablename__ = 'authorities'
|
||||
|
||||
auth_id = sa.Column(sa.String(36), primary_key=True)
|
||||
user_pubkey = sa.Column(sa.Text)
|
||||
user_privkey = sa.Column(sa.Text)
|
||||
host_pubkey = sa.Column(sa.Text)
|
||||
host_privkey = sa.Column(sa.Text)
|
||||
user_key = sa.Column(sa.Text)
|
||||
host_key = sa.Column(sa.Text)
|
||||
|
||||
def createAuthority(session, auth_id, user_pub, user_priv, host_pub, host_priv):
|
||||
auth = Authority(auth_id=auth_id,
|
||||
user_pubkey=user_pub,
|
||||
user_privkey=user_priv,
|
||||
host_pubkey=host_pub,
|
||||
host_privkey=host_priv)
|
||||
session.add(auth)
|
||||
session.commit()
|
||||
return auth
|
||||
def getAuthority(session, auth_id):
|
||||
return session.query(Authority).get(auth_id)
|
||||
|
||||
def createAuthority(session, auth_id, user_key, host_key):
|
||||
auth = Authority(auth_id=auth_id,
|
||||
user_key=user_key,
|
||||
host_key=host_key)
|
||||
session.add(auth)
|
||||
session.commit()
|
||||
return auth
|
||||
|
||||
class UserCert(Base):
|
||||
__tablename__ = 'user_certs'
|
||||
@ -37,24 +37,29 @@ class UserCert(Base):
|
||||
user_id = sa.Column(sa.String(36), primary_key=True)
|
||||
fingerprint = sa.Column(sa.String(36), primary_key=True)
|
||||
auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.auth_id'))
|
||||
pubkey = sa.Column(sa.Text)
|
||||
cert = sa.Column(sa.Text)
|
||||
|
||||
def getUserCert(session, user_id, fingerprint):
|
||||
return session.query(UserCert).get([user_id, fingerprint])
|
||||
|
||||
def createUserCert(session, user_id, auth_id, pub):
|
||||
user = UserCert(
|
||||
user_id=user_id,
|
||||
auth_id=auth_id,
|
||||
pubkey=pub,
|
||||
)
|
||||
# Generate the fingerprint from the public key
|
||||
user.fingerprint = sshpubkeys.SSHKey(pub).hash()
|
||||
# Retrieve the authority's private key and generate the certificate
|
||||
auth = session.query(Authority).get(auth_id)
|
||||
auth = getAuthority(session, auth_id)
|
||||
if auth is None:
|
||||
raise falcon.HTTPNotFound()
|
||||
user.cert = generateCert(auth.user_privkey, pub)
|
||||
if user.cert == '':
|
||||
fingerprint = sshpubkeys.SSHKey(pub).hash()
|
||||
certRecord = session.query(UserCert).get([user_id, fingerprint])
|
||||
if certRecord is not None:
|
||||
raise falcon.HTTPConflict('This public key is already signed.')
|
||||
cert = generateCert(auth.user_key, pub)
|
||||
if cert is None:
|
||||
raise falcon.HTTPInternalServerError("Failed to generate the certificate")
|
||||
user = UserCert(
|
||||
user_id=user_id,
|
||||
fingerprint=fingerprint,
|
||||
auth_id=auth_id,
|
||||
cert=cert
|
||||
)
|
||||
session.add(user)
|
||||
session.commit()
|
||||
return user
|
||||
@ -65,18 +70,18 @@ class Token(Base):
|
||||
token_id = sa.Column(sa.String(36), primary_key=True,
|
||||
default=generate_uuid)
|
||||
auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.auth_id'))
|
||||
instance_id = sa.Column(sa.String(36))
|
||||
host_id = sa.Column(sa.String(36))
|
||||
hostname = sa.Column(sa.String(36))
|
||||
used = sa.Column(sa.Boolean, default=False)
|
||||
date_used = sa.Column(sa.DateTime, default=datetime.min)
|
||||
fingerprint_used = sa.Column(sa.String(36))
|
||||
|
||||
def createToken(session, instance_id, auth_id, hostname):
|
||||
def createToken(session, host_id, auth_id, hostname):
|
||||
# Validate the certificate authority
|
||||
auth = session.query(Authority).get(auth_id)
|
||||
auth = getAuthority(session, auth_id)
|
||||
if auth is None:
|
||||
raise falcon.HTTPNotFound()
|
||||
token = Token(instance_id=instance_id,
|
||||
token = Token(host_id=host_id,
|
||||
auth_id=auth_id,
|
||||
hostname=hostname)
|
||||
session.add(token)
|
||||
@ -86,31 +91,39 @@ def createToken(session, instance_id, auth_id, hostname):
|
||||
class HostCert(Base):
|
||||
__tablename__ = 'host_certs'
|
||||
|
||||
instance_id = sa.Column(sa.String(36), primary_key=True)
|
||||
host_id = sa.Column(sa.String(36), primary_key=True)
|
||||
fingerprint = sa.Column(sa.String(36), primary_key=True)
|
||||
auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.auth_id'))
|
||||
token_id = sa.Column(sa.String(36), sa.ForeignKey('tokens.token_id'))
|
||||
pubkey = sa.Column(sa.Text)
|
||||
cert = sa.Column(sa.Text)
|
||||
hostname = sa.Column(sa.String(36))
|
||||
|
||||
def createHostCert(session, token_id, instance_id, pub):
|
||||
def getHostCert(session, host_id, fingerprint):
|
||||
return session.query(HostCert).get([host_id, fingerprint])
|
||||
|
||||
def createHostCert(session, token_id, host_id, pub):
|
||||
token = session.query(Token).get(token_id)
|
||||
if token is None:
|
||||
raise falcon.HTTPNotFound()
|
||||
if token.used:
|
||||
raise falcon.HTTPForbidden(description='The presented token was previously used')
|
||||
if token.instance_id != instance_id:
|
||||
if token.host_id != host_id:
|
||||
raise falcon.HTTPForbidden(description='The token is not valid for this instance ID')
|
||||
auth = session.query(Authority).get(token.auth_id)
|
||||
auth = getAuthority(session, token.auth_id)
|
||||
if auth is None:
|
||||
raise falcon.HTTPNotFound()
|
||||
cert = generateCert(auth.host_privkey, pub, token.hostname)
|
||||
fingerprint = sshpubkeys.SSHKey(pub).hash()
|
||||
certRecord = session.query(HostCert).get([host_id, fingerprint])
|
||||
if certRecord is not None:
|
||||
raise falcon.HTTPConflict('This public key is already signed.')
|
||||
cert = generateCert(auth.host_key, pub, token.hostname)
|
||||
if cert == '':
|
||||
raise falcon.HTTPInternalServerError("Failed to generate the certificate")
|
||||
host = HostCert(instance_id=instance_id,
|
||||
fingerprint=sshpubkeys.SSHKey(pub).hash(),
|
||||
host = HostCert(host_id=host_id,
|
||||
fingerprint=fingerprint,
|
||||
auth_id=token.auth_id,
|
||||
token_id=token_id,
|
||||
pubkey=pub,
|
||||
cert=cert,
|
||||
hostname=token.hostname)
|
||||
session.add(host)
|
||||
|
@ -12,25 +12,37 @@ import sshpubkeys
|
||||
|
||||
@pytest.fixture
|
||||
def db():
|
||||
return SQLAlchemySessionManager()
|
||||
return SQLAlchemySessionManager()
|
||||
|
||||
@pytest.fixture
|
||||
def client(db):
|
||||
api = create_app(db)
|
||||
return testing.TestClient(api)
|
||||
api = create_app(db)
|
||||
return testing.TestClient(api)
|
||||
|
||||
token_id = ''
|
||||
|
||||
host_id = str(uuid.uuid4())
|
||||
host_key = RSA.generate(2048)
|
||||
host_pub_key = host_key.publickey().exportKey('OpenSSH')
|
||||
host_fingerprint = sshpubkeys.SSHKey(host_pub_key).hash()
|
||||
|
||||
user_id = str(uuid.uuid4())
|
||||
user_key = RSA.generate(2048)
|
||||
user_pub_key = user_key.publickey().exportKey('OpenSSH')
|
||||
user_fingerprint = sshpubkeys.SSHKey(user_pub_key).hash()
|
||||
|
||||
auth_id = str(uuid.uuid4())
|
||||
auth_user_key = RSA.generate(2048)
|
||||
auth_host_key = RSA.generate(2048)
|
||||
auth_user_pub_key = auth_user_key.publickey().exportKey('OpenSSH')
|
||||
auth_host_pub_key = auth_host_key.publickey().exportKey('OpenSSH')
|
||||
|
||||
@pytest.mark.dependency()
|
||||
def test_post_authority(client, db):
|
||||
user_ca = RSA.generate(2048)
|
||||
host_ca = RSA.generate(2048)
|
||||
body = {
|
||||
'auth_id': auth_id,
|
||||
'user_privkey': user_ca.exportKey('PEM'),
|
||||
'user_pubkey': user_ca.publickey().exportKey('OpenSSH'),
|
||||
'host_privkey': host_ca.exportKey('PEM'),
|
||||
'host_pubkey': host_ca.publickey().exportKey('OpenSSH')
|
||||
'user_key': auth_user_key.exportKey('PEM'),
|
||||
'host_key': auth_host_key.exportKey('PEM'),
|
||||
}
|
||||
response = client.simulate_post(
|
||||
'/authorities',
|
||||
@ -38,21 +50,33 @@ def test_post_authority(client, db):
|
||||
)
|
||||
assert response.status == falcon.HTTP_CREATED
|
||||
assert response.headers['location'] == '/authorities/' + auth_id
|
||||
#with db.Session() as session:
|
||||
session = db.Session()
|
||||
auth = session.query(Authority).get(auth_id)
|
||||
assert auth is not None
|
||||
#session = db.Session()
|
||||
#auth = session.query(Authority).get(auth_id)
|
||||
#assert auth is not None
|
||||
|
||||
def user_request(auth=auth_id, user_id=None):
|
||||
if user_id is None:
|
||||
user_id = str(uuid.uuid4())
|
||||
user_key = RSA.generate(2048)
|
||||
pub_key = user_key.publickey().exportKey('OpenSSH')
|
||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||
def test_get_authority(client):
|
||||
response = client.simulate_get('/authorities/' + auth_id)
|
||||
assert response.status == falcon.HTTP_OK
|
||||
body = json.loads(response.content)
|
||||
assert 'auth_id' in body
|
||||
assert 'user_key.pub' in body
|
||||
assert body['user_key.pub'] == auth_user_pub_key
|
||||
assert 'host_key.pub' in body
|
||||
assert body['host_key.pub'] == auth_host_pub_key
|
||||
assert 'user_key' not in body
|
||||
assert 'host_key' not in body
|
||||
|
||||
def test_get_authority_fails(client):
|
||||
response = client.simulate_get('/authorities/' + str(uuid.uuid4()))
|
||||
assert response.status == falcon.HTTP_NOT_FOUND
|
||||
|
||||
def user_request(auth=auth_id, user_id=user_id, pub_key=user_pub_key):
|
||||
return {
|
||||
'user_id': user_id,
|
||||
'auth_id': auth,
|
||||
'pub_key': pub_key
|
||||
}
|
||||
'key.pub': pub_key
|
||||
}
|
||||
|
||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||
def test_post_user(client, db):
|
||||
@ -66,39 +90,73 @@ def test_post_user(client, db):
|
||||
location = response.headers['location'].split('/')
|
||||
assert location[1] == 'usercerts'
|
||||
assert location[2] == body['user_id']
|
||||
assert location[3] == sshpubkeys.SSHKey(body['pub_key']).hash()
|
||||
assert location[3] == sshpubkeys.SSHKey(body['key.pub']).hash()
|
||||
|
||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||
def test_post_user_bad_auth(client, db):
|
||||
body = user_request(str(uuid.uuid4()))
|
||||
@pytest.mark.dependency(depends=['test_post_user'])
|
||||
def test_get_user(client):
|
||||
response = client.simulate_get('/usercerts/' + user_id + '/' + user_fingerprint)
|
||||
assert response.status == falcon.HTTP_OK
|
||||
body = json.loads(response.content)
|
||||
assert 'user_id' in body
|
||||
assert 'fingerprint' in body
|
||||
assert 'auth_id' in body
|
||||
assert 'key-cert.pub' in body
|
||||
assert body['auth_id'] == auth_id
|
||||
|
||||
def test_get_user_fails(client):
|
||||
response = client.simulate_get('/usercerts/' + str(uuid.uuid4()) + '/' + user_fingerprint)
|
||||
assert response.status == falcon.HTTP_NOT_FOUND
|
||||
|
||||
@pytest.mark.dependency(depends=['test_post_user'])
|
||||
def test_post_second_cert_same_user(client):
|
||||
key = RSA.generate(2048)
|
||||
pub_key = key.publickey().exportKey('OpenSSH')
|
||||
body = user_request(pub_key=pub_key)
|
||||
response = client.simulate_post(
|
||||
'/usercerts',
|
||||
body=json.dumps(body)
|
||||
)
|
||||
assert response.status == falcon.HTTP_CREATED
|
||||
assert 'location' in response.headers
|
||||
location = response.headers['location'].split('/')
|
||||
assert location[1] == 'usercerts'
|
||||
assert location[2] == user_id
|
||||
assert location[3] == sshpubkeys.SSHKey(pub_key).hash()
|
||||
|
||||
def test_post_user_unknown_auth(client, db):
|
||||
body = user_request(auth=str(uuid.uuid4()))
|
||||
response = client.simulate_post(
|
||||
'/usercerts',
|
||||
body=json.dumps(body)
|
||||
)
|
||||
assert response.status == falcon.HTTP_NOT_FOUND
|
||||
|
||||
def token_request(auth=auth_id, instance_id=None):
|
||||
if instance_id is None:
|
||||
instance_id = str(uuid.uuid4())
|
||||
@pytest.mark.dependency(depends=['test_post_user'])
|
||||
def test_post_same_user_same_key_fails(client):
|
||||
# Show that using the same user ID and public key fails.
|
||||
body = user_request()
|
||||
response = client.simulate_post(
|
||||
'/usercerts',
|
||||
body=json.dumps(body)
|
||||
)
|
||||
assert response.status == falcon.HTTP_CONFLICT
|
||||
|
||||
def token_request(auth=auth_id, host=host_id):
|
||||
return {
|
||||
'instance_id': instance_id,
|
||||
'host_id': host,
|
||||
'auth_id': auth,
|
||||
'hostname': 'testname.local'
|
||||
}
|
||||
|
||||
def host_request(token_id, instance_id=None):
|
||||
if instance_id is None:
|
||||
instance_id = str(uuid.uuid4())
|
||||
host_key = RSA.generate(2048)
|
||||
pub_key = str(host_key.publickey().exportKey('OpenSSH'))
|
||||
def host_request(token, host=host_id, pub_key=host_pub_key):
|
||||
return {
|
||||
'token_id': token_id,
|
||||
'instance_id': instance_id,
|
||||
'pub_key': pub_key
|
||||
'token_id': token,
|
||||
'host_id': host,
|
||||
'key.pub': pub_key
|
||||
}
|
||||
|
||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||
def test_host_cert_workflow(client, db):
|
||||
def test_post_token_and_host(client, db):
|
||||
token = token_request()
|
||||
response = client.simulate_post(
|
||||
'/hosttokens',
|
||||
@ -108,7 +166,12 @@ def test_host_cert_workflow(client, db):
|
||||
assert 'location' in response.headers
|
||||
location_path = response.headers['location'].split('/')
|
||||
assert location_path[1] == 'hosttokens'
|
||||
host = host_request(location_path[-1], token['instance_id'])
|
||||
# Store the token ID for other tests
|
||||
global token_id
|
||||
token_id = location_path[-1]
|
||||
# Verify that it's a valid UUID
|
||||
uuid.UUID(token_id)
|
||||
host = host_request(token_id)
|
||||
response = client.simulate_post(
|
||||
'/hostcerts',
|
||||
body=json.dumps(host)
|
||||
@ -117,11 +180,27 @@ def test_host_cert_workflow(client, db):
|
||||
assert 'location' in response.headers
|
||||
location = response.headers['location'].split('/')
|
||||
assert location[1] == 'hostcerts'
|
||||
assert location[2] == host['instance_id']
|
||||
assert location[3] == sshpubkeys.SSHKey(host['pub_key']).hash()
|
||||
assert location[2] == host_id
|
||||
assert location[3] == sshpubkeys.SSHKey(host_pub_key).hash()
|
||||
|
||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||
def test_post_token_bad_auth(client, db):
|
||||
@pytest.mark.dependency(depends=['test_post_token_and_host'])
|
||||
def test_get_host(client):
|
||||
response = client.simulate_get('/hostcerts/' + host_id + '/' + host_fingerprint)
|
||||
assert response.status == falcon.HTTP_OK
|
||||
body = json.loads(response.content)
|
||||
assert 'host_id' in body
|
||||
assert 'fingerprint' in body
|
||||
assert 'auth_id' in body
|
||||
assert 'key-cert.pub' in body
|
||||
assert body['host_id'] == host_id
|
||||
assert body['fingerprint'] == host_fingerprint
|
||||
assert body['auth_id'] == auth_id
|
||||
|
||||
def test_get_host_fails(client):
|
||||
response = client.simulate_get('/hostcerts/' + str(uuid.uuid4()) + '/' + host_fingerprint)
|
||||
assert response.status == falcon.HTTP_NOT_FOUND
|
||||
|
||||
def test_post_token_unknown_auth(client, db):
|
||||
token = token_request(str(uuid.uuid4()))
|
||||
response = client.simulate_post(
|
||||
'/hosttokens',
|
||||
@ -131,15 +210,16 @@ def test_post_token_bad_auth(client, db):
|
||||
|
||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||
def test_post_host_with_bogus_token(client, db):
|
||||
token = token_request(str(uuid.uuid4()))
|
||||
host = host_request(str(uuid.uuid4()), str(uuid.uuid4()))
|
||||
response = client.simulate_post(
|
||||
'/hosttokens',
|
||||
body=json.dumps(token)
|
||||
'/hostcerts',
|
||||
body=json.dumps(host)
|
||||
)
|
||||
assert response.status == falcon.HTTP_NOT_FOUND
|
||||
|
||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||
def test_post_host_with_wrong_instance_id(client, db):
|
||||
@pytest.mark.dependency(depends=['test_post_token_and_host'])
|
||||
def test_post_host_with_wrong_host_id(client, db):
|
||||
# Get a new token for the same host_id as the base test.
|
||||
token = token_request()
|
||||
response = client.simulate_post(
|
||||
'/hosttokens',
|
||||
@ -149,15 +229,20 @@ def test_post_host_with_wrong_instance_id(client, db):
|
||||
assert 'location' in response.headers
|
||||
location_path = response.headers['location'].split('/')
|
||||
assert location_path[1] == 'hosttokens'
|
||||
# Use the token with a different instance_id
|
||||
host = host_request(location_path[-1], str(uuid.uuid4()))
|
||||
# Use the token with a different host_id than it was created for.
|
||||
# Use a different public key to avoid other error conditions.
|
||||
key = RSA.generate(2048)
|
||||
pub_key = key.publickey().exportKey('OpenSSH')
|
||||
host = host_request(location_path[-1], str(uuid.uuid4()), pub_key)
|
||||
response = client.simulate_post(
|
||||
'/hostcerts',
|
||||
body=json.dumps(host)
|
||||
)
|
||||
|
||||
@pytest.mark.dependency(depends=['test_post_authority'])
|
||||
def test_post_host_with_used_token(client, db):
|
||||
@pytest.mark.dependency(depends=['test_post_token_and_host'])
|
||||
def test_post_host_same_public_key_fails(client):
|
||||
# Use a new token compared to the test this depends on.
|
||||
# Show that using the same host ID and public key fails.
|
||||
token = token_request()
|
||||
response = client.simulate_post(
|
||||
'/hosttokens',
|
||||
@ -167,20 +252,20 @@ def test_post_host_with_used_token(client, db):
|
||||
assert 'location' in response.headers
|
||||
location_path = response.headers['location'].split('/')
|
||||
assert location_path[1] == 'hosttokens'
|
||||
# First, use the token to sign a host public key
|
||||
host = host_request(location_path[-1], token['instance_id'])
|
||||
host = host_request(location_path[-1])
|
||||
response = client.simulate_post(
|
||||
'/hostcerts',
|
||||
body=json.dumps(host)
|
||||
)
|
||||
assert response.status == falcon.HTTP_CREATED
|
||||
assert 'location' in response.headers
|
||||
location = response.headers['location'].split('/')
|
||||
assert location[1] == 'hostcerts'
|
||||
assert location[2] == host['instance_id']
|
||||
assert location[3] == sshpubkeys.SSHKey(host['pub_key']).hash()
|
||||
# Now try using the token a sceond time, same instance_id, different pub key
|
||||
host = host_request(location_path[-1], token['instance_id'])
|
||||
assert response.status == falcon.HTTP_CONFLICT
|
||||
|
||||
@pytest.mark.dependency(depends=['test_post_token_and_host'])
|
||||
def test_post_host_with_used_token(client, db):
|
||||
# Re-use the token from the test this depends on.
|
||||
# Use the same host_id and different public key to avoid other errors.
|
||||
key = RSA.generate(2048)
|
||||
pub_key = key.publickey().exportKey('OpenSSH')
|
||||
host = host_request(token_id, host_id, pub_key)
|
||||
response = client.simulate_post(
|
||||
'/hostcerts',
|
||||
body=json.dumps(host)
|
||||
|
Loading…
x
Reference in New Issue
Block a user