From 7f51b08ea330225985a0353135602359bf0b6ef3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Pitucha?= Date: Fri, 15 Jan 2016 17:32:09 +1100 Subject: [PATCH] Add support for CMC requests Incoming CMC requests should be stripped of all wrappers, then the internal pkcs10 request is processed as usual. No verification is done on the SignedData wrapper, because there's no known certificate to trust. Response is just the bare certificate for now. Change-Id: I92c76df775e5f339ac2fae95582097e3afe138af --- anchor/X509/signing_request.py | 34 ++++---- anchor/certificate_ops.py | 34 +++++--- anchor/cmc.py | 80 +++++++++++++++++++ anchor/util.py | 26 ++++++ tests/X509/test_x509_csr.py | 9 ++- tests/__init__.py | 3 +- tests/fixups/test_fixup_functionality.py | 2 +- tests/test_signing_backend.py | 6 +- .../test_base_validation_functions.py | 2 +- tests/validators/test_callable_validators.py | 14 ++-- tests/validators/test_standards_validator.py | 2 +- 11 files changed, 169 insertions(+), 43 deletions(-) create mode 100644 anchor/cmc.py diff --git a/anchor/X509/signing_request.py b/anchor/X509/signing_request.py index cb2fda8..67a8407 100644 --- a/anchor/X509/signing_request.py +++ b/anchor/X509/signing_request.py @@ -19,15 +19,15 @@ import io from pyasn1.codec.der import decoder from pyasn1.codec.der import encoder from pyasn1.type import univ as asn1_univ -from pyasn1_modules import pem from anchor.asn1 import rfc5280 from anchor.asn1 import rfc6402 +from anchor import util from anchor.X509 import errors from anchor.X509 import extension from anchor.X509 import name from anchor.X509 import signature -from anchor.X509 import utils +from anchor.X509 import utils as x509_utils OID_extensionRequest = asn1_univ.ObjectIdentifier('1.2.840.113549.1.9.14') @@ -47,34 +47,40 @@ class X509Csr(signature.SignatureMixin): self._csr = csr @staticmethod - def from_open_file(f): + def from_open_file(f, encoding='pem'): + if encoding == 'pem': + try: + der_content = util.extract_pem(f.read()) + except Exception: + raise X509CsrError("Data not in PEM format") + elif encoding == 'der': + der_content = f.read() + else: + raise X509CsrError("Unknown encoding") + try: - der_content = pem.readPemFromFile( - f, startMarker='-----BEGIN CERTIFICATE REQUEST-----', - endMarker='-----END CERTIFICATE REQUEST-----') csr = decoder.decode(der_content, asn1Spec=rfc6402.CertificationRequest())[0] return X509Csr(csr) except Exception: - raise X509CsrError("Could not read X509 certificate from " - "PEM data.") + raise X509CsrError("Could not read X509 certificate from data.") @staticmethod - def from_buffer(data): + def from_buffer(data, encoding='pem'): """Create this CSR from a buffer :param data: The data buffer """ - return X509Csr.from_open_file(io.StringIO(data)) + return X509Csr.from_open_file(io.BytesIO(data), encoding) @staticmethod - def from_file(path): + def from_file(path, encoding='pem'): """Create this CSR from a file on disk :param path: Path to the file on disk """ with open(path, 'r') as f: - return X509Csr.from_open_file(f) + return X509Csr.from_open_file(f, encoding) def get_pubkey(self): """Get the public key from the CSR @@ -207,7 +213,7 @@ class X509Csr(signature.SignatureMixin): return self._get_signing_algorithm() def _get_signature(self): - return utils.bin_to_bytes(self._csr['signature']) + return x509_utils.bin_to_bytes(self._csr['signature']) def _get_signing_algorithm(self): return self._csr['signatureAlgorithm']['algorithm'] @@ -215,7 +221,7 @@ class X509Csr(signature.SignatureMixin): def _get_public_key(self): csr_info = self._csr['certificationRequestInfo'] key_info = csr_info['subjectPublicKeyInfo'] - return utils.get_public_key_from_der(encoder.encode(key_info)) + return x509_utils.get_public_key_from_der(encoder.encode(key_info)) def _get_bytes_to_sign(self): return encoder.encode(self._csr['certificationRequestInfo']) diff --git a/anchor/certificate_ops.py b/anchor/certificate_ops.py index 5750c13..b11f143 100644 --- a/anchor/certificate_ops.py +++ b/anchor/certificate_ops.py @@ -21,12 +21,14 @@ import uuid import pecan from webob import exc as http_status +from anchor import cmc from anchor import jsonloader +from anchor import util from anchor import validation from anchor.X509 import certificate from anchor.X509 import extension from anchor.X509 import signing_request -from anchor.X509 import utils +from anchor.X509 import utils as x509_utils logger = logging.getLogger(__name__) @@ -41,10 +43,10 @@ class SigningError(Exception): pass -def parse_csr(csr, encoding): +def parse_csr(data, encoding): """Loads the user provided CSR into the backend X509 library. - :param csr: CSR as provided by the API user + :param data: CSR as provided by the API user :param encoding: encoding for the CSR (must be PEM today) :return: CSR object from backend X509 library or aborts """ @@ -53,17 +55,27 @@ def parse_csr(csr, encoding): logger.error("parse_csr failed: bad encoding ({})".format(encoding)) pecan.abort(400, "invalid CSR") - if csr is None: + if data is None: logger.error("parse_csr failed: missing CSR") pecan.abort(400, "invalid CSR") - # load the CSR into the backend X509 library + # get DER version + der = util.extract_pem(data.encode('ascii')) + if der is None: + logger.error("perse_csr failed: PEM contentents not found") + pecan.abort(400, "PEM contents not found") + + # try to unpack the certificate from CMC wrappers try: - out_req = signing_request.X509Csr.from_buffer(csr) - return out_req - except Exception as e: - logger.exception("Exception while parsing the CSR: %s", e) - pecan.abort(400, "CSR cannot be parsed") + csr = cmc.parse_request(der) + return signing_request.X509Csr(csr) + except cmc.CMCParsingError: + # it's not CMC data, that's fine, it's likely the CSR itself + try: + return signing_request.X509Csr.from_buffer(der, 'der') + except Exception as e: + logger.exception("Exception while parsing the CSR: %s", e) + pecan.abort(400, "CSR cannot be parsed") def validate_csr(ra_name, auth_result, csr, request): @@ -208,7 +220,7 @@ def sign(csr, ca_conf): raise SigningError("Cannot load the signing CA: %s" % (e,)) try: - key = utils.get_private_key_from_file(ca_conf['key_path']) + key = x509_utils.get_private_key_from_file(ca_conf['key_path']) except Exception as e: raise SigningError("Cannot load the signing CA key: %s" % (e,)) diff --git a/anchor/cmc.py b/anchor/cmc.py new file mode 100644 index 0000000..404a072 --- /dev/null +++ b/anchor/cmc.py @@ -0,0 +1,80 @@ +from anchor.asn1 import rfc5652 +from anchor.asn1 import rfc6402 + +from pyasn1.codec.der import decoder +from pyasn1 import error + + +class CMCParsingError(Exception): + pass + + +class UnexpectedContentType(CMCParsingError): + def __init__(self, content_type): + self.content_type = content_type + + def __str__(self): + return "Unexpected content type, got %s" % self.content_type + + +def _unwrap_signed_data(data): + # Since we don't have trust with anyone signing the requests, this + # signature is not relevant. The request itself is self-signed which + # stops accidents. + result = decoder.decode(data, rfc5652.SignedData())[0] + return _unwrap_generic( + result['encapContentInfo']['eContentType'], + result['encapContentInfo']['eContent']) + + +def _unwrap_content_info(data): + result = decoder.decode(data, rfc5652.ContentInfo())[0] + return _unwrap_generic(result['contentType'], result['content']) + + +def _unwrap_generic(content_type, data): + unwrapper = CONTENT_TYPES.get(content_type) + if unwrapper is None: + return (content_type, data) + return unwrapper(data) + + +def strip_wrappers(data): + # assume the outer wrapper is contentinfo + return _unwrap_content_info(data) + + +CONTENT_TYPES = { + rfc5652.id_ct_contentInfo: _unwrap_content_info, + rfc5652.id_signedData: _unwrap_signed_data, +} + + +def parse_request(data): + try: + content_type, data = strip_wrappers(data) + except error.PyAsn1Error: + raise CMCParsingError("Cannot find valid CMC wrapper") + + if content_type != rfc6402.id_cct_PKIData: + raise UnexpectedContentType(content_type) + + pd = decoder.decode(data, rfc6402.PKIData())[0] + if len(pd['reqSequence']) == 0: + raise CMCParsingError("No certificate requests") + if len(pd['reqSequence']) > 1: + raise CMCParsingError("Can't handle multiple certificates") + req = pd['reqSequence'][0] + + if req.getName() != 'tcr': + raise CMCParsingError("Can handle only tagged cert requests") + + return req['tcr']['certificationRequest'] + + +if __name__ == "__main__": + import sys + with open(sys.argv[1], 'rb') as f: + data = f.read() + cert_req = parse_request(data) + print(cert_req.prettyPrint()) diff --git a/anchor/util.py b/anchor/util.py index 14f2318..ca37431 100644 --- a/anchor/util.py +++ b/anchor/util.py @@ -13,6 +13,7 @@ from __future__ import absolute_import +import base64 import hmac import re @@ -74,3 +75,28 @@ def verify_domain(domain, allow_wildcards=False): raise ValueError( "domain <%s> contains invalid characters " "(RFC1034/3.5)" % (domain,)) + + +def extract_pem(data, use_markers=True): + """Extract and unpack PEM data + + Anything between the BEGIN and END lines will be unpacked using base64. The + specific BEGIN/END content name is ignored since it's not standard anyway. + """ + if not isinstance(data, bytes): + raise TypeError("data must be bytes") + lines = data.splitlines() + seen_start = not use_markers + b64_content = b"" + for line in lines: + if line.startswith(b"-----END ") and line.endswith(b"-----"): + break + if seen_start: + b64_content += line + if line.startswith(b"-----BEGIN ") and line.endswith(b"-----"): + seen_start = True + + if not b64_content: + return None + decoder = getattr(base64, 'decodebytes', base64.decodestring) + return decoder(b64_content) diff --git a/tests/X509/test_x509_csr.py b/tests/X509/test_x509_csr.py index 7192280..4d7bdb5 100644 --- a/tests/X509/test_x509_csr.py +++ b/tests/X509/test_x509_csr.py @@ -49,7 +49,8 @@ class TestX509Csr(tests.DefaultRequestMixin, unittest.TestCase): def setUp(self): super(TestX509Csr, self).setUp() - self.csr = signing_request.X509Csr.from_buffer(TestX509Csr.csr_sample) + self.csr = signing_request.X509Csr.from_buffer( + TestX509Csr.csr_sample_bytes) def tearDown(self): pass @@ -87,7 +88,7 @@ class TestX509Csr(tests.DefaultRequestMixin, unittest.TestCase): def test_read_from_file(self): open_name = 'anchor.X509.signing_request.open' - f = io.StringIO(self.csr_sample) + f = io.BytesIO(self.csr_sample_bytes) with mock.patch(open_name, create=True) as mock_open: mock_open.return_value = f csr = signing_request.X509Csr.from_file("some_path") @@ -98,8 +99,8 @@ class TestX509Csr(tests.DefaultRequestMixin, unittest.TestCase): def test_bad_data_throws(self): bad_data = ( - u"some bad data is " - "EHRlc3RAYW5jaG9yLnRlc3QwTDANBgkqhkiG9w0BAQEFAAM7ADA4AjEA6m") + b"some bad data is " + b"EHRlc3RAYW5jaG9yLnRlc3QwTDANBgkqhkiG9w0BAQEFAAM7ADA4AjEA6m") csr = signing_request.X509Csr() self.assertRaises(x509_errors.X509Error, diff --git a/tests/__init__.py b/tests/__init__.py index 5fd5036..6db0be4 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -79,7 +79,7 @@ class DefaultRequestMixin(object): # CN=server1.example.com # 2048 RSA, basicConstraints, keyUsage exts csr_sample_cn = 'server1.example.com' - csr_sample = textwrap.dedent(u""" + csr_sample = textwrap.dedent(""" -----BEGIN CERTIFICATE REQUEST----- MIIDDjCCAfYCAQAwgZwxCzAJBgNVBAYTAlVLMQ8wDQYDVQQIEwZOYXJuaWExEjAQ BgNVBAcTCUZ1bmt5dG93bjEXMBUGA1UEChMOQW5jaG9yIFRlc3RpbmcxEDAOBgNV @@ -99,3 +99,4 @@ class DefaultRequestMixin(object): DxpZNBHlkA6LWaRqAtWws3uvom7IjHGgSr7UITrOR5iO5Hrm85X7K0AT6Bu75RZL +uYLLfj9Nb/iznREl9E3a/fN -----END CERTIFICATE REQUEST-----""") + csr_sample_bytes = csr_sample.encode('ascii') diff --git a/tests/fixups/test_fixup_functionality.py b/tests/fixups/test_fixup_functionality.py index 09905fd..4324fc6 100644 --- a/tests/fixups/test_fixup_functionality.py +++ b/tests/fixups/test_fixup_functionality.py @@ -32,7 +32,7 @@ class TestFixupFunctionality(tests.DefaultConfigMixin, super(TestFixupFunctionality, self).setUp() jsonloader.conf.load_extensions() self.csr = signing_request.X509Csr.from_buffer( - TestFixupFunctionality.csr_sample) + TestFixupFunctionality.csr_sample_bytes) def test_with_noop(self): """Ensure single fixup is processed.""" diff --git a/tests/test_signing_backend.py b/tests/test_signing_backend.py index eeef3ba..55207fb 100644 --- a/tests/test_signing_backend.py +++ b/tests/test_signing_backend.py @@ -33,7 +33,7 @@ class UnknownExtension(extension.X509Extension): class SigningBackendExtensions(tests.DefaultConfigMixin, tests.DefaultRequestMixin, unittest.TestCase): def test_copy_good_extensions(self): - csr = signing_request.X509Csr.from_buffer(self.csr_sample) + csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes) ext = extension.X509ExtensionSubjectAltName() ext.add_dns_id("example.com") csr.add_extension(ext) @@ -44,7 +44,7 @@ class SigningBackendExtensions(tests.DefaultConfigMixin, extension.X509ExtensionSubjectAltName))) def test_ignore_unknown_extensions(self): - csr = signing_request.X509Csr.from_buffer(self.csr_sample) + csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes) ext = UnknownExtension() csr.add_extension(ext) @@ -53,7 +53,7 @@ class SigningBackendExtensions(tests.DefaultConfigMixin, self.assertEqual(2, len(cert.get_extensions())) def test_fail_critical_unknown_extensions(self): - csr = signing_request.X509Csr.from_buffer(self.csr_sample) + csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes) ext = UnknownExtension() ext.set_critical(True) csr.add_extension(ext) diff --git a/tests/validators/test_base_validation_functions.py b/tests/validators/test_base_validation_functions.py index 94d866d..7170a57 100644 --- a/tests/validators/test_base_validation_functions.py +++ b/tests/validators/test_base_validation_functions.py @@ -29,7 +29,7 @@ class TestBaseValidators(tests.DefaultRequestMixin, unittest.TestCase): def setUp(self): super(TestBaseValidators, self).setUp() self.csr = signing_request.X509Csr.from_buffer( - self.csr_sample) + self.csr_sample_bytes) def tearDown(self): super(TestBaseValidators, self).tearDown() diff --git a/tests/validators/test_callable_validators.py b/tests/validators/test_callable_validators.py index 08f1b25..ba0ff7f 100644 --- a/tests/validators/test_callable_validators.py +++ b/tests/validators/test_callable_validators.py @@ -555,11 +555,11 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase): ) def test_csr_signature(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) self.assertIsNone(custom.csr_signature(csr=csr)) def test_csr_signature_bad_sig(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) with mock.patch.object(x509_csr.X509Csr, '_get_signature', return_value=(b'A'*49)): with self.assertRaisesRegexp(errors.ValidationError, @@ -567,7 +567,7 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase): custom.csr_signature(csr=csr) def test_csr_signature_bad_algo(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) with mock.patch.object(x509_csr.X509Csr, '_get_signing_algorithm', return_value=rfc2459.id_dsa_with_sha1): with self.assertRaisesRegexp(errors.ValidationError, @@ -575,7 +575,7 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase): custom.csr_signature(csr=csr) def test_public_key_good_rsa(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) self.assertIsNone(custom.public_key(csr=csr, allowed_keys={'RSA': 1024})) @@ -595,18 +595,18 @@ class TestValidators(tests.DefaultRequestMixin, unittest.TestCase): dsa_key_der = base64.b64decode(dsa_key_pem) spki = decoder.decode(dsa_key_der, asn1Spec=rfc5280.SubjectPublicKeyInfo())[0] - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) csr._csr['certificationRequestInfo']['subjectPublicKeyInfo'] = spki self.assertIsNone(custom.public_key(csr=csr, allowed_keys={'DSA': 1024})) def test_public_key_too_short(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) with self.assertRaises(errors.ValidationError): custom.public_key(csr=csr, allowed_keys={'RSA': 99999999}) def test_public_key_wrong_algo(self): - csr = x509_csr.X509Csr.from_buffer(self.csr_sample) + csr = x509_csr.X509Csr.from_buffer(self.csr_sample_bytes) with self.assertRaises(errors.ValidationError): custom.public_key(csr=csr, allowed_keys={'XXX': 0}) diff --git a/tests/validators/test_standards_validator.py b/tests/validators/test_standards_validator.py index 99232c0..259520a 100644 --- a/tests/validators/test_standards_validator.py +++ b/tests/validators/test_standards_validator.py @@ -29,7 +29,7 @@ import tests class TestStandardsValidator(tests.DefaultRequestMixin, unittest.TestCase): def test_passing(self): - csr = signing_request.X509Csr.from_buffer(self.csr_sample) + csr = signing_request.X509Csr.from_buffer(self.csr_sample_bytes) standards.standards_compliance(csr=csr)