
get_cmdline_url was passing a string to response.contents.startswith() where response.contents is now bytes. this changes it to convert input to text, and also to default to text.
221 lines
8.5 KiB
Python
221 lines
8.5 KiB
Python
import os
|
|
import shutil
|
|
import tempfile
|
|
import unittest
|
|
|
|
try:
|
|
from unittest import mock
|
|
except ImportError:
|
|
import mock
|
|
try:
|
|
from contextlib import ExitStack
|
|
except ImportError:
|
|
from contextlib2 import ExitStack
|
|
|
|
from cloudinit import handlers
|
|
from cloudinit import helpers
|
|
from cloudinit import settings
|
|
from cloudinit import url_helper
|
|
from cloudinit import util
|
|
|
|
from .helpers import TestCase
|
|
|
|
|
|
class FakeModule(handlers.Handler):
|
|
def __init__(self):
|
|
handlers.Handler.__init__(self, settings.PER_ALWAYS)
|
|
self.types = []
|
|
|
|
def list_types(self):
|
|
return self.types
|
|
|
|
def handle_part(self, data, ctype, filename, payload, frequency):
|
|
pass
|
|
|
|
|
|
class TestWalkerHandleHandler(TestCase):
|
|
|
|
def setUp(self):
|
|
super(TestWalkerHandleHandler, self).setUp()
|
|
tmpdir = tempfile.mkdtemp()
|
|
self.addCleanup(shutil.rmtree, tmpdir)
|
|
|
|
self.data = {
|
|
"handlercount": 0,
|
|
"frequency": "",
|
|
"handlerdir": tmpdir,
|
|
"handlers": helpers.ContentHandlers(),
|
|
"data": None}
|
|
|
|
self.expected_module_name = "part-handler-%03d" % (
|
|
self.data["handlercount"],)
|
|
expected_file_name = "%s.py" % self.expected_module_name
|
|
self.expected_file_fullname = os.path.join(
|
|
self.data["handlerdir"], expected_file_name)
|
|
self.module_fake = FakeModule()
|
|
self.ctype = None
|
|
self.filename = None
|
|
self.payload = "dummy payload"
|
|
|
|
# Mock the write_file() function. We'll assert that it got called as
|
|
# expected in each of the individual tests.
|
|
resources = ExitStack()
|
|
self.addCleanup(resources.close)
|
|
self.write_file_mock = resources.enter_context(
|
|
mock.patch('cloudinit.util.write_file'))
|
|
|
|
def test_no_errors(self):
|
|
"""Payload gets written to file and added to C{pdata}."""
|
|
with mock.patch('cloudinit.importer.import_module',
|
|
return_value=self.module_fake) as mockobj:
|
|
handlers.walker_handle_handler(self.data, self.ctype,
|
|
self.filename, self.payload)
|
|
mockobj.assert_called_with_once(self.expected_module_name)
|
|
self.write_file_mock.assert_called_with_once(
|
|
self.expected_file_fullname, self.payload, 0o600)
|
|
self.assertEqual(self.data['handlercount'], 1)
|
|
|
|
def test_import_error(self):
|
|
"""Module import errors are logged. No handler added to C{pdata}."""
|
|
with mock.patch('cloudinit.importer.import_module',
|
|
side_effect=ImportError) as mockobj:
|
|
handlers.walker_handle_handler(self.data, self.ctype,
|
|
self.filename, self.payload)
|
|
mockobj.assert_called_with_once(self.expected_module_name)
|
|
self.write_file_mock.assert_called_with_once(
|
|
self.expected_file_fullname, self.payload, 0o600)
|
|
self.assertEqual(self.data['handlercount'], 0)
|
|
|
|
def test_attribute_error(self):
|
|
"""Attribute errors are logged. No handler added to C{pdata}."""
|
|
with mock.patch('cloudinit.importer.import_module',
|
|
side_effect=AttributeError,
|
|
return_value=self.module_fake) as mockobj:
|
|
handlers.walker_handle_handler(self.data, self.ctype,
|
|
self.filename, self.payload)
|
|
mockobj.assert_called_with_once(self.expected_module_name)
|
|
self.write_file_mock.assert_called_with_once(
|
|
self.expected_file_fullname, self.payload, 0o600)
|
|
self.assertEqual(self.data['handlercount'], 0)
|
|
|
|
|
|
class TestHandlerHandlePart(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.data = "fake data"
|
|
self.ctype = "fake ctype"
|
|
self.filename = "fake filename"
|
|
self.payload = "fake payload"
|
|
self.frequency = settings.PER_INSTANCE
|
|
self.headers = {
|
|
'Content-Type': self.ctype,
|
|
}
|
|
|
|
def test_normal_version_1(self):
|
|
"""
|
|
C{handle_part} is called without C{frequency} for
|
|
C{handler_version} == 1.
|
|
"""
|
|
mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
|
|
handler_version=1)
|
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
|
self.frequency, self.headers)
|
|
# Assert that the handle_part() method of the mock object got
|
|
# called with the expected arguments.
|
|
mod_mock.handle_part.assert_called_with_once(
|
|
self.data, self.ctype, self.filename, self.payload)
|
|
|
|
def test_normal_version_2(self):
|
|
"""
|
|
C{handle_part} is called with C{frequency} for
|
|
C{handler_version} == 2.
|
|
"""
|
|
mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
|
|
handler_version=2)
|
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
|
self.frequency, self.headers)
|
|
# Assert that the handle_part() method of the mock object got
|
|
# called with the expected arguments.
|
|
mod_mock.handle_part.assert_called_with_once(
|
|
self.data, self.ctype, self.filename, self.payload)
|
|
|
|
def test_modfreq_per_always(self):
|
|
"""
|
|
C{handle_part} is called regardless of frequency if nofreq is always.
|
|
"""
|
|
self.frequency = "once"
|
|
mod_mock = mock.Mock(frequency=settings.PER_ALWAYS,
|
|
handler_version=1)
|
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
|
self.frequency, self.headers)
|
|
# Assert that the handle_part() method of the mock object got
|
|
# called with the expected arguments.
|
|
mod_mock.handle_part.assert_called_with_once(
|
|
self.data, self.ctype, self.filename, self.payload)
|
|
|
|
def test_no_handle_when_modfreq_once(self):
|
|
"""C{handle_part} is not called if frequency is once."""
|
|
self.frequency = "once"
|
|
mod_mock = mock.Mock(frequency=settings.PER_ONCE)
|
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
|
self.frequency, self.headers)
|
|
# Assert that the handle_part() method of the mock object got
|
|
# called with the expected arguments.
|
|
mod_mock.handle_part.assert_called_with_once(
|
|
self.data, self.ctype, self.filename, self.payload)
|
|
|
|
def test_exception_is_caught(self):
|
|
"""Exceptions within C{handle_part} are caught and logged."""
|
|
mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
|
|
handler_version=1)
|
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
|
self.frequency, self.headers)
|
|
mod_mock.handle_part.side_effect = Exception
|
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
|
self.frequency, self.headers)
|
|
mod_mock.handle_part.assert_called_with_once(
|
|
self.data, self.ctype, self.filename, self.payload)
|
|
|
|
|
|
class TestCmdlineUrl(unittest.TestCase):
|
|
def test_invalid_content(self):
|
|
url = "http://example.com/foo"
|
|
key = "mykey"
|
|
payload = b"0"
|
|
cmdline = "ro %s=%s bar=1" % (key, url)
|
|
|
|
with mock.patch('cloudinit.url_helper.readurl',
|
|
return_value=url_helper.StringResponse(payload)):
|
|
self.assertEqual(
|
|
util.get_cmdline_url(names=[key], starts="xxxxxx",
|
|
cmdline=cmdline),
|
|
(key, url, None))
|
|
|
|
def test_valid_content(self):
|
|
url = "http://example.com/foo"
|
|
key = "mykey"
|
|
payload = b"xcloud-config\nmydata: foo\nbar: wark\n"
|
|
cmdline = "ro %s=%s bar=1" % (key, url)
|
|
|
|
with mock.patch('cloudinit.url_helper.readurl',
|
|
return_value=url_helper.StringResponse(payload)):
|
|
self.assertEqual(
|
|
util.get_cmdline_url(names=[key], starts=b"xcloud-config",
|
|
cmdline=cmdline),
|
|
(key, url, payload))
|
|
|
|
def test_no_key_found(self):
|
|
url = "http://example.com/foo"
|
|
key = "mykey"
|
|
cmdline = "ro %s=%s bar=1" % (key, url)
|
|
|
|
with mock.patch('cloudinit.url_helper.readurl',
|
|
return_value=url_helper.StringResponse(b'')):
|
|
self.assertEqual(
|
|
util.get_cmdline_url(names=["does-not-appear"],
|
|
starts="#cloud-config", cmdline=cmdline),
|
|
(None, None, None))
|
|
|
|
|
|
# vi: ts=4 expandtab
|