diff --git a/README.rst b/README.rst index b3b5209..c6c29fe 100644 --- a/README.rst +++ b/README.rst @@ -257,9 +257,11 @@ base. Signing backends ================ -Anchor allows the use of configurable signing backend. While it provides -one implementation (based on cryptography.io and OpenSSL), other -implementations may be configured. +Anchor allows the use of configurable signing backend. Currently it provides two +implementation: one based on cryptography.io ("anchor"), the other using PKCS#11 +libraries ("pkcs11"). The first one is used in the sample config. Other backends +may have extra dependencies: pkcs11 requires the PyKCS11 module, not required by +anchor by default. The resulting certificate is stored locally if the `output_path` is set to any string. This does not depend on the configured backend. @@ -279,6 +281,7 @@ following options: Sample configuration for the default backend: "ca": { + "backend": "anchor" "cert_path": "CA/root-ca.crt", "key_path": "CA/root-ca-unwrapped.key", "output_path": "certs", @@ -286,7 +289,8 @@ Sample configuration for the default backend: "valid_hours": 24 } -For more information, please refer to the documentation. +Other backends may be created too. For more information, please refer to the +documentation. Fixups ====== diff --git a/anchor/signers/pkcs11.py b/anchor/signers/pkcs11.py new file mode 100644 index 0000000..a971f60 --- /dev/null +++ b/anchor/signers/pkcs11.py @@ -0,0 +1,120 @@ +from cryptography.hazmat import backends as cio_backends +from cryptography.hazmat.primitives import hashes +from pyasn1.codec.der import encoder +from pyasn1.type import univ as asn1_univ +from pyasn1_modules import rfc2315 + +from anchor import errors +from anchor import signers +from anchor import util + + +def import_pkcs(): + # separate function for mocking the import failure + return __import__("PyKCS11") + + +def conf_validator(name, ca_conf): + # mandatory CA settings + ca_config_requirements = ["cert_path", "output_path", "signing_hash", + "valid_hours", "slot", "pin", "key_id", + "pkcs11_path"] + + for requirement in ca_config_requirements: + if requirement not in ca_conf.keys(): + raise errors.ConfigValidationException( + "CA config missing: %s (for signing CA %s)" % (requirement, + name)) + + # all are specified, check the CA certificate and key are readable with + # sane permissions + util.check_file_exists(ca_conf['cert_path']) + util.check_file_exists(ca_conf['pkcs11_path']) + + # PyKCS11 is an optional dependency + try: + PyKCS11 = import_pkcs() + except ImportError: + raise errors.ConfigValidationException( + "PyKCS11 library cannot be imported") + + # library at the selected path should be possible to load + try: + pkcslib = PyKCS11.PyKCS11Lib() + pkcslib.load(ca_conf['pkcs11_path']) + except PyKCS11.PyKCS11Error: + raise errors.ConfigValidationException( + "Selected pkcs11 library failed to load") + + slot = ca_conf['slot'] + slots = pkcslib.getSlotList() + if slot not in slots: + raise errors.ConfigValidationException( + "Slot %s cannot be found in the pkcs11 store" % slot) + + try: + session = pkcslib.openSession(slot) + session.login(ca_conf['pin']) + except PyKCS11.PyKCS11Error: + raise errors.ConfigValidationException( + "Cannot login to the selected slot") + + +def make_signer(key_id, slot, pin, pkcs11_path, md): + HASH_OIDS = { + 'SHA256': asn1_univ.ObjectIdentifier('2.16.840.1.101.3.4.2.1'), + 'SHA384': asn1_univ.ObjectIdentifier('2.16.840.1.101.3.4.2.2'), + 'SHA512': asn1_univ.ObjectIdentifier('2.16.840.1.101.3.4.2.3'), + 'SHA224': asn1_univ.ObjectIdentifier('2.16.840.1.101.3.4.2.4'), + } + + PyKCS11 = import_pkcs() + try: + pkcslib = PyKCS11.PyKCS11Lib() + pkcslib.load(pkcs11_path) + session = pkcslib.openSession(slot) + session.login(pin) + except PyKCS11.PyKCS11Error: + raise signers.SigningError("Could not setup the pkcs11 session") + + keys = session.findObjects(( + (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY), + (PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_RSA), + (PyKCS11.CKA_SIGN, True), + (PyKCS11.CKA_ID, key_id), + )) + if not keys: + raise signers.SigningError("Cannot find the requested key") + key = keys[0] + cio_hash = getattr(hashes, md, None) + if not cio_hash: + raise signers.SigningError("Requested hash is not supported") + + h = hashes.Hash(cio_hash(), cio_backends.default_backend()) + + def pkcs11_signer(to_sign): + pkcslib.getInfo # just to keep pkcslib in scope, it's a NOOP + h.update(to_sign) + di = rfc2315.DigestInfo() + di['digestAlgorithm'] = None + di['digestAlgorithm'][0] = HASH_OIDS[md] + di['digest'] = h.finalize() + signature = bytes(session.sign(key, encoder.encode(di), + PyKCS11.MechanismRSAPKCS1)) + session.logout() + return signature + + return pkcs11_signer + + +@signers.config_validator(conf_validator) +def sign(csr, ca_conf): + slot = ca_conf['slot'] + pin = ca_conf['pin'] + pkcs11_path = ca_conf['pkcs11_path'] + key_id = [int(ca_conf['key_id'][i:i+2], 16) for + i in range(0, len(ca_conf['key_id']), 2)] + signing_hash = ca_conf['signing_hash'].upper() + + signer = make_signer(key_id, slot, pin, pkcs11_path, signing_hash) + return signers.sign_generic(csr, ca_conf, 'RSA', signer) diff --git a/doc/source/signing_backends.rst b/doc/source/signing_backends.rst index 3cf130f..4d6b9c0 100644 --- a/doc/source/signing_backends.rst +++ b/doc/source/signing_backends.rst @@ -46,6 +46,50 @@ Valid options for this backend are: * ``valid_hours``: validity period for the issued certificates, defined in hours +pkcs11 +------ + +This backend uses a provided pkcs11 library for the signing operation. The final +certificate is created in the same way as with `anchor` backend with regards to +extensions and fixups. + +The interface doesn't rely on any special functionality of the store. Only the +RSA private key needs to be available as a secret. The only used mechanism is +CKM_RSA_PKCS. That means any pkcs11 backend from gnome keyring to tpm and +external HSMs should work. + +This backend requires ``PyKCS11`` package to be installed. + +A sample configuration for the ``signing_ca`` block looks like this: + +.. code:: json + + { + "local": { + "backend": "pkcs11", + "cert_path": "CA/root-ca.crt", + "output_path": "certs", + "signing_hash": "sha256", + "valid_hours": 24, + "slot": 18, + "pin": "the_pin", + "key_id": "b22f6e84a7b29db389b57a24384b95cca0bb4bc0", + "pkcs11_path": "/usr/lib/.../pkcs11/...-pkcs11.so" + } + } + +Valid options for this backend are: + +* ``cert_path``: path to the signing CA certificate +* ``signing_hash``: hash to use when signing the issued certificate ("sha224, + "sha256", "sha384", "sha512" are valid options) +* ``valid_hours``: validity period for the issued certificates, defined in + hours +* ``slot``: slot number where the key can be found +* ``pin``: text version of the pin required to access the right slot +* ``key_id``: key id written as a hex string +* ``pkcs11_path``: path to the dynamic library compatible with pkcs11 interface + Backend development ------------------- diff --git a/setup.cfg b/setup.cfg index 17df612..a6d6765 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,6 +29,7 @@ source-dir = doc/source [entry_points] anchor.signing_backends = anchor = anchor.signers.cryptography_io:sign + pkcs11 = anchor.signers.pkcs11:sign anchor.validators = check_domains = anchor.validators.custom:check_domains diff --git a/tests/test_signing_backend.py b/tests/test_signing_backend.py index fa2edd5..678b335 100644 --- a/tests/test_signing_backend.py +++ b/tests/test_signing_backend.py @@ -17,10 +17,14 @@ import textwrap import unittest +import mock from pyasn1.type import univ as asn1_univ +from anchor import errors from anchor import signers from anchor.signers import cryptography_io +from anchor.signers import pkcs11 +from anchor import util from anchor.X509 import certificate from anchor.X509 import extension from anchor.X509 import signing_request @@ -95,3 +99,167 @@ class TestCryptographyBackend(tests.DefaultConfigMixin, def test_sign_bad_key(self): with self.assertRaises(signers.SigningError): cryptography_io.make_signer("BAD", "sha256", "RSA") + + +class TestPKCSBackend(unittest.TestCase): + def setUp(self): + self.good_conf = { + "cert_path": "tests/CA/root-ca.crt", + "output_path": "/somepath", + "signing_hash": "sha256", + "valid_hours": 24, + "slot": 5, + "pin": "somepin", + "key_id": "aabbccddeeff", + "pkcs11_path": "/somepath/library.so", + } + + def test_conf_checks_package(self): + with mock.patch.object(util, 'check_file_exists', return_value=True): + with mock.patch.object(pkcs11, 'import_pkcs', + side_effect=ImportError()): + with self.assertRaises(errors.ConfigValidationException): + pkcs11.conf_validator("name", self.good_conf) + + def test_conf_checks_fields(self): + for key in self.good_conf: + conf = self.good_conf.copy() + del conf[key] + with self.assertRaises(errors.ConfigValidationException): + pkcs11.conf_validator("name", conf) + + def test_conf_checks_file_permissions(self): + with mock.patch.object(util, 'check_file_exists', return_value=False): + with self.assertRaises(errors.ConfigValidationException): + pkcs11.conf_validator("name", self.good_conf) + + def test_conf_checks_library_loading(self): + class MockExc(Exception): + pass + + lib = mock.Mock() + lib.load.side_effect = MockExc() + mod = mock.Mock() + mod.PyKCS11Error = MockExc + mod.PyKCS11Lib.return_value = lib + + with mock.patch.object(util, 'check_file_exists', return_value=True): + with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod): + with self.assertRaises(errors.ConfigValidationException): + pkcs11.conf_validator("name", self.good_conf) + + def test_conf_checks_valid_slot(self): + class MockExc(Exception): + pass + + lib = mock.Mock() + lib.getSlotList.return_value = [4, 6] + mod = mock.Mock() + mod.PyKCS11Error = MockExc + mod.PyKCS11Lib.return_value = lib + + with mock.patch.object(util, 'check_file_exists', return_value=True): + with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod): + with self.assertRaises(errors.ConfigValidationException): + pkcs11.conf_validator("name", self.good_conf) + + def test_conf_checks_valid_pin(self): + class MockExc(Exception): + pass + + session = mock.Mock() + session.login.side_effect = MockExc() + lib = mock.Mock() + lib.getSlotList.return_value = [self.good_conf['slot']] + lib.openSession.return_value = session + mod = mock.Mock() + mod.PyKCS11Error = MockExc + mod.PyKCS11Lib.return_value = lib + + with mock.patch.object(util, 'check_file_exists', return_value=True): + with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod): + with self.assertRaises(errors.ConfigValidationException): + pkcs11.conf_validator("name", self.good_conf) + + def test_conf_allows_valid(self): + session = mock.Mock() + lib = mock.Mock() + lib.getSlotList.return_value = [self.good_conf['slot']] + lib.openSession.return_value = session + mod = mock.Mock() + mod.PyKCS11Lib.return_value = lib + + with mock.patch.object(util, 'check_file_exists', return_value=True): + with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod): + pkcs11.conf_validator("name", self.good_conf) + + def test_make_signer_fails(self): + with mock.patch.object(pkcs11, 'make_signer', + side_effect=signers.SigningError): + with self.assertRaises(signers.SigningError): + pkcs11.sign(mock.Mock(), self.good_conf) + + def test_sign_login_fails(self): + class MockExc(Exception): + pass + + session = mock.Mock() + session.login.side_effect = MockExc() + lib = mock.Mock() + lib.openSession.return_value = session + mod = mock.Mock() + mod.PyKCS11Error = MockExc + mod.PyKCS11Lib.return_value = lib + + with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod): + with self.assertRaisesRegexp(signers.SigningError, + "pkcs11 session"): + pkcs11.sign(mock.Mock(), self.good_conf) + + def test_sign_key_missing(self): + class MockExc(Exception): + pass + + session = mock.Mock() + session.findObjects.return_value = [] + lib = mock.Mock() + lib.openSession.return_value = session + mod = mock.Mock() + mod.PyKCS11Lib.return_value = lib + + with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod): + with self.assertRaisesRegexp(signers.SigningError, + "requested key"): + pkcs11.sign(mock.Mock(), self.good_conf) + + def test_sign_bad_hash(self): + session = mock.Mock() + session.findObjects.return_value = [object()] + lib = mock.Mock() + lib.openSession.return_value = session + mod = mock.Mock() + mod.PyKCS11Lib.return_value = lib + self.good_conf['signing_hash'] = 'unknown' + + with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod): + with self.assertRaisesRegexp(signers.SigningError, + "hash is not supported"): + pkcs11.sign(mock.Mock(), self.good_conf) + + def test_working_signer(self): + res = b"123" + + session = mock.Mock() + session.findObjects.return_value = [object()] + session.sign.return_value = res + lib = mock.Mock() + lib.openSession.return_value = session + mod = mock.Mock() + mod.PyKCS11Lib.return_value = lib + + with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod): + signer = pkcs11.make_signer((1, 2, 3), self.good_conf['slot'], + self.good_conf['pin'], + self.good_conf['pkcs11_path'], + self.good_conf['signing_hash'].upper()) + self.assertEqual(res, signer(b"data"))