diff --git a/anchor/X509/signing_request.py b/anchor/X509/signing_request.py index cb2fda8..67a8407 100644 --- a/anchor/X509/signing_request.py +++ b/anchor/X509/signing_request.py @@ -19,15 +19,15 @@ import io from pyasn1.codec.der import decoder from pyasn1.codec.der import encoder from pyasn1.type import univ as asn1_univ -from pyasn1_modules import pem from anchor.asn1 import rfc5280 from anchor.asn1 import rfc6402 +from anchor import util from anchor.X509 import errors from anchor.X509 import extension from anchor.X509 import name from anchor.X509 import signature -from anchor.X509 import utils +from anchor.X509 import utils as x509_utils OID_extensionRequest = asn1_univ.ObjectIdentifier('1.2.840.113549.1.9.14') @@ -47,34 +47,40 @@ class X509Csr(signature.SignatureMixin): self._csr = csr @staticmethod - def from_open_file(f): + def from_open_file(f, encoding='pem'): + if encoding == 'pem': + try: + der_content = util.extract_pem(f.read()) + except Exception: + raise X509CsrError("Data not in PEM format") + elif encoding == 'der': + der_content = f.read() + else: + raise X509CsrError("Unknown encoding") + try: - der_content = pem.readPemFromFile( - f, startMarker='-----BEGIN CERTIFICATE REQUEST-----', - endMarker='-----END CERTIFICATE REQUEST-----') csr = decoder.decode(der_content, asn1Spec=rfc6402.CertificationRequest())[0] return X509Csr(csr) except Exception: - raise X509CsrError("Could not read X509 certificate from " - "PEM data.") + raise X509CsrError("Could not read X509 certificate from data.") @staticmethod - def from_buffer(data): + def from_buffer(data, encoding='pem'): """Create this CSR from a buffer :param data: The data buffer """ - return X509Csr.from_open_file(io.StringIO(data)) + return X509Csr.from_open_file(io.BytesIO(data), encoding) @staticmethod - def from_file(path): + def from_file(path, encoding='pem'): """Create this CSR from a file on disk :param path: Path to the file on disk """ with open(path, 'r') as f: - return X509Csr.from_open_file(f) + return X509Csr.from_open_file(f, encoding) def get_pubkey(self): """Get the public key from the CSR @@ -207,7 +213,7 @@ class X509Csr(signature.SignatureMixin): return self._get_signing_algorithm() def _get_signature(self): - return utils.bin_to_bytes(self._csr['signature']) + return x509_utils.bin_to_bytes(self._csr['signature']) def _get_signing_algorithm(self): return self._csr['signatureAlgorithm']['algorithm'] @@ -215,7 +221,7 @@ class X509Csr(signature.SignatureMixin): def _get_public_key(self): csr_info = self._csr['certificationRequestInfo'] key_info = csr_info['subjectPublicKeyInfo'] - return utils.get_public_key_from_der(encoder.encode(key_info)) + return x509_utils.get_public_key_from_der(encoder.encode(key_info)) def _get_bytes_to_sign(self): return encoder.encode(self._csr['certificationRequestInfo']) diff --git a/anchor/certificate_ops.py b/anchor/certificate_ops.py index 5750c13..b11f143 100644 --- a/anchor/certificate_ops.py +++ b/anchor/certificate_ops.py @@ -21,12 +21,14 @@ import uuid import pecan from webob import exc as http_status +from anchor import cmc from anchor import jsonloader +from anchor import util from anchor import validation from anchor.X509 import certificate from anchor.X509 import extension from anchor.X509 import signing_request -from anchor.X509 import utils +from anchor.X509 import utils as x509_utils logger = logging.getLogger(__name__) @@ -41,10 +43,10 @@ class SigningError(Exception): pass -def parse_csr(csr, encoding): +def parse_csr(data, encoding): """Loads the user provided CSR into the backend X509 library. - :param csr: CSR as provided by the API user + :param data: CSR as provided by the API user :param encoding: encoding for the CSR (must be PEM today) :return: CSR object from backend X509 library or aborts """ @@ -53,17 +55,27 @@ def parse_csr(csr, encoding): logger.error("parse_csr failed: bad encoding ({})".format(encoding)) pecan.abort(400, "invalid CSR") - if csr is None: + if data is None: logger.error("parse_csr failed: missing CSR") pecan.abort(400, "invalid CSR") - # load the CSR into the backend X509 library + # get DER version + der = util.extract_pem(data.encode('ascii')) + if der is None: + logger.error("perse_csr failed: PEM contentents not found") + pecan.abort(400, "PEM contents not found") + + # try to unpack the certificate from CMC wrappers try: - out_req = signing_request.X509Csr.from_buffer(csr) - return out_req - except Exception as e: - logger.exception("Exception while parsing the CSR: %s", e) - pecan.abort(400, "CSR cannot be parsed") + csr = cmc.parse_request(der) + return signing_request.X509Csr(csr) + except cmc.CMCParsingError: + # it's not CMC data, that's fine, it's likely the CSR itself + try: + return signing_request.X509Csr.from_buffer(der, 'der') + except Exception as e: + logger.exception("Exception while parsing the CSR: %s", e) + pecan.abort(400, "CSR cannot be parsed") def validate_csr(ra_name, auth_result, csr, request): @@ -208,7 +220,7 @@ def sign(csr, ca_conf): raise SigningError("Cannot load the signing CA: %s" % (e,)) try: - key = utils.get_private_key_from_file(ca_conf['key_path']) + key = x509_utils.get_private_key_from_file(ca_conf['key_path']) except Exception as e: raise SigningError("Cannot load the signing CA key: %s" % (e,)) diff --git a/anchor/cmc.py b/anchor/cmc.py new file mode 100644 index 0000000..404a072 --- /dev/null +++ b/anchor/cmc.py @@ -0,0 +1,80 @@ +from anchor.asn1 import rfc5652 +from anchor.asn1 import rfc6402 + +from pyasn1.codec.der import decoder +from pyasn1 import error + + +class CMCParsingError(Exception): + pass + + +class UnexpectedContentType(CMCParsingError): + def __init__(self, content_type): + self.content_type = content_type + + def __str__(self): + return "Unexpected content type, got %s" % self.content_type + + +def _unwrap_signed_data(data): + # Since we don't have trust with anyone signing the requests, this + # signature is not relevant. The request itself is self-signed which + # stops accidents. + result = decoder.decode(data, rfc5652.SignedData())[0] + return _unwrap_generic( + result['encapContentInfo']['eContentType'], + result['encapContentInfo']['eContent']) + + +def _unwrap_content_info(data): + result = decoder.decode(data, rfc5652.ContentInfo())[0] + return _unwrap_generic(result['contentType'], result['content']) + + +def _unwrap_generic(content_type, data): + unwrapper = CONTENT_TYPES.get(content_type) + if unwrapper is None: + return (content_type, data) + return unwrapper(data) + + +def strip_wrappers(data): + # assume the outer wrapper is contentinfo + return _unwrap_content_info(data) + + +CONTENT_TYPES = { + rfc5652.id_ct_contentInfo: _unwrap_content_info, + rfc5652.id_signedData: _unwrap_signed_data, +} + + +def parse_request(data): + try: + content_type, data = strip_wrappers(data) + except error.PyAsn1Error: + raise CMCParsingError("Cannot find valid CMC wrapper") + + if content_type != rfc6402.id_cct_PKIData: + raise UnexpectedContentType(content_type) + + pd = decoder.decode(data, rfc6402.PKIData())[0] + if len(pd['reqSequence']) == 0: + raise CMCParsingError("No certificate requests") + if len(pd['reqSequence']) > 1: + raise CMCParsingError("Can't handle multiple certificates") + req = pd['reqSequence'][0] + + if req.getName() != 'tcr': + raise CMCParsingError("Can handle only tagged cert requests") + + return req['tcr']['certificationRequest'] + + +if __name__ == "__main__": + import sys + with open(sys.argv[1], 'rb') as f: + data = f.read() + cert_req = parse_request(data) + print(cert_req.prettyPrint()) diff --git a/anchor/util.py b/anchor/util.py index 14f2318..ca37431 100644 --- a/anchor/util.py +++ b/anchor/util.py @@ -13,6 +13,7 @@ from __future__ import absolute_import +import base64 import hmac import re @@ -74,3 +75,28 @@ def verify_domain(domain, allow_wildcards=False): raise ValueError( "domain <%s> contains invalid characters " "(RFC1034/3.5)" % (domain,)) + + +def extract_pem(data, use_markers=True): + """Extract and unpack PEM data + + Anything between the BEGIN and END lines will be unpacked using base64. The + specific BEGIN/END content name is ignored since it's not standard anyway. + """ + if not isinstance(data, bytes): + raise TypeError("data must be bytes") + lines = data.splitlines() + seen_start = not use_markers + b64_content = b"" + for line in lines: + if line.startswith(b"-----END ") and line.endswith(b"-----"): + break + if seen_start: + b64_content += line + if line.startswith(b"-----BEGIN ") and line.endswith(b"-----"): + seen_start = True + + if not b64_content: + return None + decoder = getattr(base64, 'decodebytes', base64.decodestring) + return decoder(b64_content) diff --git a/tests/X509/test_x509_csr.py b/tests/X509/test_x509_csr.py index 7192280..4d7bdb5 100644 --- a/tests/X509/test_x509_csr.py +++ b/tests/X509/test_x509_csr.py @@ -49,7 +49,8 @@ class TestX509Csr(tests.DefaultRequestMixin, unittest.TestCase): def setUp(self): super(TestX509Csr, self).setUp() - self.csr = signing_request.X509Csr.from_buffer(TestX509Csr.csr_sample) + self.csr = signing_request.X509Csr.from_buffer( + TestX509Csr.csr_sample_bytes) def tearDown(self): pass @@ -87,7 +88,7 @@ class TestX509Csr(tests.DefaultRequestMixin, unittest.TestCase): def test_read_from_file(self): open_name = 'anchor.X509.signing_request.open' - f = io.StringIO(self.csr_sample) + f = io.BytesIO(self.csr_sample_bytes) with mock.patch(open_name, create=True) as mock_open: mock_open.return_value = f csr = signing_request.X509Csr.from_file("some_path") @@ -98,8 +99,8 @@ class TestX509Csr(tests.DefaultRequestMixin, unittest.TestCase): def test_bad_data_throws(self): bad_data = ( - u"some bad data is " - "EHRlc3RAYW5jaG9yLnRlc3QwTDANBgkqhkiG9w0BAQEFAAM7ADA4AjEA6m") + b"some bad data is " + b"EHRlc3RAYW5jaG9yLnRlc3QwTDANBgkqhkiG9w0BAQEFAAM7ADA4AjEA6m") csr = signing_request.X509Csr() self.assertRaises(x509_errors.X509Error, diff --git a/tests/__init__.py b/tests/__init__.py index 5fd5036..6db0be4 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -79,7 +79,7 @@ class DefaultRequestMixin(object): # CN=server1.example.com # 2048 RSA, basicConstraints, keyUsage exts csr_sample_cn = 'server1.example.com' - csr_sample = textwrap.dedent(u""" + csr_sample = textwrap.dedent(""" -----BEGIN CERTIFICATE REQUEST----- MIIDDjCCAfYCAQAwgZwxCzAJBgNVBAYTAlVLMQ8wDQYDVQQIEwZOYXJuaWExEjAQ BgNVBAcTCUZ1bmt5dG93bjEXMBUGA1UEChMOQW5jaG9yIFRlc3RpbmcxEDAOBgNV @@ -99,3 +99,4 @@ class DefaultRequestMixin(object): DxpZNBHlkA6LWaRqAtWws3uvom7IjHGgSr7UITrOR5iO5Hrm85X7K0AT6Bu75RZL +uYLLfj9Nb/iznREl9E3a/fN -----END CERTIFICATE REQUEST-----""") + csr_sample_bytes = csr_sample.encode('ascii') diff --git a/tests/fixups/test_fixup_functionality.py b/tests/fixups/test_fixup_functionality.py index 09905fd..4324fc6 100644 --- a/tests/fixups/test_fixup_functionality.py +++ b/tests/fixups/test_fixup_functionality.py @@ -32,7 +32,7 @@ class TestFixupFunctionality(tests.DefaultConfigMixin, super(TestFixupFunctionality, self).setUp() jsonloader.conf.load_extensions() self.csr = signing_request.X509Csr.from_buffer( - TestFixupFunctionality.csr_sample) + TestFixupFunctionality.csr_sample_bytes) def test_with_noop(self): """Ensure single fixup is processed.""" diff --git a/tests/test_signing_backend.py b/tests/test_signing_backend.py index eeef3ba..55207fb 100644 --- a/tests/test_signing_backend.py +++ b/tests/test_signing_backend.py @@ -33,7 +33,7 @@ class UnknownExtension(extension.X509Extension): class SigningBackendExtensions(tests.DefaultConfigMixin, tests.DefaultRequestMixin, unittest.TestCase): def test_copy_good_extensions(self): - csr = signing_request.X509Csr.from_buffer(self.csr_sample) + csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes) ext = extension.X509ExtensionSubjectAltName() ext.add_dns_id("example.com") csr.add_extension(ext) @@ -44,7 +44,7 @@ class SigningBackendExtensions(tests.DefaultConfigMixin, extension.X509ExtensionSubjectAltName))) def test_ignore_unknown_extensions(self): - csr = signing_request.X509Csr.from_buffer(self.csr_sample) + csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes) ext = UnknownExtension() csr.add_extension(ext) @@ -53,7 +53,7 @@ class SigningBackendExtensions(tests.DefaultConfigMixin, self.assertEqual(2, len(cert.get_extensions())) def test_fail_critical_unknown_extensions(self): - csr = signing_request.X509Csr.from_buffer(self.csr_sample) + csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes) ext = UnknownExtension() ext.set_critical(True) csr.add_extension(ext) diff --git a/tests/validators/test_base_validation_functions.py b/tests/validators/test_base_validation_functions.py index 94d866d..7170a57 100644 --- a/tests/validators/test_base_validation_functions.py +++ b/tests/validators/test_base_validation_functions.py @@ -29,7 +29,7 @@ class TestBaseValidators(tests.DefaultRequestMixin, unittest.TestCase): def setUp(self): super(TestBaseValidators, self).setUp() self.csr = signing_request.X509Csr.from_buffer( - self.csr_sample) + self.csr_sample_bytes) def tearDown(self): super(TestBaseValidators, self).tearDown() diff --git a/tests/validators/test_callable_validators.py b/tests/validators/test_callable_validators.py index 08f1b25..ba0ff7f 100644 --- a/tests/validators/test_callable_validators.py +++ b/tests/validators/test_callable_validators.py @@ -555,11 +555,11 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase): ) def test_csr_signature(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) self.assertIsNone(custom.csr_signature(csr=csr)) def test_csr_signature_bad_sig(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) with mock.patch.object(x509_csr.X509Csr, '_get_signature', return_value=(b'A'*49)): with self.assertRaisesRegexp(errors.ValidationError, @@ -567,7 +567,7 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase): custom.csr_signature(csr=csr) def test_csr_signature_bad_algo(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) with mock.patch.object(x509_csr.X509Csr, '_get_signing_algorithm', return_value=rfc2459.id_dsa_with_sha1): with self.assertRaisesRegexp(errors.ValidationError, @@ -575,7 +575,7 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase): custom.csr_signature(csr=csr) def test_public_key_good_rsa(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) self.assertIsNone(custom.public_key(csr=csr, allowed_keys={'RSA': 1024})) @@ -595,18 +595,18 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase): dsa_key_der = base64.b64decode(dsa_key_pem) spki = decoder.decode(dsa_key_der, asn1Spec=rfc5280.SubjectPublicKeyInfo())[0] - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) csr._csr['certificationRequestInfo']['subjectPublicKeyInfo'] = spki self.assertIsNone(custom.public_key(csr=csr, allowed_keys={'DSA': 1024})) def test_public_key_too_short(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) with self.assertRaises(errors.ValidationError): custom.public_key(csr=csr, allowed_keys={'RSA': 99999999}) def test_public_key_wrong_algo(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) with self.assertRaises(errors.ValidationError): custom.public_key(csr=csr, allowed_keys={'XXX': 0}) diff --git a/tests/validators/test_standards_validator.py b/tests/validators/test_standards_validator.py index 99232c0..259520a 100644 --- a/tests/validators/test_standards_validator.py +++ b/tests/validators/test_standards_validator.py @@ -29,7 +29,7 @@ import tests class TestStandardsValidator(tests.DefaultRequestMixin, unittest.TestCase): def test_passing(self): - csr = signing_request.X509Csr.from_buffer(self.csr_sample) + csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes) standards.standards_compliance(csr=csr)