From 81264fb9bb092489a578d655ad3deaa00c333d7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Pitucha?= Date: Fri, 4 Sep 2015 14:50:36 +1000 Subject: [PATCH] Add fixup enforcing SAN extension Fixup to make sure that if we have a CN, we have a matching SAN entry. Change-Id: Ic37a053d909f2411e8f08acfa7cf9606a6316e58 Closes-bug: 1401580 --- anchor/X509/signing_request.py | 9 ++ anchor/fixups.py | 44 +++++++ doc/source/fixups.rst | 9 ++ setup.cfg | 3 + ..._fixup_ensure_alternative_names_present.py | 109 ++++++++++++++++++ 5 files changed, 174 insertions(+) create mode 100644 anchor/fixups.py create mode 100644 tests/fixups/test_fixup_ensure_alternative_names_present.py diff --git a/anchor/X509/signing_request.py b/anchor/X509/signing_request.py index 13edfc6..88d0857 100644 --- a/anchor/X509/signing_request.py +++ b/anchor/X509/signing_request.py @@ -101,6 +101,15 @@ class X509Csr(signature.SignatureMixin): subject = ri['subject'][0] return name.X509Name(subject) + def set_subject(self, subject): + if not isinstance(subject, name.X509Name): + raise TypeError("subject must be an X509Name") + ri = self.get_request_info() + if ri['subject'] is None: + ri['subject'] = None + + ri['subject'][0] = subject._name_obj + def get_attributes(self): ri = self.get_request_info() if ri['attributes'] is None: diff --git a/anchor/fixups.py b/anchor/fixups.py new file mode 100644 index 0000000..3dd0969 --- /dev/null +++ b/anchor/fixups.py @@ -0,0 +1,44 @@ +# -*- coding:utf-8 -*- +# +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import netaddr + +from anchor.X509 import extension + + +def enforce_alternative_names_present(csr=None, **kwargs): + """Make sure that if CN is set, it's also present in SAN extension.""" + sans = csr.get_extensions(extension.X509ExtensionSubjectAltName) + if sans: + san = sans[0] + else: + san = extension.X509ExtensionSubjectAltName() + + san_updated = False + for cn in csr.get_subject_cn(): + try: + ip = netaddr.IPAddress(cn) + if ip not in san.get_ips(): + san.add_ip(ip) + san_updated = True + except netaddr.AddrFormatError: + if cn not in san.get_dns_ids(): + san.add_dns_id(cn) + san_updated = True + + if san_updated: + csr.add_extension(san) + return csr diff --git a/doc/source/fixups.rst b/doc/source/fixups.rst index a5d927c..8a9646a 100644 --- a/doc/source/fixups.rst +++ b/doc/source/fixups.rst @@ -8,3 +8,12 @@ and the configuration. Unlike validators, each fixup has to return either a new CSR structure or the modified original. + +Included fixups +--------------- + +``enforce_alternative_names_present`` + No parameters. + + If the value from CN does not exist in subject alternative names, it will + be copied into either then DNS or IP field, depending on the format. diff --git a/setup.cfg b/setup.cfg index bacb9f1..8493952 100644 --- a/setup.cfg +++ b/setup.cfg @@ -49,6 +49,9 @@ anchor.authentication = ldap = anchor.auth.ldap:login static = anchor.auth.static:login +anchor.fixups = + enforce_alternative_names_present = anchor.fixups:enforce_alternative_names_present + [files] packages = anchor diff --git a/tests/fixups/test_fixup_ensure_alternative_names_present.py b/tests/fixups/test_fixup_ensure_alternative_names_present.py new file mode 100644 index 0000000..c68b56b --- /dev/null +++ b/tests/fixups/test_fixup_ensure_alternative_names_present.py @@ -0,0 +1,109 @@ +# -*- coding:utf-8 -*- +# +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +import netaddr + +from anchor import fixups +from anchor.X509 import extension +from anchor.X509 import name +from anchor.X509 import signing_request + + +class TestEnsureAlternativeNamesPresent(unittest.TestCase): + def setUp(self): + super(TestEnsureAlternativeNamesPresent, self).setUp() + + def _csr_with_cn(self, cn): + csr = signing_request.X509Csr() + subject = name.X509Name() + subject.add_name_entry(name.OID_commonName, cn) + csr.set_subject(subject) + return csr + + def test_no_cn(self): + csr = signing_request.X509Csr() + subject = name.X509Name() + subject.add_name_entry(name.OID_localityName, "somewhere") + csr.set_subject(subject) + + new_csr = fixups.enforce_alternative_names_present(csr=csr) + self.assertEqual(0, len(new_csr.get_extensions())) + + def test_cn_only_ip(self): + csr = self._csr_with_cn("1.2.3.4") + + new_csr = fixups.enforce_alternative_names_present(csr=csr) + self.assertEqual(1, len(new_csr.get_extensions())) + ext = new_csr.get_extensions(extension.X509ExtensionSubjectAltName)[0] + self.assertEqual([netaddr.IPAddress("1.2.3.4")], ext.get_ips()) + + def test_cn_only_dns(self): + csr = self._csr_with_cn("example.com") + + new_csr = fixups.enforce_alternative_names_present(csr=csr) + self.assertEqual(1, len(new_csr.get_extensions())) + ext = new_csr.get_extensions(extension.X509ExtensionSubjectAltName)[0] + self.assertEqual(["example.com"], ext.get_dns_ids()) + + def test_cn_existing_ip(self): + csr = self._csr_with_cn("1.2.3.4") + san = extension.X509ExtensionSubjectAltName() + san.add_ip(netaddr.IPAddress("1.2.3.4")) + csr.add_extension(san) + + new_csr = fixups.enforce_alternative_names_present(csr=csr) + self.assertEqual(1, len(new_csr.get_extensions())) + ext = new_csr.get_extensions(extension.X509ExtensionSubjectAltName)[0] + self.assertEqual([netaddr.IPAddress("1.2.3.4")], ext.get_ips()) + + def test_cn_existing_dns(self): + csr = self._csr_with_cn("example.com") + san = extension.X509ExtensionSubjectAltName() + san.add_dns_id("example.com") + csr.add_extension(san) + + new_csr = fixups.enforce_alternative_names_present(csr=csr) + self.assertEqual(1, len(new_csr.get_extensions())) + ext = new_csr.get_extensions(extension.X509ExtensionSubjectAltName)[0] + self.assertEqual(["example.com"], ext.get_dns_ids()) + + def test_cn_extra_ip(self): + csr = self._csr_with_cn("1.2.3.4") + san = extension.X509ExtensionSubjectAltName() + san.add_ip(netaddr.IPAddress("2.3.4.5")) + csr.add_extension(san) + + new_csr = fixups.enforce_alternative_names_present(csr=csr) + self.assertEqual(1, len(new_csr.get_extensions())) + ext = new_csr.get_extensions(extension.X509ExtensionSubjectAltName)[0] + ips = ext.get_ips() + self.assertIn(netaddr.IPAddress("1.2.3.4"), ips) + self.assertIn(netaddr.IPAddress("2.3.4.5"), ips) + + def test_cn_extra_dns(self): + csr = self._csr_with_cn("example.com") + san = extension.X509ExtensionSubjectAltName() + san.add_dns_id("other.example.com") + csr.add_extension(san) + + new_csr = fixups.enforce_alternative_names_present(csr=csr) + self.assertEqual(1, len(new_csr.get_extensions())) + ext = new_csr.get_extensions(extension.X509ExtensionSubjectAltName)[0] + ids = ext.get_dns_ids() + self.assertIn("example.com", ids) + self.assertIn("other.example.com", ids)