103 lines
3.5 KiB
Python
103 lines
3.5 KiB
Python
"""Tests for handling of userdata within cloud init"""
|
|
|
|
import logging
|
|
import StringIO
|
|
|
|
from email.mime.base import MIMEBase
|
|
|
|
from mocker import MockerTestCase
|
|
|
|
import cloudinit
|
|
from cloudinit.DataSource import DataSource
|
|
|
|
|
|
instance_id = "i-testing"
|
|
|
|
|
|
class FakeDataSource(DataSource):
|
|
|
|
def __init__(self, userdata):
|
|
self.metadata = {'instance-id': instance_id}
|
|
self.userdata_raw = userdata
|
|
|
|
|
|
class TestConsumeUserData(MockerTestCase):
|
|
|
|
def setUp(self):
|
|
self.mock_write = self.mocker.replace("cloudinit.util.write_file",
|
|
passthrough=False)
|
|
self.mock_write(self.get_ipath("cloud_config"), "", 0600)
|
|
self.capture_log()
|
|
|
|
def tearDown(self):
|
|
self._log.removeHandler(self._log_handler)
|
|
|
|
@staticmethod
|
|
def get_ipath(name):
|
|
return "%s/instances/%s%s" % (cloudinit.varlibdir, instance_id,
|
|
cloudinit.pathmap[name])
|
|
|
|
def capture_log(self):
|
|
self.log_file = StringIO.StringIO()
|
|
self._log_handler = logging.StreamHandler(self.log_file)
|
|
self._log_handler.setLevel(logging.DEBUG)
|
|
self._log = logging.getLogger(cloudinit.logger_name)
|
|
self._log.addHandler(self._log_handler)
|
|
|
|
def test_unhandled_type_warning(self):
|
|
"""Raw text without magic is ignored but shows warning"""
|
|
self.mocker.replay()
|
|
ci = cloudinit.CloudInit()
|
|
ci.datasource = FakeDataSource("arbitrary text\n")
|
|
ci.consume_userdata()
|
|
self.assertEqual(
|
|
"Unhandled non-multipart userdata starting 'arbitrary text...'\n",
|
|
self.log_file.getvalue())
|
|
|
|
def test_mime_text_plain(self):
|
|
"""Mime message of type text/plain is ignored without warning"""
|
|
self.mocker.replay()
|
|
ci = cloudinit.CloudInit()
|
|
message = MIMEBase("text", "plain")
|
|
message.set_payload("Just text")
|
|
ci.datasource = FakeDataSource(message.as_string())
|
|
ci.consume_userdata()
|
|
self.assertEqual("", self.log_file.getvalue())
|
|
|
|
def test_shellscript(self):
|
|
"""Raw text starting #!/bin/sh is treated as script"""
|
|
script = "#!/bin/sh\necho hello\n"
|
|
outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
|
|
self.mock_write(outpath, script, 0700)
|
|
self.mocker.replay()
|
|
ci = cloudinit.CloudInit()
|
|
ci.datasource = FakeDataSource(script)
|
|
ci.consume_userdata()
|
|
self.assertEqual("", self.log_file.getvalue())
|
|
|
|
def test_mime_text_x_shellscript(self):
|
|
"""Mime message of type text/x-shellscript is treated as script"""
|
|
script = "#!/bin/sh\necho hello\n"
|
|
outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
|
|
self.mock_write(outpath, script, 0700)
|
|
self.mocker.replay()
|
|
ci = cloudinit.CloudInit()
|
|
message = MIMEBase("text", "x-shellscript")
|
|
message.set_payload(script)
|
|
ci.datasource = FakeDataSource(message.as_string())
|
|
ci.consume_userdata()
|
|
self.assertEqual("", self.log_file.getvalue())
|
|
|
|
def test_mime_text_plain_shell(self):
|
|
"""Mime type text/plain starting #!/bin/sh is treated as script"""
|
|
script = "#!/bin/sh\necho hello\n"
|
|
outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
|
|
self.mock_write(outpath, script, 0700)
|
|
self.mocker.replay()
|
|
ci = cloudinit.CloudInit()
|
|
message = MIMEBase("text", "plain")
|
|
message.set_payload(script)
|
|
ci.datasource = FakeDataSource(message.as_string())
|
|
ci.consume_userdata()
|
|
self.assertEqual("", self.log_file.getvalue())
|