diff --git a/cloudbaseinit/tests/utils/windows/test_x509.py b/cloudbaseinit/tests/utils/windows/test_x509.py index 736d9c57..fe759047 100644 --- a/cloudbaseinit/tests/utils/windows/test_x509.py +++ b/cloudbaseinit/tests/utils/windows/test_x509.py @@ -142,8 +142,9 @@ class CryptoAPICertManagerTests(unittest.TestCase): generate_key_ret_val=None) @mock.patch('cloudbaseinit.utils.windows.x509.free') - @mock.patch('copy.copy') @mock.patch('cloudbaseinit.utils.windows.x509.malloc') + @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager' + '._add_system_time_interval') @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager' '._generate_key') @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager' @@ -185,8 +186,9 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CRYPTOAPI_BLOB, mock_CertStrToName, mock_uuid4, mock_get_cert_thumprint, - mock_generate_key, mock_malloc, - mock_copy, mock_free, certstr, + mock_generate_key, + mock_add_system_time_interval, + mock_malloc, mock_free, certstr, certificate, enhanced_key, store_handle, context_to_store): @@ -215,7 +217,6 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CRYPT_ALGORITHM_IDENTIFIER.assert_called_once_with() mock_SYSTEMTIME.assert_called_once_with() mock_GetSystemTime.assert_called_once_with(mock_byref()) - mock_copy.assert_called_once_with(mock_SYSTEMTIME()) mock_CertCreateSelfSignCertificate.assert_called_once_with( None, mock_byref(), 0, mock_byref(), mock_byref(), mock_byref(), mock_byref(), None) @@ -228,6 +229,8 @@ class CryptoAPICertManagerTests(unittest.TestCase): six.text_type(self.x509.STORE_NAME_MY)) mock_get_cert_thumprint.assert_called_once_with( mock_CertCreateSelfSignCertificate()) + mock_add_system_time_interval.assert_called_once_with( + mock_SYSTEMTIME.return_value, self.x509.X509_END_DATE_INTERVAL) mock_CertCloseStore.assert_called_once_with(store_handle, 0) mock_CertFreeCertificateContext.assert_called_once_with( @@ -238,6 +241,32 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_generate_key.assert_called_once_with('fake_name', True) + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'SYSTEMTIME') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'FILETIME') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'SystemTimeToFileTime') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'FileTimeToSystemTime') + def test_add_system_time_interval(self, mock_FileTimeToSystemTime, + mock_SystemTimeToFileTime, + mock_FILETIME, mock_SYSTEMTIME): + mock_system_time = mock.MagicMock() + fake_increment = 1 + mock_byref = self._ctypes.byref + + new_system_time = self._x509_manager._add_system_time_interval( + mock_system_time, fake_increment) + + mock_FILETIME.assert_called_once_with() + mock_SystemTimeToFileTime.assert_called_once_with(mock_byref(), + mock_byref()) + mock_SYSTEMTIME.assert_called_once_with() + mock_FileTimeToSystemTime.assert_called_once_with(mock_byref(), + mock_byref()) + self.assertEqual(mock_SYSTEMTIME.return_value, new_system_time) + def test_create_self_signed_cert(self): self._test_create_self_signed_cert(certstr='fake cert name', certificate='fake certificate', diff --git a/cloudbaseinit/utils/windows/cryptoapi.py b/cloudbaseinit/utils/windows/cryptoapi.py index 762cf91b..7a24b5bf 100644 --- a/cloudbaseinit/utils/windows/cryptoapi.py +++ b/cloudbaseinit/utils/windows/cryptoapi.py @@ -42,6 +42,13 @@ class SYSTEMTIME(ctypes.Structure): ] +class FILETIME(ctypes.Structure): + _fields_ = [ + ('dwLowDateTime', wintypes.DWORD), + ('dwHighDateTime', wintypes.DWORD), + ] + + class CERT_CONTEXT(ctypes.Structure): _fields_ = [ ('dwCertEncodingType', wintypes.DWORD), @@ -137,12 +144,22 @@ crypt32.CertStrToNameW.argtypes = [wintypes.DWORD, wintypes.LPCWSTR, ctypes.POINTER(wintypes.LPCWSTR)] CertStrToName = crypt32.CertStrToNameW -# TODO(alexpilotti): this is not a CryptoAPI funtion, putting it in a separate -# module would be more correct +# TODO(alexpilotti): the following time related functions are not CryptoAPI +# specific, putting them in a separate module would be more correct kernel32.GetSystemTime.restype = None kernel32.GetSystemTime.argtypes = [ctypes.POINTER(SYSTEMTIME)] GetSystemTime = kernel32.GetSystemTime +kernel32.SystemTimeToFileTime.restype = wintypes.BOOL +kernel32.SystemTimeToFileTime.argtypes = [ctypes.POINTER(SYSTEMTIME), + ctypes.POINTER(FILETIME)] +SystemTimeToFileTime = kernel32.SystemTimeToFileTime + +kernel32.FileTimeToSystemTime.restype = wintypes.BOOL +kernel32.FileTimeToSystemTime.argtypes = [ctypes.POINTER(FILETIME), + ctypes.POINTER(SYSTEMTIME)] +FileTimeToSystemTime = kernel32.FileTimeToSystemTime + # TODO(alexpilotti): this is not a CryptoAPI funtion, putting it in a separate # module would be more correct kernel32.GetLastError.restype = wintypes.DWORD diff --git a/cloudbaseinit/utils/windows/x509.py b/cloudbaseinit/utils/windows/x509.py index 07a000ae..a23f3eff 100644 --- a/cloudbaseinit/utils/windows/x509.py +++ b/cloudbaseinit/utils/windows/x509.py @@ -13,7 +13,6 @@ # under the License. -import copy import ctypes from ctypes import wintypes import uuid @@ -36,6 +35,8 @@ STORE_NAME_MY = "My" STORE_NAME_ROOT = "Root" STORE_NAME_TRUSTED_PEOPLE = "TrustedPeople" +X509_END_DATE_INTERVAL = 10 * 365 * 24 * 60 * 60 * 10000000 + class CryptoAPICertManager(object): def _get_cert_thumprint(self, cert_context_p): @@ -107,6 +108,26 @@ class CryptoAPICertManager(object): if crypt_prov_handle: cryptoapi.CryptReleaseContext(crypt_prov_handle, 0) + @staticmethod + def _add_system_time_interval(system_time, increment): + '''increment's unit: 10ns''' + file_time = cryptoapi.FILETIME() + if not cryptoapi.SystemTimeToFileTime(ctypes.byref(system_time), + ctypes.byref(file_time)): + raise cryptoapi.CryptoAPIException() + + t = file_time.dwLowDateTime + (file_time.dwHighDateTime << 32) + t += increment + + file_time.dwLowDateTime = t & 0xFFFFFFFF + file_time.dwHighDateTime = t >> 32 & 0xFFFFFFFF + + new_system_time = cryptoapi.SYSTEMTIME() + if not cryptoapi.FileTimeToSystemTime(ctypes.byref(file_time), + ctypes.byref(new_system_time)): + raise cryptoapi.CryptoAPIException() + return new_system_time + def create_self_signed_cert(self, subject, validity_years=10, machine_keyset=True, store_name=STORE_NAME_MY): subject_encoded = None @@ -162,8 +183,8 @@ class CryptoAPICertManager(object): start_time = cryptoapi.SYSTEMTIME() cryptoapi.GetSystemTime(ctypes.byref(start_time)) - end_time = copy.copy(start_time) - end_time.wYear += validity_years + end_time = self._add_system_time_interval( + start_time, X509_END_DATE_INTERVAL) cert_context_p = cryptoapi.CertCreateSelfSignCertificate( None, ctypes.byref(subject_blob), 0,