Cosmin Poieana ae15fee086 Normalize all metadata providers and plugins
Every meta data service should return bytes only for these capabilities:
    * get_content
    * get_user_data
While `_get_meta_data` and any other method derrived from it
(including public keys, certificates etc.) should return homogeneous
data types and only strings, not bytes.
The decoding procedure is handled at its roots, not in the plugins
and is done by only using `encoding.get_as_string` function.

Fixed bugs:
    * invalid certificate splitting under maas service which usually
      generated an extra invalid certificate (empty string + footer)
    * text operations on bytes in maas and cloudstack (split, comparing)
    * multiple types for certificates (now only strings)
    * not receiving bytes from opennebula service when using `get_user_data`
      (which leads to crash under later processing through io.BytesIO)
    * erroneous certificate parsing/stripping/replacing under x509 importing
      (footer remains, not all possible EOLs replaced as it should)

Also added new and refined actual misleading unittests.

Change-Id: I704c43f5f784458a881293d761a21e62aed85732
2015-06-21 19:25:20 +03:00

407 lines
19 KiB
Python

# Copyright 2013 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import six
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit.utils import x509constants
class CryptoAPICertManagerTests(unittest.TestCase):
def setUp(self):
self._ctypes = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules', {'ctypes': self._ctypes})
self._module_patcher.start()
self.x509 = importlib.import_module("cloudbaseinit.utils.windows.x509")
self._x509_manager = self.x509.CryptoAPICertManager()
def tearDown(self):
self._module_patcher.stop()
@mock.patch('cloudbaseinit.utils.windows.x509.free')
@mock.patch('cloudbaseinit.utils.windows.x509.malloc')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertGetCertificateContextProperty')
def _test_get_cert_thumprint(self, mock_CertGetCertificateContextProperty,
mock_malloc, mock_free, ret_val):
mock_DWORD = self._ctypes.wintypes.DWORD
mock_CSIZET = self._ctypes.c_size_t
mock_cast = self._ctypes.cast
mock_POINTER = self._ctypes.POINTER
mock_byref = self._ctypes.byref
mock_pointer = mock.MagicMock()
fake_cert_context_p = 'fake context'
mock_DWORD.return_value.value = 10
mock_CSIZET.return_value.value = mock_DWORD.return_value.value
mock_CertGetCertificateContextProperty.return_value = ret_val
mock_POINTER.return_value = mock_pointer
mock_cast.return_value.contents = [16]
if not ret_val:
self.assertRaises(self.x509.cryptoapi.CryptoAPIException,
self._x509_manager._get_cert_thumprint,
fake_cert_context_p)
else:
expected = [mock.call(fake_cert_context_p,
self.x509.cryptoapi.CERT_SHA1_HASH_PROP_ID,
None, mock_byref.return_value),
mock.call(fake_cert_context_p,
self.x509.cryptoapi.CERT_SHA1_HASH_PROP_ID,
mock_malloc.return_value,
mock_byref.return_value)]
response = self._x509_manager._get_cert_thumprint(
fake_cert_context_p)
self.assertEqual(
expected,
mock_CertGetCertificateContextProperty.call_args_list)
mock_malloc.assert_called_with(mock_CSIZET.return_value)
mock_cast.assert_called_with(mock_malloc(), mock_pointer)
mock_free.assert_called_with(mock_malloc())
self.assertEqual('10', response)
def test_get_cert_thumprint(self):
self._test_get_cert_thumprint(ret_val=True)
def test_get_cert_thumprint_GetCertificateContextProperty_exception(self):
self._test_get_cert_thumprint(ret_val=False)
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.CryptDestroyKey')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.CryptReleaseContext')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.CryptGenKey')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.CryptAcquireContext')
def _test_generate_key(self, mock_CryptAcquireContext, mock_CryptGenKey,
mock_CryptReleaseContext, mock_CryptDestroyKey,
acquired_context, generate_key_ret_val):
mock_HANDLE = self._ctypes.wintypes.HANDLE
mock_byref = self._ctypes.byref
mock_CryptAcquireContext.return_value = acquired_context
mock_CryptGenKey.return_value = generate_key_ret_val
if not acquired_context:
self.assertRaises(self.x509.cryptoapi.CryptoAPIException,
self._x509_manager._generate_key,
'fake container', True)
else:
if not generate_key_ret_val:
self.assertRaises(self.x509.cryptoapi.CryptoAPIException,
self._x509_manager._generate_key,
'fake container', True)
else:
self._x509_manager._generate_key('fake container', True)
mock_CryptAcquireContext.assert_called_with(
mock_byref(), 'fake container', None,
self.x509.cryptoapi.PROV_RSA_FULL,
self.x509.cryptoapi.CRYPT_MACHINE_KEYSET)
mock_CryptGenKey.assert_called_with(
mock_HANDLE(), self.x509.cryptoapi.AT_SIGNATURE,
0x08000000, mock_HANDLE())
mock_CryptDestroyKey.assert_called_once_with(
mock_HANDLE())
mock_CryptReleaseContext.assert_called_once_with(
mock_HANDLE(), 0)
def test_generate_key(self):
self._test_generate_key(acquired_context=True,
generate_key_ret_val='fake key')
def test_generate_key_GetCertificateContextProperty_exception(self):
self._test_generate_key(acquired_context=False,
generate_key_ret_val='fake key')
def test_generate_key_CryptGenKey_exception(self):
self._test_generate_key(acquired_context=True,
generate_key_ret_val=None)
@mock.patch('cloudbaseinit.utils.windows.x509.free')
@mock.patch('copy.copy')
@mock.patch('cloudbaseinit.utils.windows.x509.malloc')
@mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager'
'._generate_key')
@mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager'
'._get_cert_thumprint')
@mock.patch('uuid.uuid4')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertStrToName')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CRYPTOAPI_BLOB')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CRYPT_KEY_PROV_INFO')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CRYPT_ALGORITHM_IDENTIFIER')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'SYSTEMTIME')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'GetSystemTime')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertCreateSelfSignCertificate')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertAddEnhancedKeyUsageIdentifier')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertOpenStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertAddCertificateContextToStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertCloseStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertFreeCertificateContext')
def _test_create_self_signed_cert(self, mock_CertFreeCertificateContext,
mock_CertCloseStore,
mock_CertAddCertificateContextToStore,
mock_CertOpenStore,
mock_CertAddEnhancedKeyUsageIdentifier,
mock_CertCreateSelfSignCertificate,
mock_GetSystemTime, mock_SYSTEMTIME,
mock_CRYPT_ALGORITHM_IDENTIFIER,
mock_CRYPT_KEY_PROV_INFO,
mock_CRYPTOAPI_BLOB,
mock_CertStrToName,
mock_uuid4, mock_get_cert_thumprint,
mock_generate_key, mock_malloc,
mock_copy, mock_free, certstr,
certificate, enhanced_key, store_handle,
context_to_store):
mock_POINTER = self._ctypes.POINTER
mock_byref = self._ctypes.byref
mock_cast = self._ctypes.cast
mock_uuid4.return_value = 'fake_name'
mock_CertCreateSelfSignCertificate.return_value = certificate
mock_CertAddEnhancedKeyUsageIdentifier.return_value = enhanced_key
mock_CertStrToName.return_value = certstr
mock_CertOpenStore.return_value = store_handle
mock_CertAddCertificateContextToStore.return_value = context_to_store
if (certstr is None or certificate is None or enhanced_key is None
or store_handle is None or context_to_store is None):
self.assertRaises(self.x509.cryptoapi.CryptoAPIException,
self._x509_manager.create_self_signed_cert,
'fake subject', 10, True,
self.x509.STORE_NAME_MY)
else:
response = self._x509_manager.create_self_signed_cert(
subject='fake subject')
mock_cast.assert_called_with(mock_malloc(), mock_POINTER())
mock_CRYPTOAPI_BLOB.assert_called_once_with()
mock_CRYPT_KEY_PROV_INFO.assert_called_once_with()
mock_CRYPT_ALGORITHM_IDENTIFIER.assert_called_once_with()
mock_SYSTEMTIME.assert_called_once_with()
mock_GetSystemTime.assert_called_once_with(mock_byref())
mock_copy.assert_called_once_with(mock_SYSTEMTIME())
mock_CertCreateSelfSignCertificate.assert_called_once_with(
None, mock_byref(), 0, mock_byref(),
mock_byref(), mock_byref(), mock_byref(), None)
mock_CertAddEnhancedKeyUsageIdentifier.assert_called_with(
mock_CertCreateSelfSignCertificate(),
self.x509.cryptoapi.szOID_PKIX_KP_SERVER_AUTH)
mock_CertOpenStore.assert_called_with(
self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0,
self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE,
six.text_type(self.x509.STORE_NAME_MY))
mock_get_cert_thumprint.assert_called_once_with(
mock_CertCreateSelfSignCertificate())
mock_CertCloseStore.assert_called_once_with(store_handle, 0)
mock_CertFreeCertificateContext.assert_called_once_with(
mock_CertCreateSelfSignCertificate())
mock_free.assert_called_once_with(mock_cast())
self.assertEqual(mock_get_cert_thumprint.return_value, response)
mock_generate_key.assert_called_once_with('fake_name', True)
def test_create_self_signed_cert(self):
self._test_create_self_signed_cert(certstr='fake cert name',
certificate='fake certificate',
enhanced_key='fake key',
store_handle='fake handle',
context_to_store='fake context')
def test_create_self_signed_cert_CertStrToName_fail(self):
self._test_create_self_signed_cert(certstr=None,
certificate='fake certificate',
enhanced_key='fake key',
store_handle='fake handle',
context_to_store='fake context')
def test_create_self_signed_cert_CertCreateSelfSignCertificate_fail(self):
self._test_create_self_signed_cert(certstr='fake cert name',
certificate=None,
enhanced_key='fake key',
store_handle='fake handle',
context_to_store='fake context')
def test_create_self_signed_cert_AddEnhancedKeyUsageIdentifier_fail(self):
self._test_create_self_signed_cert(certstr='fake cert name',
certificate='fake certificate',
enhanced_key=None,
store_handle='fake handle',
context_to_store='fake context')
def test_create_self_signed_cert_CertOpenStore_fail(self):
self._test_create_self_signed_cert(certstr='fake cert name',
certificate='fake certificate',
enhanced_key='fake key',
store_handle=None,
context_to_store='fake context')
def test_create_self_signed_cert_AddCertificateContextToStore_fail(self):
self._test_create_self_signed_cert(certstr='fake cert name',
certificate='fake certificate',
enhanced_key='fake key',
store_handle='fake handle',
context_to_store=None)
def test_get_cert_base64(self):
fake_cert_data = ''
fake_cert_data += x509constants.PEM_HEADER + '\n'
fake_cert_data += 'fake cert' + '\n'
fake_cert_data += x509constants.PEM_FOOTER
response = self._x509_manager._get_cert_base64(fake_cert_data)
self.assertEqual('fake cert', response)
@mock.patch('cloudbaseinit.utils.windows.x509.free')
@mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager'
'._get_cert_thumprint')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertCloseStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertFreeCertificateContext')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertGetNameString')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertAddEncodedCertificateToStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertOpenStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CryptStringToBinaryA')
@mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager'
'._get_cert_base64')
@mock.patch('cloudbaseinit.utils.windows.x509.malloc')
def _test_import_cert(self, mock_malloc, mock_get_cert_base64,
mock_CryptStringToBinaryA, mock_CertOpenStore,
mock_CertAddEncodedCertificateToStore,
mock_CertGetNameString,
mock_CertFreeCertificateContext,
mock_CertCloseStore, mock_get_cert_thumprint,
mock_free, crypttstr, store_handle, add_enc_cert,
upn_len):
mock_POINTER = self._ctypes.POINTER
mock_cast = self._ctypes.cast
mock_byref = self._ctypes.byref
mock_DWORD = self._ctypes.wintypes.DWORD
mock_create_unicode_buffer = self._ctypes.create_unicode_buffer
fake_cert_data = ''
fake_cert_data += x509constants.PEM_HEADER + '\n'
fake_cert_data += 'fake cert' + '\n'
fake_cert_data += x509constants.PEM_FOOTER
mock_get_cert_base64.return_value = 'fake cert'
mock_CryptStringToBinaryA.return_value = crypttstr
mock_CertOpenStore.return_value = store_handle
mock_CertAddEncodedCertificateToStore.return_value = add_enc_cert
mock_CertGetNameString.side_effect = [2, upn_len]
expected = [mock.call('fake cert', len('fake cert'),
self.x509.cryptoapi.CRYPT_STRING_BASE64, None,
mock_byref(), None, None),
mock.call('fake cert', len('fake cert'),
self.x509.cryptoapi.CRYPT_STRING_BASE64,
mock_cast(), mock_byref(), None, None)]
expected2 = [mock.call(mock_POINTER()(),
self.x509.cryptoapi.CERT_NAME_UPN_TYPE,
0, None, None, 0),
mock.call(mock_POINTER()(),
self.x509.cryptoapi.CERT_NAME_UPN_TYPE,
0, None, mock_create_unicode_buffer(), 2)]
if (not crypttstr or store_handle is None or add_enc_cert is None or
upn_len != 2):
self.assertRaises(self.x509.cryptoapi.CryptoAPIException,
self._x509_manager.import_cert, fake_cert_data,
True, self.x509.STORE_NAME_MY)
else:
response = self._x509_manager.import_cert(fake_cert_data)
mock_cast.assert_called_with(mock_malloc(), mock_POINTER())
self.assertEqual(expected,
mock_CryptStringToBinaryA.call_args_list)
mock_CertOpenStore.assert_called_with(
self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0,
self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE,
six.text_type(self.x509.STORE_NAME_MY))
mock_CertAddEncodedCertificateToStore.assert_called_with(
mock_CertOpenStore(),
self.x509.cryptoapi.X509_ASN_ENCODING |
self.x509.cryptoapi.PKCS_7_ASN_ENCODING,
mock_cast(), mock_DWORD(),
self.x509.cryptoapi.CERT_STORE_ADD_REPLACE_EXISTING,
mock_byref())
mock_create_unicode_buffer.assert_called_with(2)
self.assertEqual(expected2, mock_CertGetNameString.call_args_list)
mock_get_cert_thumprint.assert_called_once_with(mock_POINTER()())
mock_CertFreeCertificateContext.assert_called_once_with(
mock_POINTER()())
mock_CertCloseStore.assert_called_once_with(
mock_CertOpenStore(), 0)
mock_free.assert_called_once_with(mock_cast())
self.assertEqual(
(mock_get_cert_thumprint(),
mock_create_unicode_buffer().value), response)
mock_get_cert_base64.assert_called_with(fake_cert_data)
def test_import_cert(self):
self._test_import_cert(crypttstr=True, store_handle='fake handle',
add_enc_cert='fake encoded cert', upn_len=2)
def test_import_cert_CryptStringToBinaryA_fail(self):
self._test_import_cert(crypttstr=False, store_handle='fake handle',
add_enc_cert='fake encoded cert', upn_len=2)
def test_import_cert_CertOpenStore_fail(self):
self._test_import_cert(crypttstr=False, store_handle=None,
add_enc_cert='fake encoded cert', upn_len=2)
def test_import_cert_CertAddEncodedCertificateToStore_fail(self):
self._test_import_cert(crypttstr=True, store_handle='fake handle',
add_enc_cert=None, upn_len=2)
def test_import_cert_CertGetNameString_fail(self):
self._test_import_cert(crypttstr=True, store_handle='fake handle',
add_enc_cert='fake encoded cert', upn_len=3)