Validator chains now exit on the first error

Also, added some more robust error handling for buggy validators.
A broken validator will now cause Anchor to fail-closed and deny
a certificate, exactly as if that validator test had failed
normally.

A message is logged giving information about the error and a 500
is returned to the calling service with minimal information.

Change-Id: Iddbd0e831a52cc02cc569f220aebf82bd59554d0
This commit is contained in:
Tim Kelsey 2015-04-24 16:26:00 +01:00
parent abd2fef7a8
commit a573aa7e04
2 changed files with 32 additions and 11 deletions

View File

@ -106,20 +106,22 @@ def validate_csr(auth_result, csr, request):
# so we set the initial state to valid.
valid = True
for name, vset in jsonloader.conf.validators.iteritems():
logger.debug("validate_csr: checking {}".format(name))
try:
for name, vset in jsonloader.conf.validators.iteritems():
logger.debug("validate_csr: checking with set {}".format(name))
for vname, validator in vset.iteritems():
valid = _run_validator(vname, validator, args)
if not valid:
break # early out at the first error
results = [_run_validator(x, y, args) for x, y in vset.iteritems()]
results.append(valid) # track previous failures
valid = all(results)
# valid here says that either (1) we didn't run any tests, or (2) we
# ran some tests and they all passed. Either way, we can just return.
if valid:
return
except Exception as e:
logger.exception("Error running validator <%s> - %s", vname, e)
pecan.abort(500, "Internal Validation Error running validator "
"'{}' in set '{}'".format(vname, name))
# something failed, return a 400 to the client
pecan.abort(400, "CSR failed validation")
if not valid:
pecan.abort(400, "CSR failed validation")
def sign(csr):

View File

@ -25,6 +25,7 @@ import pecan
from pecan import testing as pecan_testing
from anchor import jsonloader
from anchor import validators
from anchor.X509 import certificate as X509_cert
import config
@ -154,3 +155,21 @@ class TestFunctional(unittest.TestCase):
# make sure the cert was issued by anchor
self.assertEqual("/C=UK/ST=Some-State/O=OSSG/CN=anchor.example.com",
str(cert.get_issuer()))
def test_check_broken_validator(self):
data = {'user': 'myusername',
'secret': 'simplepassword',
'encoding': 'pem',
'csr': TestFunctional.csr_good}
def derp(**kwdargs):
raise Exception("BOOM")
validators.broken_validator = derp
jsonloader.conf.validators["default"]["broken_validator"] = {}
resp = self.app.post('/sign', data, expect_errors=True)
self.assertEqual(500, resp.status_int)
self.assertTrue(("Internal Validation Error running "
"validator 'broken_validator' "
"in set 'default'") in str(resp))