Merge "Add ephemeral disk plugin"
This commit is contained in:
commit
33244dd4ff
@ -279,6 +279,18 @@ class GlobalOptions(conf_base.Options):
|
||||
'metadata_report_provisioning_completed', default=False,
|
||||
help='Reports to the metadata service that provisioning '
|
||||
'completed successfully or failed'),
|
||||
cfg.StrOpt(
|
||||
'ephemeral_disk_volume_label', default=None,
|
||||
help='Ephemeral disk volume label, e.g.: "Temporary Storage"'),
|
||||
cfg.StrOpt(
|
||||
'ephemeral_disk_volume_mount_point', default=None,
|
||||
help='Ephemeral disk volume mount point, e.g.:'
|
||||
'"\\\\?\\GLOBALROOT\\device\\Harddisk1\\Partition1\\"'),
|
||||
cfg.StrOpt(
|
||||
'ephemeral_disk_data_loss_warning_path', default=None,
|
||||
help='Ephemeral disk data loss warning path, relative to the '
|
||||
'ephemeral disk volume path. E.g.: '
|
||||
'DATALOSS_WARNING_README.txt'),
|
||||
]
|
||||
|
||||
self._cli_options = [
|
||||
|
@ -215,6 +215,9 @@ class BaseMetadataService(object):
|
||||
"""Check if the metadata provider enforces automatic updates."""
|
||||
pass
|
||||
|
||||
def get_ephemeral_disk_data_loss_warning(self):
|
||||
raise NotExistingMetadataException()
|
||||
|
||||
|
||||
class BaseHTTPMetadataService(BaseMetadataService):
|
||||
|
||||
|
94
cloudbaseinit/plugins/common/ephemeraldisk.py
Normal file
94
cloudbaseinit/plugins/common/ephemeraldisk.py
Normal file
@ -0,0 +1,94 @@
|
||||
# Copyright (c) 2017 Cloudbase Solutions Srl
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
from oslo_log import log as oslo_logging
|
||||
|
||||
from cloudbaseinit import conf as cloudbaseinit_conf
|
||||
from cloudbaseinit import exception
|
||||
from cloudbaseinit.metadata.services import base as metadata_services_base
|
||||
from cloudbaseinit.osutils import factory as osutils_factory
|
||||
from cloudbaseinit.plugins.common import base
|
||||
|
||||
CONF = cloudbaseinit_conf.CONF
|
||||
LOG = oslo_logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EphemeralDiskPlugin(base.BasePlugin):
|
||||
@staticmethod
|
||||
def _get_ephemeral_disk_volume_by_mount_point(osutils):
|
||||
if CONF.ephemeral_disk_volume_mount_point:
|
||||
try:
|
||||
paths = osutils.get_volume_path_names_by_mount_point(
|
||||
CONF.ephemeral_disk_volume_mount_point)
|
||||
if paths:
|
||||
return paths[0]
|
||||
except exception.ItemNotFoundException:
|
||||
LOG.debug("Ephemeral disk mount point not found: %s",
|
||||
CONF.ephemeral_disk_volume_mount_point)
|
||||
|
||||
@staticmethod
|
||||
def _get_ephemeral_disk_volume_by_label(osutils):
|
||||
if CONF.ephemeral_disk_volume_label:
|
||||
logical_drives = osutils.get_logical_drives()
|
||||
for logical_drive in logical_drives:
|
||||
label = osutils.get_volume_label(logical_drive)
|
||||
if not label:
|
||||
continue
|
||||
if label.upper() == CONF.ephemeral_disk_volume_label.upper():
|
||||
return logical_drive
|
||||
|
||||
def _get_ephemeral_disk_volume_path(self, osutils):
|
||||
return (self._get_ephemeral_disk_volume_by_mount_point(osutils) or
|
||||
self._get_ephemeral_disk_volume_by_label(osutils))
|
||||
|
||||
def _set_ephemeral_disk_data_loss_warning(self, service,
|
||||
disk_warning_path):
|
||||
LOG.debug("Setting ephemeral disk data loss warning: %s",
|
||||
disk_warning_path)
|
||||
data_loss_warning = b''
|
||||
try:
|
||||
data_loss_warning = service.get_ephemeral_disk_data_loss_warning()
|
||||
except metadata_services_base.NotExistingMetadataException:
|
||||
LOG.debug("Metadata service does not provide an ephemeral "
|
||||
"disk data loss warning content")
|
||||
with open(disk_warning_path, 'wb') as f:
|
||||
f.write(data_loss_warning)
|
||||
|
||||
def execute(self, service, shared_data):
|
||||
|
||||
try:
|
||||
service.get_ephemeral_disk_data_loss_warning()
|
||||
except metadata_services_base.NotExistingMetadataException:
|
||||
return base.PLUGIN_EXECUTION_DONE, False
|
||||
|
||||
osutils = osutils_factory.get_os_utils()
|
||||
ephemeral_disk_volume_path = self._get_ephemeral_disk_volume_path(
|
||||
osutils)
|
||||
|
||||
if not ephemeral_disk_volume_path:
|
||||
LOG.info("Ephemeral disk volume not found")
|
||||
else:
|
||||
if CONF.ephemeral_disk_data_loss_warning_path:
|
||||
disk_warning_path = os.path.join(
|
||||
ephemeral_disk_volume_path,
|
||||
CONF.ephemeral_disk_data_loss_warning_path)
|
||||
self._set_ephemeral_disk_data_loss_warning(
|
||||
service, disk_warning_path)
|
||||
|
||||
return base.PLUGIN_EXECUTION_DONE, False
|
||||
|
||||
def get_os_requirements(self):
|
||||
return 'win32', (5, 2)
|
266
cloudbaseinit/tests/plugins/common/test_ephemeraldisk.py
Normal file
266
cloudbaseinit/tests/plugins/common/test_ephemeraldisk.py
Normal file
@ -0,0 +1,266 @@
|
||||
# Copyright (c) 2017 Cloudbase Solutions Srl
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import unittest
|
||||
|
||||
try:
|
||||
import unittest.mock as mock
|
||||
except ImportError:
|
||||
import mock
|
||||
|
||||
from cloudbaseinit import conf as cloudbaseinit_conf
|
||||
from cloudbaseinit import exception
|
||||
from cloudbaseinit.metadata.services import base as metadata_services_base
|
||||
from cloudbaseinit.plugins.common import base
|
||||
from cloudbaseinit.plugins.common import ephemeraldisk
|
||||
from cloudbaseinit.tests import testutils
|
||||
|
||||
|
||||
CONF = cloudbaseinit_conf.CONF
|
||||
MODULE_PATH = 'cloudbaseinit.plugins.common.ephemeraldisk'
|
||||
|
||||
|
||||
class TestEphemeralDiskPlugin(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self._disk = ephemeraldisk.EphemeralDiskPlugin()
|
||||
|
||||
def _test_get_ephemeral_disk_volume_by_mount_point(self, mount_point,
|
||||
paths, exception_raise):
|
||||
mock_osutils = mock.MagicMock()
|
||||
eclass = exception.ItemNotFoundException
|
||||
if exception_raise:
|
||||
mock_osutils.get_volume_path_names_by_mount_point.side_effect = \
|
||||
eclass()
|
||||
else:
|
||||
mock_osutils.get_volume_path_names_by_mount_point.return_value = \
|
||||
paths
|
||||
|
||||
with testutils.ConfPatcher('ephemeral_disk_volume_mount_point',
|
||||
mount_point):
|
||||
if exception_raise:
|
||||
with testutils.LogSnatcher(MODULE_PATH) as snatcher:
|
||||
self._disk._get_ephemeral_disk_volume_by_mount_point(
|
||||
mock_osutils)
|
||||
expected_logging = [
|
||||
"Ephemeral disk mount point not found: %s" % mount_point
|
||||
]
|
||||
else:
|
||||
result = None
|
||||
result = self._disk._get_ephemeral_disk_volume_by_mount_point(
|
||||
mock_osutils)
|
||||
if mount_point:
|
||||
(mock_osutils.get_volume_path_names_by_mount_point.
|
||||
assert_called_once_with(str(mount_point)))
|
||||
if not exception_raise:
|
||||
if paths:
|
||||
self.assertEqual(result, paths[0])
|
||||
else:
|
||||
self.assertEqual(result, None)
|
||||
else:
|
||||
self.assertEqual(snatcher.output, expected_logging)
|
||||
|
||||
def test_get_ephemeral_disk_volume_by_mount_point_no_mount_point(self):
|
||||
self._test_get_ephemeral_disk_volume_by_mount_point(
|
||||
mount_point=None, paths=None, exception_raise=False)
|
||||
|
||||
def test_get_ephemeral_disk_volume_by_mount_point_no_paths(self):
|
||||
self._test_get_ephemeral_disk_volume_by_mount_point(
|
||||
mount_point=True, paths=None, exception_raise=False)
|
||||
|
||||
def test_get_ephemeral_disk_volume_by_mount_point_exception(self):
|
||||
self._test_get_ephemeral_disk_volume_by_mount_point(
|
||||
mount_point=True, paths=None, exception_raise=True)
|
||||
|
||||
def test_get_ephemeral_disk_volume_by_mount_point(self):
|
||||
self._test_get_ephemeral_disk_volume_by_mount_point(
|
||||
mount_point=True, paths=[mock.sentinel.paths],
|
||||
exception_raise=False)
|
||||
|
||||
def _test_get_ephemeral_disk_volume_by_label(self, label,
|
||||
ephemeral_disk_volume_label):
|
||||
expected_result = None
|
||||
mock_osutils = mock.MagicMock()
|
||||
if ephemeral_disk_volume_label:
|
||||
labels = [None, str(mock.sentinel.label)] * 2
|
||||
labels += [label] + [None, str(mock.sentinel.label)]
|
||||
mock_osutils.get_logical_drives.return_value = range(len(labels))
|
||||
mock_osutils.get_volume_label.side_effect = labels
|
||||
if label.upper() == ephemeral_disk_volume_label.upper():
|
||||
expected_result = labels.index(label)
|
||||
with testutils.ConfPatcher('ephemeral_disk_volume_label',
|
||||
ephemeral_disk_volume_label):
|
||||
result = self._disk._get_ephemeral_disk_volume_by_label(
|
||||
mock_osutils)
|
||||
self.assertEqual(result, expected_result)
|
||||
if ephemeral_disk_volume_label:
|
||||
mock_osutils.get_logical_drives.assert_called_once_with()
|
||||
if expected_result is not None:
|
||||
self.assertEqual(mock_osutils.get_volume_label.call_count,
|
||||
expected_result + 1)
|
||||
else:
|
||||
self.assertEqual(mock_osutils.get_volume_label.call_count,
|
||||
len(labels))
|
||||
|
||||
def test_get_ephemeral_disk_volume_by_label_no_disk_volume_label(self):
|
||||
self._test_get_ephemeral_disk_volume_by_label(
|
||||
label=None, ephemeral_disk_volume_label=None)
|
||||
|
||||
def test_get_ephemeral_disk_volume_by_label_no_label(self):
|
||||
self._test_get_ephemeral_disk_volume_by_label(
|
||||
label=str(mock.sentinel.label),
|
||||
ephemeral_disk_volume_label=str(mock.sentinel.disk_volume_label))
|
||||
|
||||
def test_get_ephemeral_disk_volume_by_label(self):
|
||||
self._test_get_ephemeral_disk_volume_by_label(
|
||||
label=str(mock.sentinel.same_label),
|
||||
ephemeral_disk_volume_label=str(mock.sentinel.same_label))
|
||||
|
||||
@mock.patch.object(ephemeraldisk.EphemeralDiskPlugin,
|
||||
'_get_ephemeral_disk_volume_by_label')
|
||||
@mock.patch.object(ephemeraldisk.EphemeralDiskPlugin,
|
||||
'_get_ephemeral_disk_volume_by_mount_point')
|
||||
def _test_get_ephemeral_disk_volume_path(self, mock_by_mount_point,
|
||||
mock_by_label,
|
||||
by_mount_point, by_label):
|
||||
mock_osutils = mock.MagicMock()
|
||||
mock_by_mount_point.return_value = by_mount_point
|
||||
mock_by_label.return_value = by_label
|
||||
result = self._disk._get_ephemeral_disk_volume_path(mock_osutils)
|
||||
if by_mount_point:
|
||||
expected_result = mock_by_mount_point.return_value
|
||||
elif by_label:
|
||||
expected_result = mock_by_label.return_value
|
||||
else:
|
||||
expected_result = None
|
||||
|
||||
self.assertEqual(result, expected_result)
|
||||
mock_by_mount_point.assert_called_once_with(mock_osutils)
|
||||
if not by_mount_point:
|
||||
mock_by_label.assert_called_once_with(mock_osutils)
|
||||
|
||||
def test_get_ephemeral_disk_volume_path_by_mount_point(self):
|
||||
self._test_get_ephemeral_disk_volume_path(
|
||||
by_mount_point=True, by_label=True)
|
||||
|
||||
def test_get_ephemeral_disk_volume_path_by_label(self):
|
||||
self._test_get_ephemeral_disk_volume_path(
|
||||
by_mount_point=None, by_label=True)
|
||||
|
||||
def test_get_ephemeral_disk_volume_path_None(self):
|
||||
self._test_get_ephemeral_disk_volume_path(
|
||||
by_mount_point=None, by_label=None)
|
||||
|
||||
def _test_set_ephemeral_disk_data_loss_warning(self, exception_raised):
|
||||
mock_service = mock.MagicMock()
|
||||
disk_warning_path = str(mock.sentinel.disk_warning_path)
|
||||
expected_logging = [
|
||||
"Setting ephemeral disk data loss warning: %s" % disk_warning_path
|
||||
]
|
||||
if exception_raised:
|
||||
eclass = metadata_services_base.NotExistingMetadataException
|
||||
mock_service.get_ephemeral_disk_data_loss_warning.side_effect = \
|
||||
eclass
|
||||
expected_logging += [
|
||||
"Metadata service does not provide an ephemeral "
|
||||
"disk data loss warning content"
|
||||
]
|
||||
else:
|
||||
mock_service.get_ephemeral_disk_data_loss_warning.return_value = \
|
||||
str(mock.sentinel.data_loss_warning)
|
||||
with testutils.LogSnatcher(MODULE_PATH) as snatcher:
|
||||
with mock.patch(MODULE_PATH + '.open', mock.mock_open(),
|
||||
create=True) as mock_open:
|
||||
self._disk._set_ephemeral_disk_data_loss_warning(
|
||||
mock_service, disk_warning_path)
|
||||
self.assertEqual(snatcher.output, expected_logging)
|
||||
mock_open.assert_called_once_with(disk_warning_path, 'wb')
|
||||
|
||||
def test_set_ephemeral_disk_data_loss_warning(self):
|
||||
self._test_set_ephemeral_disk_data_loss_warning(exception_raised=False)
|
||||
|
||||
def test_set_ephemeral_disk_data_loss_warning_exception(self):
|
||||
self._test_set_ephemeral_disk_data_loss_warning(exception_raised=True)
|
||||
|
||||
@mock.patch('os.path.join')
|
||||
@mock.patch.object(ephemeraldisk.EphemeralDiskPlugin,
|
||||
'_set_ephemeral_disk_data_loss_warning')
|
||||
@mock.patch.object(ephemeraldisk.EphemeralDiskPlugin,
|
||||
'_get_ephemeral_disk_volume_path')
|
||||
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
|
||||
@mock.patch('cloudbaseinit.plugins.common.base.PLUGIN_EXECUTION_DONE',
|
||||
mock.sentinel)
|
||||
def _test_execute(self, mock_get_osutils, mock_get_volume_path,
|
||||
mock_set_volume_path, mock_join,
|
||||
not_existing_exception, disk_volume_path,
|
||||
disk_warning_path):
|
||||
mock_service = mock.MagicMock()
|
||||
shared_data = mock.sentinel.shared_data
|
||||
eclass = metadata_services_base.NotExistingMetadataException
|
||||
expected_result = base.PLUGIN_EXECUTION_DONE, False
|
||||
expected_logging = []
|
||||
if not_existing_exception:
|
||||
mock_service.get_ephemeral_disk_data_loss_warning.side_effect = \
|
||||
eclass()
|
||||
|
||||
else:
|
||||
mock_osutils = mock.MagicMock()
|
||||
mock_get_osutils.return_value = mock_osutils
|
||||
mock_get_volume_path.return_value = disk_volume_path
|
||||
if not disk_volume_path:
|
||||
expected_logging += [
|
||||
"Ephemeral disk volume not found"
|
||||
]
|
||||
else:
|
||||
mock_join.return_value = disk_warning_path
|
||||
with testutils.ConfPatcher('ephemeral_disk_data_loss_warning_path',
|
||||
disk_warning_path):
|
||||
with testutils.LogSnatcher(MODULE_PATH) as snatcher:
|
||||
result = self._disk.execute(mock_service, shared_data)
|
||||
self.assertEqual(result, expected_result)
|
||||
self.assertEqual(snatcher.output, expected_logging)
|
||||
(mock_service.get_ephemeral_disk_data_loss_warning.
|
||||
assert_called_once_with())
|
||||
if not not_existing_exception:
|
||||
mock_get_osutils.assert_called_once_with()
|
||||
mock_get_volume_path.assert_called_once_with(mock_osutils)
|
||||
if disk_volume_path and disk_warning_path:
|
||||
mock_join.assert_called_once_with(
|
||||
disk_volume_path, disk_warning_path)
|
||||
mock_set_volume_path.assert_called_once_with(
|
||||
mock_service, disk_warning_path)
|
||||
|
||||
def test_execute_no_metadata(self):
|
||||
self._test_execute(not_existing_exception=True, disk_volume_path=None,
|
||||
disk_warning_path=None)
|
||||
|
||||
def test_execute_no_ephemeral_disk_volume_path(self):
|
||||
self._test_execute(not_existing_exception=None, disk_volume_path=None,
|
||||
disk_warning_path=None)
|
||||
|
||||
def test_execute_no_ephemeral_disk_data_loss_warning_path(self):
|
||||
self._test_execute(
|
||||
not_existing_exception=None,
|
||||
disk_volume_path=str(mock.sentinel.disk_volume_path),
|
||||
disk_warning_path=None)
|
||||
|
||||
def test_execute(self):
|
||||
self._test_execute(
|
||||
not_existing_exception=None,
|
||||
disk_volume_path=str(mock.sentinel.disk_volume_path),
|
||||
disk_warning_path=str(mock.sentinel.path))
|
||||
|
||||
def test_get_os_requirements(self):
|
||||
result = self._disk.get_os_requirements()
|
||||
self.assertEqual(result, ('win32', (5, 2)))
|
Loading…
x
Reference in New Issue
Block a user