Add the PKCS11-based signing backend

Change-Id: I576a6837f2239d4e82baaacc68342a897db07f1d
This commit is contained in:
Stanisław Pitucha 2016-02-09 20:43:36 +11:00
parent d0fa9b519d
commit 524a4cef34
5 changed files with 341 additions and 4 deletions

View File

@ -257,9 +257,11 @@ base.
Signing backends
================
Anchor allows the use of configurable signing backend. While it provides
one implementation (based on cryptography.io and OpenSSL), other
implementations may be configured.
Anchor allows the use of configurable signing backend. Currently it provides two
implementation: one based on cryptography.io ("anchor"), the other using PKCS#11
libraries ("pkcs11"). The first one is used in the sample config. Other backends
may have extra dependencies: pkcs11 requires the PyKCS11 module, not required by
anchor by default.
The resulting certificate is stored locally if the `output_path` is set
to any string. This does not depend on the configured backend.
@ -279,6 +281,7 @@ following options:
Sample configuration for the default backend:
"ca": {
"backend": "anchor"
"cert_path": "CA/root-ca.crt",
"key_path": "CA/root-ca-unwrapped.key",
"output_path": "certs",
@ -286,7 +289,8 @@ Sample configuration for the default backend:
"valid_hours": 24
}
For more information, please refer to the documentation.
Other backends may be created too. For more information, please refer to the
documentation.
Fixups
======

120
anchor/signers/pkcs11.py Normal file
View File

@ -0,0 +1,120 @@
from cryptography.hazmat import backends as cio_backends
from cryptography.hazmat.primitives import hashes
from pyasn1.codec.der import encoder
from pyasn1.type import univ as asn1_univ
from pyasn1_modules import rfc2315
from anchor import errors
from anchor import signers
from anchor import util
def import_pkcs():
# separate function for mocking the import failure
return __import__("PyKCS11")
def conf_validator(name, ca_conf):
# mandatory CA settings
ca_config_requirements = ["cert_path", "output_path", "signing_hash",
"valid_hours", "slot", "pin", "key_id",
"pkcs11_path"]
for requirement in ca_config_requirements:
if requirement not in ca_conf.keys():
raise errors.ConfigValidationException(
"CA config missing: %s (for signing CA %s)" % (requirement,
name))
# all are specified, check the CA certificate and key are readable with
# sane permissions
util.check_file_exists(ca_conf['cert_path'])
util.check_file_exists(ca_conf['pkcs11_path'])
# PyKCS11 is an optional dependency
try:
PyKCS11 = import_pkcs()
except ImportError:
raise errors.ConfigValidationException(
"PyKCS11 library cannot be imported")
# library at the selected path should be possible to load
try:
pkcslib = PyKCS11.PyKCS11Lib()
pkcslib.load(ca_conf['pkcs11_path'])
except PyKCS11.PyKCS11Error:
raise errors.ConfigValidationException(
"Selected pkcs11 library failed to load")
slot = ca_conf['slot']
slots = pkcslib.getSlotList()
if slot not in slots:
raise errors.ConfigValidationException(
"Slot %s cannot be found in the pkcs11 store" % slot)
try:
session = pkcslib.openSession(slot)
session.login(ca_conf['pin'])
except PyKCS11.PyKCS11Error:
raise errors.ConfigValidationException(
"Cannot login to the selected slot")
def make_signer(key_id, slot, pin, pkcs11_path, md):
HASH_OIDS = {
'SHA256': asn1_univ.ObjectIdentifier('2.16.840.1.101.3.4.2.1'),
'SHA384': asn1_univ.ObjectIdentifier('2.16.840.1.101.3.4.2.2'),
'SHA512': asn1_univ.ObjectIdentifier('2.16.840.1.101.3.4.2.3'),
'SHA224': asn1_univ.ObjectIdentifier('2.16.840.1.101.3.4.2.4'),
}
PyKCS11 = import_pkcs()
try:
pkcslib = PyKCS11.PyKCS11Lib()
pkcslib.load(pkcs11_path)
session = pkcslib.openSession(slot)
session.login(pin)
except PyKCS11.PyKCS11Error:
raise signers.SigningError("Could not setup the pkcs11 session")
keys = session.findObjects((
(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY),
(PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_RSA),
(PyKCS11.CKA_SIGN, True),
(PyKCS11.CKA_ID, key_id),
))
if not keys:
raise signers.SigningError("Cannot find the requested key")
key = keys[0]
cio_hash = getattr(hashes, md, None)
if not cio_hash:
raise signers.SigningError("Requested hash is not supported")
h = hashes.Hash(cio_hash(), cio_backends.default_backend())
def pkcs11_signer(to_sign):
pkcslib.getInfo # just to keep pkcslib in scope, it's a NOOP
h.update(to_sign)
di = rfc2315.DigestInfo()
di['digestAlgorithm'] = None
di['digestAlgorithm'][0] = HASH_OIDS[md]
di['digest'] = h.finalize()
signature = bytes(session.sign(key, encoder.encode(di),
PyKCS11.MechanismRSAPKCS1))
session.logout()
return signature
return pkcs11_signer
@signers.config_validator(conf_validator)
def sign(csr, ca_conf):
slot = ca_conf['slot']
pin = ca_conf['pin']
pkcs11_path = ca_conf['pkcs11_path']
key_id = [int(ca_conf['key_id'][i:i+2], 16) for
i in range(0, len(ca_conf['key_id']), 2)]
signing_hash = ca_conf['signing_hash'].upper()
signer = make_signer(key_id, slot, pin, pkcs11_path, signing_hash)
return signers.sign_generic(csr, ca_conf, 'RSA', signer)

View File

@ -46,6 +46,50 @@ Valid options for this backend are:
* ``valid_hours``: validity period for the issued certificates, defined in
hours
pkcs11
------
This backend uses a provided pkcs11 library for the signing operation. The final
certificate is created in the same way as with `anchor` backend with regards to
extensions and fixups.
The interface doesn't rely on any special functionality of the store. Only the
RSA private key needs to be available as a secret. The only used mechanism is
CKM_RSA_PKCS. That means any pkcs11 backend from gnome keyring to tpm and
external HSMs should work.
This backend requires ``PyKCS11`` package to be installed.
A sample configuration for the ``signing_ca`` block looks like this:
.. code:: json
{
"local": {
"backend": "pkcs11",
"cert_path": "CA/root-ca.crt",
"output_path": "certs",
"signing_hash": "sha256",
"valid_hours": 24,
"slot": 18,
"pin": "the_pin",
"key_id": "b22f6e84a7b29db389b57a24384b95cca0bb4bc0",
"pkcs11_path": "/usr/lib/.../pkcs11/...-pkcs11.so"
}
}
Valid options for this backend are:
* ``cert_path``: path to the signing CA certificate
* ``signing_hash``: hash to use when signing the issued certificate ("sha224,
"sha256", "sha384", "sha512" are valid options)
* ``valid_hours``: validity period for the issued certificates, defined in
hours
* ``slot``: slot number where the key can be found
* ``pin``: text version of the pin required to access the right slot
* ``key_id``: key id written as a hex string
* ``pkcs11_path``: path to the dynamic library compatible with pkcs11 interface
Backend development
-------------------

View File

@ -29,6 +29,7 @@ source-dir = doc/source
[entry_points]
anchor.signing_backends =
anchor = anchor.signers.cryptography_io:sign
pkcs11 = anchor.signers.pkcs11:sign
anchor.validators =
check_domains = anchor.validators.custom:check_domains

View File

@ -17,10 +17,14 @@
import textwrap
import unittest
import mock
from pyasn1.type import univ as asn1_univ
from anchor import errors
from anchor import signers
from anchor.signers import cryptography_io
from anchor.signers import pkcs11
from anchor import util
from anchor.X509 import certificate
from anchor.X509 import extension
from anchor.X509 import signing_request
@ -95,3 +99,167 @@ class TestCryptographyBackend(tests.DefaultConfigMixin,
def test_sign_bad_key(self):
with self.assertRaises(signers.SigningError):
cryptography_io.make_signer("BAD", "sha256", "RSA")
class TestPKCSBackend(unittest.TestCase):
def setUp(self):
self.good_conf = {
"cert_path": "tests/CA/root-ca.crt",
"output_path": "/somepath",
"signing_hash": "sha256",
"valid_hours": 24,
"slot": 5,
"pin": "somepin",
"key_id": "aabbccddeeff",
"pkcs11_path": "/somepath/library.so",
}
def test_conf_checks_package(self):
with mock.patch.object(util, 'check_file_exists', return_value=True):
with mock.patch.object(pkcs11, 'import_pkcs',
side_effect=ImportError()):
with self.assertRaises(errors.ConfigValidationException):
pkcs11.conf_validator("name", self.good_conf)
def test_conf_checks_fields(self):
for key in self.good_conf:
conf = self.good_conf.copy()
del conf[key]
with self.assertRaises(errors.ConfigValidationException):
pkcs11.conf_validator("name", conf)
def test_conf_checks_file_permissions(self):
with mock.patch.object(util, 'check_file_exists', return_value=False):
with self.assertRaises(errors.ConfigValidationException):
pkcs11.conf_validator("name", self.good_conf)
def test_conf_checks_library_loading(self):
class MockExc(Exception):
pass
lib = mock.Mock()
lib.load.side_effect = MockExc()
mod = mock.Mock()
mod.PyKCS11Error = MockExc
mod.PyKCS11Lib.return_value = lib
with mock.patch.object(util, 'check_file_exists', return_value=True):
with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod):
with self.assertRaises(errors.ConfigValidationException):
pkcs11.conf_validator("name", self.good_conf)
def test_conf_checks_valid_slot(self):
class MockExc(Exception):
pass
lib = mock.Mock()
lib.getSlotList.return_value = [4, 6]
mod = mock.Mock()
mod.PyKCS11Error = MockExc
mod.PyKCS11Lib.return_value = lib
with mock.patch.object(util, 'check_file_exists', return_value=True):
with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod):
with self.assertRaises(errors.ConfigValidationException):
pkcs11.conf_validator("name", self.good_conf)
def test_conf_checks_valid_pin(self):
class MockExc(Exception):
pass
session = mock.Mock()
session.login.side_effect = MockExc()
lib = mock.Mock()
lib.getSlotList.return_value = [self.good_conf['slot']]
lib.openSession.return_value = session
mod = mock.Mock()
mod.PyKCS11Error = MockExc
mod.PyKCS11Lib.return_value = lib
with mock.patch.object(util, 'check_file_exists', return_value=True):
with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod):
with self.assertRaises(errors.ConfigValidationException):
pkcs11.conf_validator("name", self.good_conf)
def test_conf_allows_valid(self):
session = mock.Mock()
lib = mock.Mock()
lib.getSlotList.return_value = [self.good_conf['slot']]
lib.openSession.return_value = session
mod = mock.Mock()
mod.PyKCS11Lib.return_value = lib
with mock.patch.object(util, 'check_file_exists', return_value=True):
with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod):
pkcs11.conf_validator("name", self.good_conf)
def test_make_signer_fails(self):
with mock.patch.object(pkcs11, 'make_signer',
side_effect=signers.SigningError):
with self.assertRaises(signers.SigningError):
pkcs11.sign(mock.Mock(), self.good_conf)
def test_sign_login_fails(self):
class MockExc(Exception):
pass
session = mock.Mock()
session.login.side_effect = MockExc()
lib = mock.Mock()
lib.openSession.return_value = session
mod = mock.Mock()
mod.PyKCS11Error = MockExc
mod.PyKCS11Lib.return_value = lib
with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod):
with self.assertRaisesRegexp(signers.SigningError,
"pkcs11 session"):
pkcs11.sign(mock.Mock(), self.good_conf)
def test_sign_key_missing(self):
class MockExc(Exception):
pass
session = mock.Mock()
session.findObjects.return_value = []
lib = mock.Mock()
lib.openSession.return_value = session
mod = mock.Mock()
mod.PyKCS11Lib.return_value = lib
with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod):
with self.assertRaisesRegexp(signers.SigningError,
"requested key"):
pkcs11.sign(mock.Mock(), self.good_conf)
def test_sign_bad_hash(self):
session = mock.Mock()
session.findObjects.return_value = [object()]
lib = mock.Mock()
lib.openSession.return_value = session
mod = mock.Mock()
mod.PyKCS11Lib.return_value = lib
self.good_conf['signing_hash'] = 'unknown'
with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod):
with self.assertRaisesRegexp(signers.SigningError,
"hash is not supported"):
pkcs11.sign(mock.Mock(), self.good_conf)
def test_working_signer(self):
res = b"123"
session = mock.Mock()
session.findObjects.return_value = [object()]
session.sign.return_value = res
lib = mock.Mock()
lib.openSession.return_value = session
mod = mock.Mock()
mod.PyKCS11Lib.return_value = lib
with mock.patch.object(pkcs11, 'import_pkcs', return_value=mod):
signer = pkcs11.make_signer((1, 2, 3), self.good_conf['slot'],
self.good_conf['pin'],
self.good_conf['pkcs11_path'],
self.good_conf['signing_hash'].upper())
self.assertEqual(res, signer(b"data"))