Making anchor use pyca/cryptography
- lots of changes required to move away from M2Crypto - line wrapping and other formatting changes included - lots of X509 related stuff added making use of pyca low level bindings to OpenSSL - This change requires pyca/cryptography 0.7 Change-Id: I1894ea991ff279eca85092173f6b65f0425d1dc5
This commit is contained in:
parent
ec69b9a51d
commit
8b35b34542
0
anchor/X509/__init__.py
Normal file
0
anchor/X509/__init__.py
Normal file
271
anchor/X509/certificate.py
Normal file
271
anchor/X509/certificate.py
Normal file
@ -0,0 +1,271 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cryptography.hazmat.backends.openssl import backend
|
||||
|
||||
import message_digest
|
||||
import name
|
||||
import errors
|
||||
|
||||
|
||||
class X509CertificateError(errors.X509Error):
|
||||
"""Specific error for X509 certificate operations"""
|
||||
def __init__(self, what):
|
||||
super(X509CertificateError, self).__init__(what)
|
||||
|
||||
|
||||
class X509Extension(object):
|
||||
"""An X509 V3 Certificate extension"""
|
||||
def __init__(self, ext):
|
||||
self._lib = backend._lib
|
||||
self._ffi = backend._ffi
|
||||
self._ext = ext
|
||||
|
||||
def __str__(self):
|
||||
return "%s %s" % (self.get_name(), self.get_value())
|
||||
|
||||
def get_name(self):
|
||||
"""Get the extension name as a python string"""
|
||||
ext_obj = self._lib.X509_EXTENSION_get_object(self._ext)
|
||||
ext_nid = self._lib.OBJ_obj2nid(ext_obj)
|
||||
ext_name_str = self._lib.OBJ_nid2sn(ext_nid)
|
||||
return self._ffi.string(ext_name_str)
|
||||
|
||||
def get_value(self):
|
||||
"""Get the extension value as a python string"""
|
||||
bio = self._lib.BIO_new(self._lib.BIO_s_mem())
|
||||
bio = self._ffi.gc(bio, self._lib.BIO_free)
|
||||
self._lib.X509V3_EXT_print(bio, self._ext, 0, 0)
|
||||
size = 1024
|
||||
data = self._ffi.new("char[]", size)
|
||||
self._lib.BIO_gets(bio, data, size)
|
||||
return self._ffi.string(data)
|
||||
|
||||
|
||||
class X509Certificate(object):
|
||||
"""X509 certificate class"""
|
||||
def __init__(self):
|
||||
self._lib = backend._lib
|
||||
self._ffi = backend._ffi
|
||||
certObj = self._lib.X509_new()
|
||||
if certObj == self._ffi.NULL:
|
||||
raise X509CertificateError("Could not create X509 certifiacte "
|
||||
"object")
|
||||
|
||||
self._certObj = certObj
|
||||
|
||||
def __del__(self):
|
||||
if getattr(self, '_certObj', None):
|
||||
self._lib.X509_free(self._certObj)
|
||||
|
||||
def _asn1_utctime(self, t):
|
||||
# asn1_utctime = self._lib.ASN1_UTCTIME_new()
|
||||
asn1_utctime = self._lib.ASN1_UTCTIME_set(self._ffi.NULL, t)
|
||||
if asn1_utctime == self._ffi.NULL:
|
||||
raise X509CertificateError("Could not create ASN1_UTCTIME object")
|
||||
|
||||
return asn1_utctime
|
||||
|
||||
def from_buffer(self, data):
|
||||
"""Build this X509 object from a data buffer in memory
|
||||
:param data: A data buffer
|
||||
"""
|
||||
bio = backend._bytes_to_bio(data.encode('ascii'))
|
||||
|
||||
# NOTE(tkelsey): some versions of OpenSSL dont re-use the cert object
|
||||
# properly, so free it and use the new one
|
||||
#
|
||||
certObj = self._lib.PEM_read_bio_X509(bio[0],
|
||||
self._ffi.NULL,
|
||||
self._ffi.NULL,
|
||||
self._ffi.NULL)
|
||||
if certObj == self._ffi.NULL:
|
||||
raise X509CertificateError("Could not read X509 certificate from "
|
||||
"PEM data.")
|
||||
|
||||
self._lib.X509_free(self._certObj)
|
||||
self._certObj = certObj
|
||||
|
||||
def from_file(self, path):
|
||||
"""Build this X509 certificate object from a data file on disk
|
||||
:param path: A data buffer
|
||||
"""
|
||||
data = None
|
||||
with open(path, 'rb') as f:
|
||||
data = f.read()
|
||||
self.from_buffer(data)
|
||||
|
||||
def save(self, path):
|
||||
"""Save this X509 certificate object to a file on disk
|
||||
:param path: Output file path
|
||||
"""
|
||||
bio = self._lib.BIO_new_file(path, "w")
|
||||
ret = self._lib.PEM_write_bio_X509(bio, self._certObj)
|
||||
self._lib.BIO_free(bio)
|
||||
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not write X509 certificate to "
|
||||
"disk as PEM data.")
|
||||
|
||||
def set_version(self, v):
|
||||
"""Set the version of this X509 certificate object
|
||||
:param v: The version
|
||||
"""
|
||||
ret = self._lib.X509_set_version(self._certObj, v)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"version.")
|
||||
|
||||
def set_not_before(self, t):
|
||||
"""Set the 'not before' date field.
|
||||
:param t: a Python date-time object
|
||||
"""
|
||||
ansi1_utc = self._asn1_utctime(t)
|
||||
ret = self._lib.X509_set_notBefore(self._certObj, ansi1_utc)
|
||||
self._lib.ASN1_UTCTIME_free(ansi1_utc)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"not before time.")
|
||||
|
||||
def set_not_after(self, t):
|
||||
"""Set the 'not after' date field.
|
||||
:param t: a Python date-time object
|
||||
"""
|
||||
ansi1_utc = self._asn1_utctime(t)
|
||||
ret = self._lib.X509_set_notAfter(self._certObj, ansi1_utc)
|
||||
self._lib.ASN1_UTCTIME_free(ansi1_utc)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"not after time.")
|
||||
|
||||
def set_pubkey(self, pkey):
|
||||
"""Set the public key field
|
||||
:param pkey: The public key, an EVP_PKEY ssl type
|
||||
"""
|
||||
ret = self._lib.X509_set_pubkey(self._certObj, pkey)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"pubkey.")
|
||||
|
||||
def get_subject(self):
|
||||
"""Get the subject name field value
|
||||
:return: An X509Name object instance
|
||||
"""
|
||||
val = self._lib.X509_get_subject_name(self._certObj)
|
||||
if val == self._ffi.NULL:
|
||||
raise X509CertificateError("Could not get subject from X509 "
|
||||
"certificate.")
|
||||
|
||||
return name.X509Name(val)
|
||||
|
||||
def set_subject(self, subject):
|
||||
"""Set the subject name filed value
|
||||
:param subject: An X509Name object instance
|
||||
"""
|
||||
val = subject._name_obj
|
||||
ret = self._lib.X509_set_subject_name(self._certObj, val)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"subject.")
|
||||
|
||||
def set_issuer(self, issuer):
|
||||
"""Set the issuer name field value
|
||||
:param issuer: An X509Name object instance
|
||||
"""
|
||||
val = issuer._name_obj
|
||||
ret = self._lib.X509_set_issuer_name(self._certObj, val)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"issuer.")
|
||||
|
||||
def get_issuer(self):
|
||||
"""Get the issuer name field value
|
||||
:return: An X509Name object instance
|
||||
"""
|
||||
val = self._lib.X509_get_issuer_name(self._certObj)
|
||||
if val == self._ffi.NULL:
|
||||
raise X509CertificateError("Could not get subject from X509 "
|
||||
"certificate.")
|
||||
return name.X509Name(val)
|
||||
|
||||
def set_serial_number(self, serial):
|
||||
"""Set the serial number
|
||||
|
||||
The serial number is a 32 bit integer value that should be unique to
|
||||
each certificate issued by a given certificate authority.
|
||||
|
||||
:param serial: The serial number, 32 bit integer
|
||||
"""
|
||||
asn1_int = self._lib.ASN1_INTEGER_new()
|
||||
ret = self._lib.ASN1_INTEGER_set(asn1_int, serial)
|
||||
if ret != 0:
|
||||
ret = self._lib.X509_set_serialNumber(self._certObj, asn1_int)
|
||||
self._lib.ASN1_INTEGER_free(asn1_int)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not set X509 certificate "
|
||||
"serial number.")
|
||||
|
||||
def add_extension(self, ext, index):
|
||||
"""Add an X509 V3 Certificate extension
|
||||
:param ext: An X509Extension instance
|
||||
:param index: The index of the extension
|
||||
"""
|
||||
ret = self._lib.X509_add_ext(self._certObj, ext._ext, index)
|
||||
if ret == 0:
|
||||
raise X509CertificateError("Could not add X509 certificate "
|
||||
"extension.")
|
||||
|
||||
def sign(self, key, md='sha1'):
|
||||
"""Sign the X509 certificate with a key using a message digest algorithm
|
||||
:param key: The signing key, an EVP_PKEY OpenSSL object
|
||||
:param md: The name of a message digest algorithm to use, it must be
|
||||
valid and known to OpenSSL, possible values are
|
||||
- md5
|
||||
- sha1
|
||||
- sha256
|
||||
"""
|
||||
mda = getattr(self._lib, "EVP_%s" % md, None)
|
||||
if mda is None:
|
||||
msg = 'X509 signing error: Unknown algorithm {a}'.format(a=md)
|
||||
raise X509CertificateError(msg)
|
||||
ret = self._lib.X509_sign(self._certObj, key, mda())
|
||||
if ret == 0:
|
||||
raise X509CertificateError("X509 signing error: Could not sign "
|
||||
" certificate.")
|
||||
|
||||
def as_der(self):
|
||||
"""Return this X509 certificate as DER encoded data"""
|
||||
buf = None
|
||||
num = self._lib.i2d_X509(self._certObj, self._ffi.NULL)
|
||||
if num != 0:
|
||||
buf = self._ffi.new("unsigned char[]", num+1)
|
||||
buf_ptr = self._ffi.new("unsigned char**")
|
||||
buf_ptr[0] = buf
|
||||
num = self._lib.i2d_X509(self._certObj, buf_ptr)
|
||||
|
||||
if num == 0:
|
||||
raise X509CertificateError("Could not encode X509 certificate "
|
||||
"as DER.")
|
||||
return buf
|
||||
|
||||
def get_fingerprint(self, md='md5'):
|
||||
"""Get the fingerprint of this X509 certifiacte
|
||||
:param md: The message digest algorthim used to compute the fingerprint
|
||||
:return: The fingerprint encoded as a hex string
|
||||
"""
|
||||
der = self.as_der()
|
||||
md = message_digest.MessageDigest(md)
|
||||
md.update(der)
|
||||
digest = md.final()
|
||||
digest = hex(md._octx_to_num(digest))[2:-1].upper()
|
||||
return digest
|
18
anchor/X509/errors.py
Normal file
18
anchor/X509/errors.py
Normal file
@ -0,0 +1,18 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class X509Error(Exception):
|
||||
"""Base exception for X509 errors"""
|
||||
def __init__(self, what):
|
||||
super(X509Error, self).__init__(what)
|
66
anchor/X509/message_digest.py
Normal file
66
anchor/X509/message_digest.py
Normal file
@ -0,0 +1,66 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cryptography.hazmat.backends.openssl import backend
|
||||
|
||||
|
||||
class MessageDigestError(Exception):
|
||||
def __init__(self, what):
|
||||
super(MessageDigestError, self).__init__(what)
|
||||
|
||||
|
||||
class MessageDigest(object):
|
||||
def __init__(self, algo):
|
||||
self._lib = backend._lib
|
||||
self._ffi = backend._ffi
|
||||
md = getattr(self._lib, "EVP_%s" % algo, None)
|
||||
if md is None:
|
||||
msg = 'MessageDigest error: unknown algorithm {a}'.format(a=algo)
|
||||
raise MessageDigestError(msg)
|
||||
|
||||
ret = 0
|
||||
ctx = self._lib.EVP_MD_CTX_create()
|
||||
if ctx != self._ffi.NULL:
|
||||
self.ctx = ctx
|
||||
self.mda = md()
|
||||
ret = self._lib.EVP_DigestInit_ex(self.ctx,
|
||||
self.mda,
|
||||
self._ffi.NULL)
|
||||
|
||||
if ret == 0:
|
||||
raise MessageDigestError("Could not setup message digest context.")
|
||||
|
||||
def __del__(self):
|
||||
if getattr(self, 'ctx', None):
|
||||
self._lib.EVP_MD_CTX_cleanup(self.ctx)
|
||||
self._lib.EVP_MD_CTX_destroy(self.ctx)
|
||||
|
||||
def _octx_to_num(self, x):
|
||||
v = 0L
|
||||
lx = len(x)
|
||||
for i in range(lx):
|
||||
v = v + ord(x[i]) * (256L ** (lx-i-1))
|
||||
return v
|
||||
|
||||
def update(self, data):
|
||||
ret = self._lib.EVP_DigestUpdate(self.ctx, data, len(data))
|
||||
if ret == 0:
|
||||
raise MessageDigestError("Failed to update message digest data.")
|
||||
|
||||
def final(self):
|
||||
sz = self._lib.EVP_MD_size(self.mda)
|
||||
data = self._ffi.new("char[]", sz)
|
||||
ret = self._lib.EVP_DigestFinal_ex(self.ctx, data, self._ffi.NULL)
|
||||
if ret == 0:
|
||||
raise MessageDigestError("Failed to get message digest.")
|
||||
return self._ffi.string(data)
|
140
anchor/X509/name.py
Normal file
140
anchor/X509/name.py
Normal file
@ -0,0 +1,140 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cryptography.hazmat.backends.openssl import backend
|
||||
|
||||
import errors
|
||||
|
||||
|
||||
class X509Name(object):
|
||||
"""An X509 Name object"""
|
||||
|
||||
# NOTE(tkelsey): this is not exhaustive
|
||||
nid = {'C': backend._lib.NID_countryName,
|
||||
'SP': backend._lib.NID_stateOrProvinceName,
|
||||
'ST': backend._lib.NID_stateOrProvinceName,
|
||||
'stateOrProvinceName': backend._lib.NID_stateOrProvinceName,
|
||||
'L': backend._lib.NID_localityName,
|
||||
'localityName': backend._lib.NID_localityName,
|
||||
'O': backend._lib.NID_organizationName,
|
||||
'organizationName': backend._lib.NID_organizationName,
|
||||
'OU': backend._lib.NID_organizationalUnitName,
|
||||
'organizationUnitName': backend._lib.NID_organizationalUnitName,
|
||||
'CN': backend._lib.NID_commonName,
|
||||
'commonName': backend._lib.NID_commonName,
|
||||
'Email': backend._lib.NID_pkcs9_emailAddress,
|
||||
'emailAddress': backend._lib.NID_pkcs9_emailAddress,
|
||||
'serialNumber': backend._lib.NID_serialNumber,
|
||||
'SN': backend._lib.NID_surname,
|
||||
'surname': backend._lib.NID_surname,
|
||||
'GN': backend._lib.NID_givenName,
|
||||
'givenName': backend._lib.NID_givenName
|
||||
}
|
||||
|
||||
class Entry():
|
||||
"""An X509 Name sub-entry object"""
|
||||
def __init__(self, obj):
|
||||
self._lib = backend._lib
|
||||
self._ffi = backend._ffi
|
||||
self._entry = obj
|
||||
|
||||
def __str__(self):
|
||||
return "%s %s" % (self.get_name(), self.get_value())
|
||||
|
||||
def __cmp__(self, other):
|
||||
data = str(other)
|
||||
asn1_str_1 = self._lib.ASN1_STRING_new()
|
||||
asn1_str_1 = self._ffi.gc(asn1_str_1, self._lib.ASN1_STRING_free)
|
||||
ret = self._lib.ASN1_STRING_set(asn1_str_1, data, len(data))
|
||||
if ret != 0:
|
||||
asn1_str_2 = self._lib.X509_NAME_ENTRY_get_string(self._entry)
|
||||
ret = self._lib.ASN1_STRING_cmp(asn1_str_1, asn1_str_2)
|
||||
return (ret == 1)
|
||||
raise errors.X509Error("Could not setup ASN1 string data.")
|
||||
|
||||
def get_name(self):
|
||||
"""Get the name of this entry
|
||||
:return: entry name as a python string
|
||||
"""
|
||||
asn1_obj = self._lib.X509_NAME_ENTRY_get_object(self._entry)
|
||||
buf = self._ffi.new('char[]', 1024)
|
||||
ret = self._lib.OBJ_obj2txt(buf, 1024, asn1_obj, 0)
|
||||
if ret == 0:
|
||||
raise errors.X509Error("Could not convert ASN1_OBJECT to "
|
||||
"string.")
|
||||
return self._ffi.string(buf)
|
||||
|
||||
def get_value(self):
|
||||
"""Get the value of this entry
|
||||
:return: entry value as a python string
|
||||
"""
|
||||
val = self._lib.X509_NAME_ENTRY_get_data(self._entry)
|
||||
data = self._lib.ASN1_STRING_data(val)
|
||||
return self._ffi.string(data) # Encoding?
|
||||
|
||||
def __init__(self, name_obj):
|
||||
# NOTE(tkelsey): we create a copy of the name and own it
|
||||
self._lib = backend._lib
|
||||
self._ffi = backend._ffi
|
||||
self._name_obj = self._lib.X509_NAME_dup(name_obj)
|
||||
if self._name_obj == self._ffi.NULL:
|
||||
raise errors.X509Error("Failed to copy X509_NAME object.")
|
||||
|
||||
def __del__(self):
|
||||
self._lib.X509_NAME_free(self._name_obj)
|
||||
|
||||
def __str__(self):
|
||||
# NOTE(tkelsey): we need to pass in a max size, so why not 1024
|
||||
val = self._lib.X509_NAME_oneline(self._name_obj, self._ffi.NULL, 1024)
|
||||
if val == self._ffi.NULL:
|
||||
raise errors.X509Error("Could not convert X509_NAME to string.")
|
||||
|
||||
val = self._ffi.gc(val, self._lib.OPENSSL_free)
|
||||
return self._ffi.string(val)
|
||||
|
||||
def __len__(self):
|
||||
return self._lib.X509_NAME_entry_count(self._name_obj)
|
||||
|
||||
def __getitem__(self, idx):
|
||||
if not (0 <= idx < self.entry_count()):
|
||||
raise IndexError("index out of range")
|
||||
ent = self._lib.X509_NAME_get_entry(self._name_obj, idx)
|
||||
return X509Name.Entry(ent)
|
||||
|
||||
def __iter__(self):
|
||||
for i in xrange(self.entry_count()):
|
||||
yield self[i]
|
||||
|
||||
def entry_count(self):
|
||||
"""Get the number of entries in the name object"""
|
||||
return self._lib.X509_NAME_entry_count(self._name_obj)
|
||||
|
||||
def get_entries_by_nid_name(self, nid_name):
|
||||
"""Get a name entry corresponding to an NID name
|
||||
:param nid_name: an NID name, chosen from the X509Name.nid table
|
||||
:return: An X509Name.Entry object
|
||||
"""
|
||||
if nid_name not in X509Name.nid:
|
||||
raise errors.X509Error("Unknown NID name: %s" % nid_name)
|
||||
|
||||
out = []
|
||||
nid = X509Name.nid[nid_name]
|
||||
idx = self._lib.X509_NAME_get_index_by_NID(self._name_obj, nid, -1)
|
||||
while idx != -1:
|
||||
val = self._lib.X509_NAME_get_entry(self._name_obj, idx)
|
||||
if val != self._ffi.NULL:
|
||||
out.append(X509Name.Entry(val))
|
||||
|
||||
idx = self._lib.X509_NAME_get_index_by_NID(self._name_obj,
|
||||
nid, idx)
|
||||
return out
|
98
anchor/X509/signing_request.py
Normal file
98
anchor/X509/signing_request.py
Normal file
@ -0,0 +1,98 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cryptography.hazmat.backends.openssl import backend
|
||||
|
||||
import errors
|
||||
import certificate
|
||||
import name
|
||||
|
||||
|
||||
class X509CsrError(errors.X509Error):
|
||||
def __init__(self, what):
|
||||
super(X509CsrError, self).__init__(what)
|
||||
|
||||
|
||||
class X509Csr(object):
|
||||
"""An X509 Certificate Signing Request"""
|
||||
def __init__(self):
|
||||
self._lib = backend._lib
|
||||
self._ffi = backend._ffi
|
||||
csrObj = self._lib.X509_REQ_new()
|
||||
if csrObj == self._ffi.NULL:
|
||||
raise X509CsrError("Could not create X509 CSR Object.")
|
||||
|
||||
self._csrObj = csrObj
|
||||
|
||||
def __del__(self):
|
||||
if getattr(self, '_csrObj', None):
|
||||
self._lib.X509_REQ_free(self._csrObj)
|
||||
|
||||
def from_buffer(self, data, password=None):
|
||||
"""Create this CSR from a buffer
|
||||
:param data: The data buffer
|
||||
:param password: decryption password, if needed
|
||||
"""
|
||||
bio = backend._bytes_to_bio(data.encode('ascii'))
|
||||
ptr = self._ffi.new("X509_REQ **")
|
||||
ptr[0] = self._csrObj
|
||||
ret = self._lib.PEM_read_bio_X509_REQ(bio[0], ptr,
|
||||
self._ffi.NULL,
|
||||
self._ffi.NULL)
|
||||
if ret == self._ffi.NULL:
|
||||
raise X509CsrError("Could not read X509 CSR from PEM data.")
|
||||
|
||||
def from_file(self, path, password=None):
|
||||
"""Create this CSR from a file on disk
|
||||
:param path: Path to the file on disk
|
||||
:param password: decryption password, if needed
|
||||
"""
|
||||
data = None
|
||||
with open(path, 'rb') as f:
|
||||
data = f.read()
|
||||
self.fromBuffer(data, password)
|
||||
|
||||
def get_pubkey(self):
|
||||
"""Get the public key from the CSR
|
||||
:return: an OpenSSL EVP_PKEY object
|
||||
"""
|
||||
pkey = self._lib.X509_REQ_get_pubkey(self._csrObj)
|
||||
if pkey == self._ffi.NULL:
|
||||
raise X509CsrError("Could not get pubkey from X509 CSR Object.")
|
||||
|
||||
return pkey
|
||||
|
||||
def get_subject(self):
|
||||
"""Get the subject name field from the CSR
|
||||
:return: an X509Name object
|
||||
"""
|
||||
subs = self._lib.X509_REQ_get_subject_name(self._csrObj)
|
||||
if subs == self._ffi.NULL:
|
||||
raise X509CsrError("Could not get subject from X509 CSR Object.")
|
||||
|
||||
return name.X509Name(subs)
|
||||
|
||||
def get_extensions(self):
|
||||
"""Get the list of all X509 V3 Extensions on this CSR
|
||||
:return: a list of X509Extension objects
|
||||
"""
|
||||
# TODO(tkelsey): I assume the ext list copies data and this is safe
|
||||
# TODO(tkelsey): Error checking needed here
|
||||
ret = []
|
||||
exts = self._lib.X509_REQ_get_extensions(self._csrObj)
|
||||
num = self._lib.sk_X509_EXTENSION_num(exts)
|
||||
for i in range(0, num):
|
||||
ext = self._lib.sk_X509_EXTENSION_value(exts, i)
|
||||
ret.append(certificate.X509Extension(ext))
|
||||
self._lib.sk_X509_EXTENSION_free(exts)
|
||||
return ret
|
37
anchor/X509/utils.py
Normal file
37
anchor/X509/utils.py
Normal file
@ -0,0 +1,37 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cryptography.hazmat.backends.openssl import backend
|
||||
|
||||
|
||||
def load_pem_private_key(key_data, passwd=None):
|
||||
"""Load and return an OpenSSL EVP_PKEY public key object from a data buffer
|
||||
:param key_data: The data buffer
|
||||
:param passwd: Decryption password if neded (not used for now)
|
||||
:return: an OpenSSL EVP_PKEY public key object
|
||||
"""
|
||||
# TODO(tkelsey): look at using backend.read_private_key
|
||||
#
|
||||
|
||||
lib = backend._lib
|
||||
ffi = backend._ffi
|
||||
data = backend._bytes_to_bio(key_data)
|
||||
|
||||
evp_pkey = lib.EVP_PKEY_new()
|
||||
evp_pkey_ptr = ffi.new("EVP_PKEY**")
|
||||
evp_pkey_ptr[0] = evp_pkey
|
||||
evp_pkey = lib.PEM_read_bio_PrivateKey(data[0], evp_pkey_ptr,
|
||||
ffi.NULL, ffi.NULL)
|
||||
|
||||
evp_pkey = ffi.gc(evp_pkey, lib.EVP_PKEY_free)
|
||||
return evp_pkey
|
@ -24,7 +24,8 @@ if conf.auth.get('keystone'):
|
||||
|
||||
def validate(user, secret):
|
||||
if conf.auth.get('static'):
|
||||
if secret == conf.auth['static']['secret'] and user == conf.auth['static']['user']:
|
||||
if secret == conf.auth['static']['secret'] and \
|
||||
user == conf.auth['static']['user']:
|
||||
return AuthDetails(username=conf.auth['static']['user'], groups=[])
|
||||
|
||||
if conf.auth.get('ldap'):
|
||||
|
@ -11,9 +11,13 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import M2Crypto
|
||||
from X509 import certificate
|
||||
from X509 import signing_request
|
||||
from X509 import utils as X509_utils
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import uuid
|
||||
from pecan import conf
|
||||
@ -29,28 +33,30 @@ def parse_csr(csr, encoding):
|
||||
" null CSR.")
|
||||
return None
|
||||
|
||||
return M2Crypto.X509.load_request_string(csr.encode('ascii'))
|
||||
except Exception:
|
||||
logger.exception("Exception while parsing the CSR")
|
||||
out_req = signing_request.X509Csr()
|
||||
out_req.from_buffer(csr)
|
||||
return out_req
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Exception while parsing the CSR: %s", e)
|
||||
return None
|
||||
|
||||
|
||||
def validate_csr(auth_result, csr, request):
|
||||
args = {'auth_result': auth_result, 'csr': csr, 'conf': conf, 'request': request}
|
||||
#Check that M2crypto supports get_extensions()
|
||||
try:
|
||||
csr.get_extensions()
|
||||
except AttributeError:
|
||||
raise validators.ValidationError("Incorrect M2Crypto library version,"
|
||||
" cannot perform csr.get_extensions")
|
||||
args = {'auth_result': auth_result,
|
||||
'csr': csr,
|
||||
'conf': conf,
|
||||
'request': request}
|
||||
|
||||
for validator_steps in conf.validators:
|
||||
logger.debug("Checking validators set <%s>", validator_steps.get("name"))
|
||||
logger.debug("Checking validators set <%s>",
|
||||
validator_steps.get("name"))
|
||||
valid = True
|
||||
|
||||
for validator in validator_steps['steps']:
|
||||
if not isinstance(validator, tuple):
|
||||
logger.error("Validator should be defined by a tuple (got '%s' instead)", validator)
|
||||
logger.error("Validator should be defined by a tuple"
|
||||
" (got '%s' instead)", validator)
|
||||
break
|
||||
elif len(validator) == 1:
|
||||
validator_name, params = validator[0], {}
|
||||
@ -83,50 +89,57 @@ def validate_csr(auth_result, csr, request):
|
||||
|
||||
|
||||
def sign(csr):
|
||||
|
||||
try:
|
||||
ca = M2Crypto.X509.load_cert(conf.ca["cert_path"])
|
||||
except (IOError, M2Crypto.BIO.BIOError):
|
||||
logger.exception("Cannot load the signing CA")
|
||||
return None
|
||||
except M2Crypto.X509.X509Error:
|
||||
logger.exception("Signing CA file is not a valid certificate")
|
||||
ca = certificate.X509Certificate()
|
||||
ca.from_file(conf.ca["cert_path"])
|
||||
except Exception as e:
|
||||
logger.exception("Cannot load the signing CA: %s", e)
|
||||
return None
|
||||
|
||||
try:
|
||||
key = M2Crypto.EVP.load_key(conf.ca["key_path"])
|
||||
except (IOError, M2Crypto.BIO.BIOError):
|
||||
logger.exception("Cannot load the signing CA key")
|
||||
return None
|
||||
except M2Crypto.EVP.EVPError:
|
||||
logger.exception("Signing CA key file is not a valid key")
|
||||
key_data = None
|
||||
with open(conf.ca["key_path"]) as f:
|
||||
key_data = f.read()
|
||||
key = X509_utils.load_pem_private_key(key_data)
|
||||
except Exception as e:
|
||||
logger.exception("Cannot load the signing CA key: %s", e)
|
||||
return None
|
||||
|
||||
new_cert = M2Crypto.X509.X509()
|
||||
new_cert = certificate.X509Certificate()
|
||||
new_cert.set_version(0)
|
||||
|
||||
now = int(time.time())
|
||||
start_time = M2Crypto.ASN1.ASN1_UTCTIME()
|
||||
start_time.set_time(now)
|
||||
end_time = M2Crypto.ASN1.ASN1_UTCTIME()
|
||||
end_time.set_time(now+(conf.ca['valid_hours']*60*60))
|
||||
|
||||
start_time = int(time.time())
|
||||
end_time = start_time+(conf.ca['valid_hours']*60*60)
|
||||
new_cert.set_not_before(start_time)
|
||||
new_cert.set_not_after(end_time)
|
||||
|
||||
new_cert.set_pubkey(pkey=csr.get_pubkey())
|
||||
new_cert.set_subject(csr.get_subject())
|
||||
new_cert.set_issuer(ca.get_subject())
|
||||
serial = uuid.uuid4().get_hex()
|
||||
new_cert.set_serial_number(int(serial, 16))
|
||||
|
||||
for ext in (csr.get_extensions() or []):
|
||||
new_cert.add_ext(ext)
|
||||
# NOTE(tkelsey): this needs to be in the range of an int
|
||||
serial = int(int(uuid.uuid4().get_hex(), 16) % sys.maxsize)
|
||||
new_cert.set_serial_number(serial)
|
||||
|
||||
exts = csr.get_extensions()
|
||||
for i, ext in enumerate(exts):
|
||||
logger.info("Adding certificate extension: %i %s", i, str(ext))
|
||||
new_cert.add_extension(ext, i)
|
||||
|
||||
logger.info("Signing certificate for <%s> with serial <%s>",
|
||||
csr.get_subject(), serial)
|
||||
|
||||
logger.info("Signing certificate for <%s> with serial <%s>", csr.get_subject(), serial)
|
||||
new_cert.sign(key, conf.ca['signing_hash'])
|
||||
|
||||
new_cert.save(os.path.join(
|
||||
path = os.path.join(
|
||||
conf.ca['output_path'],
|
||||
'%s.crt' % new_cert.get_fingerprint(conf.ca['signing_hash'])))
|
||||
'%s.crt' % new_cert.get_fingerprint(conf.ca['signing_hash']))
|
||||
|
||||
return new_cert.as_pem()
|
||||
logger.info("Saving certificate to: %s", path)
|
||||
new_cert.save(path)
|
||||
|
||||
with open(path) as f:
|
||||
return f.read()
|
||||
|
||||
# return new_cert.as_pem()
|
||||
|
@ -11,7 +11,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import M2Crypto
|
||||
import netaddr
|
||||
import logging
|
||||
|
||||
@ -23,7 +22,9 @@ class ValidationError(Exception):
|
||||
|
||||
|
||||
def csr_get_cn(csr):
|
||||
return str(csr.get_subject().get_entries_by_nid(M2Crypto.X509.X509_Name.nid['CN'])[0].get_data())
|
||||
name = csr.get_subject()
|
||||
data = name.get_entries_by_nid_name('CN')
|
||||
return data[0].get_value()
|
||||
|
||||
|
||||
def check_domains(domain, allowed_domains):
|
||||
@ -53,8 +54,10 @@ def common_name(csr=None, allowed_domains=[], allowed_networks=[], **kwargs):
|
||||
or network ranges.
|
||||
"""
|
||||
|
||||
alt_present = any(ext.get_name() == "subjectAltName" for ext in (csr.get_extensions() or []))
|
||||
CNs = csr.get_subject().get_entries_by_nid(M2Crypto.X509.X509_Name.nid['CN'])
|
||||
alt_present = any(ext.get_name() == "subjectAltName"
|
||||
for ext in csr.get_extensions())
|
||||
|
||||
CNs = csr.get_subject().get_entries_by_nid_name('CN')
|
||||
|
||||
if alt_present:
|
||||
if len(CNs) > 1:
|
||||
@ -62,15 +65,19 @@ def common_name(csr=None, allowed_domains=[], allowed_networks=[], **kwargs):
|
||||
else:
|
||||
# rfc5280#section-4.2.1.6 says so
|
||||
if len(csr.get_subject()) == 0:
|
||||
raise ValidationError("Alt subjects have to exist if the main subject doesn't")
|
||||
raise ValidationError("Alt subjects have to exist if the main"
|
||||
" subject doesn't")
|
||||
|
||||
if len(CNs) > 0:
|
||||
cn = csr_get_cn(csr)
|
||||
if not (check_domains(cn, allowed_domains) or check_networks(cn, allowed_networks)):
|
||||
raise ValidationError("Domain '%s' not allowed (doesn't match known domains or networks)" % cn)
|
||||
if not (check_domains(cn, allowed_domains) or
|
||||
check_networks(cn, allowed_networks)):
|
||||
raise ValidationError("Domain '%s' not allowed (doesn't match"
|
||||
" known domains or networks)" % cn)
|
||||
|
||||
|
||||
def alternative_names(csr=None, allowed_domains=[], allowed_networks=[], **kwargs):
|
||||
def alternative_names(csr=None, allowed_domains=[], allowed_networks=[],
|
||||
**kwargs):
|
||||
"""
|
||||
Refuse requests for certificates if the domain does not match
|
||||
the list of known suffixes, or network ranges.
|
||||
@ -82,9 +89,13 @@ def alternative_names(csr=None, allowed_domains=[], allowed_networks=[], **kwarg
|
||||
for alternative in alternatives:
|
||||
parts = alternative.split(':', 1)
|
||||
if len(parts) != 2 or parts[0] != 'DNS':
|
||||
raise ValidationError("Alt name '%s' does not have a known type")
|
||||
if not (check_domains(parts[1], allowed_domains) or check_networks(parts[1], allowed_networks)):
|
||||
raise ValidationError("Domain '%s' not allowed (doesn't match known domains or networks)" % parts[1])
|
||||
raise ValidationError("Alt name '%s' does not have a "
|
||||
"known type")
|
||||
if not (check_domains(parts[1], allowed_domains) or
|
||||
check_networks(parts[1], allowed_networks)):
|
||||
raise ValidationError("Domain '%s' not allowed (doesn't"
|
||||
" match known domains or networks)"
|
||||
% parts[1])
|
||||
|
||||
|
||||
def server_group(auth_result=None, csr=None, group_prefixes={}, **kwargs):
|
||||
@ -110,7 +121,8 @@ def extensions(csr=None, allowed_extensions=[], **kwargs):
|
||||
exts = csr.get_extensions() or []
|
||||
for ext in exts:
|
||||
if ext.get_name() not in allowed_extensions:
|
||||
raise ValidationError("Extension '%s' not allowed" % ext.get_name())
|
||||
raise ValidationError("Extension '%s' not allowed"
|
||||
% ext.get_name())
|
||||
|
||||
|
||||
def key_usage(csr=None, allowed_usage=None, **kwargs):
|
||||
@ -123,7 +135,8 @@ def key_usage(csr=None, allowed_usage=None, **kwargs):
|
||||
if ext.get_name() == 'keyUsage':
|
||||
usages = set(usage.strip() for usage in ext.get_value().split(','))
|
||||
if usages & allowed != usages:
|
||||
raise ValidationError("Found some not allowed key usages: %s" % (usages - allowed))
|
||||
raise ValidationError("Found some not allowed key usages: %s"
|
||||
% (usages - allowed))
|
||||
|
||||
|
||||
def ca_status(csr=None, ca_requested=False, **kwargs):
|
||||
@ -142,7 +155,8 @@ def ca_status(csr=None, ca_requested=False, **kwargs):
|
||||
|
||||
if parts[0] == 'CA':
|
||||
if parts[1] != str(ca_requested).upper():
|
||||
raise ValidationError("Invalid CA status, 'CA:%s' requested" % parts[1])
|
||||
raise ValidationError("Invalid CA status, 'CA:%s'"
|
||||
" requested" % parts[1])
|
||||
elif parts[0] == 'pathlen':
|
||||
# errr.. it's ok, I guess
|
||||
pass
|
||||
@ -153,7 +167,9 @@ def ca_status(csr=None, ca_requested=False, **kwargs):
|
||||
has_cert_sign = ('Certificate Sign' in usages)
|
||||
has_crl_sign = ('CRL Sign' in usages)
|
||||
if ca_requested != has_cert_sign or ca_requested != has_crl_sign:
|
||||
raise ValidationError("Key usage doesn't match requested CA status (keyCertSign/cRLSign: %s/%s)" % (has_cert_sign, has_crl_sign))
|
||||
raise ValidationError("Key usage doesn't match requested CA"
|
||||
" status (keyCertSign/cRLSign: %s/%s)"
|
||||
% (has_cert_sign, has_crl_sign))
|
||||
|
||||
|
||||
def source_cidrs(request=None, cidrs=None, **kwargs):
|
||||
@ -166,5 +182,7 @@ def source_cidrs(request=None, cidrs=None, **kwargs):
|
||||
if request.client_addr in r:
|
||||
return
|
||||
except netaddr.AddrFormatError:
|
||||
raise ValidationError("Cidr <%s> does not describe a valid network", cidr)
|
||||
raise ValidationError("No network matched the request source <%s>", request.client_addr)
|
||||
raise ValidationError("Cidr <%s> does not describe a valid"
|
||||
" network", cidr)
|
||||
raise ValidationError("No network matched the request source <%s>",
|
||||
request.client_addr)
|
||||
|
Loading…
x
Reference in New Issue
Block a user