Change the behavior of write_file plugin

For the moment there is not any difference between the case
when the encoding argument is missing and the case where it
is invalid.

This patch will change this behavior and those scenarios will
be treated separately.

Change-Id: Ic79983d0b150b63435c2ab6c821a83ad9acb0bee
Implements-Blueprint: change-write-file-plugin
Co-Authored-By: Stefan Caraiman <scaraiman@cloudbasesolutions.com>
This commit is contained in:
Alexandru Coman 2016-04-20 17:33:42 +03:00 committed by Stefan Caraiman
parent 6bbffaf075
commit c7632ea731
4 changed files with 51 additions and 23 deletions

View File

@ -63,20 +63,24 @@ def _process_content(content, encoding):
# At this point, content will be string, which is wrong for Python 3.
result = result.encode()
steps = _decode_steps(encoding)
if not steps:
LOG.error("Unknown encoding, doing nothing.")
if not encoding:
# No action is required for this scenario
return result
for mime_type in _decode_steps(encoding):
if mime_type == GZIP_MIME:
steps = _decode_steps(encoding)
if not steps:
LOG.warning("Unknown encoding, assuming plain text.")
return result
for step in steps:
if step == GZIP_MIME:
bufferio = io.BytesIO(result)
with gzip.GzipFile(fileobj=bufferio, mode='rb') as file_handle:
try:
result = file_handle.read()
except (IOError, ValueError):
LOG.exception("Fail to decompress gzip content.")
elif mime_type == BASE64_MIME:
elif step == BASE64_MIME:
try:
result = base64.b64decode(result)
except (ValueError, TypeError):

View File

@ -313,36 +313,34 @@ class TestCloudConfig(unittest.TestCase):
cls.userdata = pkgutil.get_data('cloudbaseinit.tests.resources',
'cloud_config_userdata').decode()
def create_tempfiles(self, number):
for _ in range(number):
tmp = _create_tempfile()
self.addCleanup(os.remove, tmp)
yield tmp
def test_cloud_config_multipart(self):
b64, b64_binary, gz, gz_binary = list(self.create_tempfiles(4))
scenarios = {}
for key in ("b64", "b64_binary", "gzip", "gzip_binary",
"invalid_encoding", "missing_encoding"):
temp_file = _create_tempfile()
scenarios[key] = temp_file
self.addCleanup(os.remove, temp_file)
service = FakeService(self.userdata.format(b64=b64,
b64_binary=b64_binary,
gzip=gz,
gzip_binary=gz_binary))
service = FakeService(self.userdata.format(**scenarios))
with testutils.LogSnatcher('cloudbaseinit.plugins.'
'common.userdataplugins.'
'cloudconfigplugins') as snatcher:
status, reboot = self.plugin.execute(service, {})
for path in (b64, b64_binary, gz, gz_binary):
for path in scenarios.values():
self.assertTrue(os.path.exists(path),
"Path {} should exist.".format(path))
with open(path) as stream:
self.assertEqual('42', stream.read())
self.assertEqual('42', stream.read(), path)
self.assertEqual(status, 1)
self.assertFalse(reboot)
expected_logging = [
'Unknown encoding, doing nothing.',
'Fail to process permissions None, assuming 420',
'Fail to process permissions None, assuming 420',
'Fail to process permissions None, assuming 420'
'Fail to process permissions None, assuming 420',
'Unknown encoding, assuming plain text.',
'Fail to process permissions None, assuming 420',
'Fail to process permissions None, assuming 420',
]
self.assertEqual(expected_logging, snatcher.output)

View File

@ -210,6 +210,7 @@ class WriteFilesPluginTests(unittest.TestCase):
write_files:
- content: NDI=
path: {}
encoding: unknown_encoding
permissions: '0o466'
""".format(tmp))
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
@ -222,9 +223,29 @@ class WriteFilesPluginTests(unittest.TestCase):
with open(tmp) as stream:
self.assertEqual('NDI=', stream.read())
self.assertEqual(["Unknown encoding, doing nothing."],
self.assertEqual(["Unknown encoding, assuming plain text."],
snatcher.output)
def test_missing_encoding(self):
tmp = self._get_tempfile()
code = textwrap.dedent("""
write_files:
- content: NDI=
path: {}
permissions: '0o466'
""".format(tmp))
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'userdataplugins.cloudconfigplugins.'
'write_files') as snatcher:
self.plugin.process_non_multipart(code)
self.assertTrue(os.path.exists(tmp),
"Expected path does not exist.")
with open(tmp) as stream:
self.assertEqual('NDI=', stream.read())
self.assertEqual([], snatcher.output)
def test_invalid_object_passed(self):
with self.assertRaises(exception.CloudbaseInitException) as cm:
write_files.WriteFilesPlugin().process(1)

View File

@ -21,4 +21,9 @@ write_files:
- path: {gzip_binary}
encoding: gzip
content: !!binary |
H4sIAGUfoFQC/zMxAgCIsCQyAgAAAA==
H4sIAGUfoFQC/zMxAgCIsCQyAgAAAA==
- path: {invalid_encoding}
encoding: invalid_encoding
content: '42'
- path: {missing_encoding}
content: '42'