
Signing requests are expected to arrive at /v1/sign/<registration_authority> now. Virtual registration authority is a new concept which right now includes everything the original configuration included. That means for example each registration authority available within Anchor deployment can configure its own CA, auth, and validators. Clients request a specific registration authority via the URL. This does not mean they can just choose who signs the CSR - they still need to pass all checks. Only the guesswork of "which validation set applies to them" is gone because of this change. The previous concept of validator sets is gone. Each registration authority configures its own validators and all of them need to pass. Previous endpoint /sign will not work anymore. It's incompatible with the new design. The configuration file changes in the following way: 1. Registration authorities need to be defined in the main config. 2. Validator sets are not available anymore. 3. CA and auth settings at the top level need to be named. They can be referred to in the registration authority block. 4. Old names are removed. Any use of "auth", "ca", or "validators" at the top level will result in an error and explanation regarding the upgrade path. Further documentation and a sample config with the new layout can be found in the docs/configuration.rst file. Closes-bug: 1463752 Change-Id: I5a949e0c79a2d56eadadf5ece62bb8b8eea89e78
167 lines
7.4 KiB
Python
167 lines
7.4 KiB
Python
# -*- coding:utf-8 -*-
|
|
#
|
|
# Copyright 2015 Nebula Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import textwrap
|
|
import unittest
|
|
|
|
import mock
|
|
from webob import exc as http_status
|
|
|
|
from anchor import certificate_ops
|
|
from anchor.X509 import name as x509_name
|
|
import tests
|
|
|
|
|
|
class CertificateOpsTests(tests.DefaultConfigMixin, unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
# This is a CSR with CN=anchor-test.example.com
|
|
self.expected_cn = "anchor-test.example.com"
|
|
self.csr = textwrap.dedent(u"""
|
|
-----BEGIN CERTIFICATE REQUEST-----
|
|
MIIEsDCCApgCAQAwazELMAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWEx
|
|
FjAUBgNVBAcTDU1vdW50YWluIFZpZXcxDTALBgNVBAoTBEFjbWUxIDAeBgNVBAMT
|
|
F2FuY2hvci10ZXN0LmV4YW1wbGUuY29tMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A
|
|
MIICCgKCAgEAvRri1XL/BIR882HdRisntITkEwDmBUmNcVKioOOc6wfLDzhrDFZc
|
|
fo34CvSPm4q4qlnGd4mmJt6rmDwZFhp4PPWHvZ4XWNygI0hZK3P+R6YZWOe2EwCU
|
|
M2+yCLLDAVucQZmqFtKLv3fedjM3udgEHrf6rf8eyE9X0eyXGJ7jNLEQvJktpr6v
|
|
JrKnMVssyzUXek4ZiWUoMY864MYeG+ZbdeHzSCMC9iCdfQIdTGUJ0SclOPoRPSUu
|
|
zyTE4FfDFLCZof6gcYudpdSK4Iy84G89kIb1Yfdvg5ak71uMtPRxSncsbfg4TK8r
|
|
WuyXs4alIA5bunhf89b6zt3BKufTzs6jBi9oLertafPW9Xgl2PbIeyN9JH1207/L
|
|
EnXpTS5XK/SDRnzkKCe9aSHy+mS77bRQjV+U67V+TVc9OrbZYQ5krHb3mlWf31wU
|
|
owS0d7DQLhGrPtBs/C85u4DUTHJcZys7RX4Q7fArDkN1sszhtoxNb2WTgYLKYQYF
|
|
IHdYRF8Bqq7ZrNJ+2MOQS1kowXTluJKuCQbgL+UwJl+wtrRdxt64CKSCmtH7h4H+
|
|
yhDD2J6CP3jz4SUQY/CxCmHzI1SVDmHwtr7J02V468Bz+zwT9YbLyvwPKAKJAW5h
|
|
MUpYN+Yg6Ch7TEa/qw+tbkJbSQeXiAIpzRVAzffo8+djG+UnMdLSMBcCAwEAAaAA
|
|
MA0GCSqGSIb3DQEBBQUAA4ICAQAr3YwkjT9Lft7DQP328BfudnAQR+tdodAtZGRU
|
|
y3ZUVupQwgtYdCCRnneCdVcAQUnj6tZHkzBhHBflVz24vXZZHiQilaajzeCoJpj5
|
|
jXy1ZjPK/efTKw8H325N8hHqGgiXEp86K06LZ4a6m3K+lBZbhb2hSt2MJx8DDn1Y
|
|
YE1Ssvo0rxDrhnPbAeAdmVNT4zCazYTAaYk2IwAAY9BsoRQouYsSHbVxG+KFGp2A
|
|
Rw9ryCqBXUAbj0b7whOFEj7pqg3F8nNbPuFdaUoCGaN8TWQFy4diwFsujGONDl4w
|
|
Df82BlAj/ty9z5WUCs01+z9X4SDm+vchSMqBKgAYZSKEAEmlQf++3tlJpEG7jM1l
|
|
SqYkeSVWrkHSBXkNQQ2iNmzMBvCA40Qont8OXP/gqS0+rS37f2LtuUueCgV8Gtay
|
|
RWgH7/JcdLEMm/XohRyD2yVz/JhKNWkYyEjtpr4wFTgFX48v6H4fE7o0HcUHy4nK
|
|
vN4vwXoa71x65lL1HcZdqYr/ff9KHcwxaOnflTgXzBMvm++F7EwEOK61TDuNkZ8h
|
|
gaf8Ejt1XNtA1jPNnRES7gqafOJAwYyshr5XoLzHUgbXBTVlEp5t2buxf76n+nzz
|
|
Zz6BD8nuXQMGPy60ql12MQvLmdX7mFFHthucExhA/9R7wSPtdS8OBPljumgUuhRR
|
|
BcW7kw==
|
|
-----END CERTIFICATE REQUEST-----""")
|
|
super(CertificateOpsTests, self).setUp()
|
|
|
|
def tearDown(self):
|
|
pass
|
|
|
|
def test_parse_csr_success1(self):
|
|
"""Test basic success path for parse_csr."""
|
|
result = certificate_ops.parse_csr(self.csr, 'pem')
|
|
subject = result.get_subject()
|
|
actual_cn = subject.get_entries_by_oid(
|
|
x509_name.OID_commonName)[0].get_value()
|
|
self.assertEqual(actual_cn, self.expected_cn)
|
|
|
|
def test_parse_csr_success2(self):
|
|
"""Test basic success path for parse_csr."""
|
|
result = certificate_ops.parse_csr(self.csr, 'PEM')
|
|
subject = result.get_subject()
|
|
actual_cn = subject.get_entries_by_oid(
|
|
x509_name.OID_commonName)[0].get_value()
|
|
self.assertEqual(actual_cn, self.expected_cn)
|
|
|
|
def test_parse_csr_fail1(self):
|
|
"""Test invalid CSR format (wrong value) for parse_csr."""
|
|
with self.assertRaises(http_status.HTTPClientError):
|
|
certificate_ops.parse_csr(self.csr, 'blah')
|
|
|
|
def test_parse_csr_fail2(self):
|
|
"""Test invalid CSR format (wrong type) for parse_csr."""
|
|
with self.assertRaises(http_status.HTTPClientError):
|
|
certificate_ops.parse_csr(self.csr, True)
|
|
|
|
def test_parse_csr_fail3(self):
|
|
"""Test invalid CSR (None) format for parse_csr."""
|
|
with self.assertRaises(http_status.HTTPClientError):
|
|
certificate_ops.parse_csr(None, 'pem')
|
|
|
|
def test_parse_csr_fail4(self):
|
|
"""Test invalid CSR (wrong value) format for parse_csr."""
|
|
with self.assertRaises(http_status.HTTPClientError):
|
|
certificate_ops.parse_csr('invalid csr input', 'pem')
|
|
|
|
def test_validate_csr_success(self):
|
|
"""Test basic success path for validate_csr."""
|
|
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
|
|
config = "anchor.jsonloader.conf._config"
|
|
self.sample_conf_ra['default_ra']['validators'] = {'extensions': {
|
|
'allowed_extensions': []}}
|
|
data = self.sample_conf
|
|
|
|
with mock.patch.dict(config, data):
|
|
certificate_ops.validate_csr('default_ra', None, csr_obj, None)
|
|
|
|
def test_validate_csr_bypass(self):
|
|
"""Test empty validator set for validate_csr."""
|
|
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
|
|
config = "anchor.jsonloader.conf._config"
|
|
self.sample_conf_ra['default_ra']['validators'] = {}
|
|
data = self.sample_conf
|
|
|
|
with mock.patch.dict(config, data):
|
|
# this should work, it allows people to bypass validation
|
|
certificate_ops.validate_csr('default_ra', None, csr_obj, None)
|
|
|
|
def test_validate_csr_fail(self):
|
|
"""Test failure path for validate_csr."""
|
|
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
|
|
config = "anchor.jsonloader.conf._config"
|
|
self.sample_conf_ra['default_ra']['validators'] = {
|
|
'common_name': {
|
|
'allowed_domains': ['.testing.com']
|
|
}
|
|
}
|
|
data = self.sample_conf
|
|
|
|
with mock.patch.dict(config, data):
|
|
with self.assertRaises(http_status.HTTPException) as cm:
|
|
certificate_ops.validate_csr('default_ra', None, csr_obj, None)
|
|
self.assertEqual(cm.exception.code, 400)
|
|
|
|
def test_ca_cert_read_failure(self):
|
|
"""Test CA certificate read failure."""
|
|
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
|
|
config = "anchor.jsonloader.conf._config"
|
|
ca_conf = self.sample_conf_ca['default_ca']
|
|
ca_conf['cert_path'] = '/xxx/not/a/valid/path'
|
|
ca_conf['key_path'] = 'tests/CA/root-ca-unwrapped.key'
|
|
data = self.sample_conf
|
|
|
|
with mock.patch.dict(config, data):
|
|
with self.assertRaises(http_status.HTTPException) as cm:
|
|
certificate_ops.sign('default_ra', csr_obj)
|
|
self.assertEqual(cm.exception.code, 500)
|
|
|
|
def test_ca_key_read_failure(self):
|
|
"""Test CA key read failure."""
|
|
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
|
|
config = "anchor.jsonloader.conf._config"
|
|
self.sample_conf_ca['default_ca']['cert_path'] = 'tests/CA/root-ca.crt'
|
|
self.sample_conf_ca['default_ca']['key_path'] = '/xxx/not/a/valid/path'
|
|
data = self.sample_conf
|
|
|
|
with mock.patch.dict(config, data):
|
|
with self.assertRaises(http_status.HTTPException) as cm:
|
|
certificate_ops.sign('default_ra', csr_obj)
|
|
self.assertEqual(cm.exception.code, 500)
|