Port test__init__.py to unittest.mock.
This commit is contained in:
parent
474b7f259d
commit
ab0bff66f6
@ -2,11 +2,6 @@ import os
|
|||||||
import sys
|
import sys
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from contextlib import contextmanager
|
|
||||||
|
|
||||||
from mocker import Mocker
|
|
||||||
from mocker import MockerTestCase
|
|
||||||
|
|
||||||
from cloudinit import helpers as ch
|
from cloudinit import helpers as ch
|
||||||
from cloudinit import util
|
from cloudinit import util
|
||||||
|
|
||||||
@ -86,17 +81,6 @@ else:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def mocker(verify_calls=True):
|
|
||||||
m = Mocker()
|
|
||||||
try:
|
|
||||||
yield m
|
|
||||||
finally:
|
|
||||||
m.restore()
|
|
||||||
if verify_calls:
|
|
||||||
m.verify()
|
|
||||||
|
|
||||||
|
|
||||||
# Makes the old path start
|
# Makes the old path start
|
||||||
# with new base instead of whatever
|
# with new base instead of whatever
|
||||||
# it previously had
|
# it previously had
|
||||||
@ -126,9 +110,8 @@ def retarget_many_wrapper(new_base, am, old_func):
|
|||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
class ResourceUsingTestCase(MockerTestCase):
|
class ResourceUsingTestCase(unittest.TestCase):
|
||||||
def __init__(self, methodName="runTest"):
|
def __init__(self, methodName="runTest"):
|
||||||
MockerTestCase.__init__(self, methodName)
|
|
||||||
self.resource_path = None
|
self.resource_path = None
|
||||||
|
|
||||||
def resourceLocation(self, subname=None):
|
def resourceLocation(self, subname=None):
|
||||||
|
@ -1,10 +1,19 @@
|
|||||||
import os
|
import os
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
|
||||||
from mocker import MockerTestCase, ARGS, KWARGS
|
try:
|
||||||
|
from unittest import mock
|
||||||
|
except ImportError:
|
||||||
|
import mock
|
||||||
|
try:
|
||||||
|
from contextlib import ExitStack
|
||||||
|
except ImportError:
|
||||||
|
from contextlib2 import ExitStack
|
||||||
|
|
||||||
from cloudinit import handlers
|
from cloudinit import handlers
|
||||||
from cloudinit import helpers
|
from cloudinit import helpers
|
||||||
from cloudinit import importer
|
|
||||||
from cloudinit import settings
|
from cloudinit import settings
|
||||||
from cloudinit import url_helper
|
from cloudinit import url_helper
|
||||||
from cloudinit import util
|
from cloudinit import util
|
||||||
@ -22,76 +31,76 @@ class FakeModule(handlers.Handler):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class TestWalkerHandleHandler(MockerTestCase):
|
class TestWalkerHandleHandler(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
unittest.TestCase.setUp(self)
|
||||||
MockerTestCase.setUp(self)
|
tmpdir = tempfile.mkdtemp()
|
||||||
|
self.addCleanup(shutil.rmtree, tmpdir)
|
||||||
|
|
||||||
self.data = {
|
self.data = {
|
||||||
"handlercount": 0,
|
"handlercount": 0,
|
||||||
"frequency": "",
|
"frequency": "",
|
||||||
"handlerdir": self.makeDir(),
|
"handlerdir": tmpdir,
|
||||||
"handlers": helpers.ContentHandlers(),
|
"handlers": helpers.ContentHandlers(),
|
||||||
"data": None}
|
"data": None}
|
||||||
|
|
||||||
self.expected_module_name = "part-handler-%03d" % (
|
self.expected_module_name = "part-handler-%03d" % (
|
||||||
self.data["handlercount"],)
|
self.data["handlercount"],)
|
||||||
expected_file_name = "%s.py" % self.expected_module_name
|
expected_file_name = "%s.py" % self.expected_module_name
|
||||||
expected_file_fullname = os.path.join(self.data["handlerdir"],
|
self.expected_file_fullname = os.path.join(
|
||||||
expected_file_name)
|
self.data["handlerdir"], expected_file_name)
|
||||||
self.module_fake = FakeModule()
|
self.module_fake = FakeModule()
|
||||||
self.ctype = None
|
self.ctype = None
|
||||||
self.filename = None
|
self.filename = None
|
||||||
self.payload = "dummy payload"
|
self.payload = "dummy payload"
|
||||||
|
|
||||||
# Mock the write_file function
|
# Mock the write_file() function. We'll assert that it got called as
|
||||||
write_file_mock = self.mocker.replace(util.write_file,
|
# expected in each of the individual tests.
|
||||||
passthrough=False)
|
self.resources = ExitStack()
|
||||||
write_file_mock(expected_file_fullname, self.payload, 0o600)
|
self.write_file_mock = self.resources.enter_context(
|
||||||
|
mock.patch('cloudinit.util.write_file'))
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.resources.close()
|
||||||
|
unittest.TestCase.tearDown(self)
|
||||||
|
|
||||||
def test_no_errors(self):
|
def test_no_errors(self):
|
||||||
"""Payload gets written to file and added to C{pdata}."""
|
"""Payload gets written to file and added to C{pdata}."""
|
||||||
import_mock = self.mocker.replace(importer.import_module,
|
with mock.patch('cloudinit.importer.import_module',
|
||||||
passthrough=False)
|
return_value=self.module_fake) as mockobj:
|
||||||
import_mock(self.expected_module_name)
|
handlers.walker_handle_handler(self.data, self.ctype,
|
||||||
self.mocker.result(self.module_fake)
|
self.filename, self.payload)
|
||||||
self.mocker.replay()
|
mockobj.assert_called_with_once(self.expected_module_name)
|
||||||
|
self.write_file_mock.assert_called_with_once(
|
||||||
handlers.walker_handle_handler(self.data, self.ctype, self.filename,
|
self.expected_file_fullname, self.payload, 0o600)
|
||||||
self.payload)
|
self.assertEqual(self.data['handlercount'], 1)
|
||||||
|
|
||||||
self.assertEqual(1, self.data["handlercount"])
|
|
||||||
|
|
||||||
def test_import_error(self):
|
def test_import_error(self):
|
||||||
"""Module import errors are logged. No handler added to C{pdata}."""
|
"""Module import errors are logged. No handler added to C{pdata}."""
|
||||||
import_mock = self.mocker.replace(importer.import_module,
|
with mock.patch('cloudinit.importer.import_module',
|
||||||
passthrough=False)
|
side_effect=ImportError) as mockobj:
|
||||||
import_mock(self.expected_module_name)
|
handlers.walker_handle_handler(self.data, self.ctype,
|
||||||
self.mocker.throw(ImportError())
|
self.filename, self.payload)
|
||||||
self.mocker.replay()
|
mockobj.assert_called_with_once(self.expected_module_name)
|
||||||
|
self.write_file_mock.assert_called_with_once(
|
||||||
handlers.walker_handle_handler(self.data, self.ctype, self.filename,
|
self.expected_file_fullname, self.payload, 0o600)
|
||||||
self.payload)
|
self.assertEqual(self.data['handlercount'], 0)
|
||||||
|
|
||||||
self.assertEqual(0, self.data["handlercount"])
|
|
||||||
|
|
||||||
def test_attribute_error(self):
|
def test_attribute_error(self):
|
||||||
"""Attribute errors are logged. No handler added to C{pdata}."""
|
"""Attribute errors are logged. No handler added to C{pdata}."""
|
||||||
import_mock = self.mocker.replace(importer.import_module,
|
with mock.patch('cloudinit.importer.import_module',
|
||||||
passthrough=False)
|
side_effect=AttributeError,
|
||||||
import_mock(self.expected_module_name)
|
return_value=self.module_fake) as mockobj:
|
||||||
self.mocker.result(self.module_fake)
|
handlers.walker_handle_handler(self.data, self.ctype,
|
||||||
self.mocker.throw(AttributeError())
|
self.filename, self.payload)
|
||||||
self.mocker.replay()
|
mockobj.assert_called_with_once(self.expected_module_name)
|
||||||
|
self.write_file_mock.assert_called_with_once(
|
||||||
handlers.walker_handle_handler(self.data, self.ctype, self.filename,
|
self.expected_file_fullname, self.payload, 0o600)
|
||||||
self.payload)
|
self.assertEqual(self.data['handlercount'], 0)
|
||||||
|
|
||||||
self.assertEqual(0, self.data["handlercount"])
|
|
||||||
|
|
||||||
|
|
||||||
class TestHandlerHandlePart(MockerTestCase):
|
class TestHandlerHandlePart(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.data = "fake data"
|
self.data = "fake data"
|
||||||
@ -108,95 +117,80 @@ class TestHandlerHandlePart(MockerTestCase):
|
|||||||
C{handle_part} is called without C{frequency} for
|
C{handle_part} is called without C{frequency} for
|
||||||
C{handler_version} == 1.
|
C{handler_version} == 1.
|
||||||
"""
|
"""
|
||||||
mod_mock = self.mocker.mock()
|
mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
|
||||||
getattr(mod_mock, "frequency")
|
handler_version=1)
|
||||||
self.mocker.result(settings.PER_INSTANCE)
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
||||||
getattr(mod_mock, "handler_version")
|
self.frequency, self.headers)
|
||||||
self.mocker.result(1)
|
# Assert that the handle_part() method of the mock object got
|
||||||
mod_mock.handle_part(self.data, self.ctype, self.filename,
|
# called with the expected arguments.
|
||||||
self.payload)
|
mod_mock.handle_part.assert_called_with_once(
|
||||||
self.mocker.replay()
|
self.data, self.ctype, self.filename, self.payload)
|
||||||
|
|
||||||
handlers.run_part(mod_mock, self.data, self.filename,
|
|
||||||
self.payload, self.frequency, self.headers)
|
|
||||||
|
|
||||||
def test_normal_version_2(self):
|
def test_normal_version_2(self):
|
||||||
"""
|
"""
|
||||||
C{handle_part} is called with C{frequency} for
|
C{handle_part} is called with C{frequency} for
|
||||||
C{handler_version} == 2.
|
C{handler_version} == 2.
|
||||||
"""
|
"""
|
||||||
mod_mock = self.mocker.mock()
|
mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
|
||||||
getattr(mod_mock, "frequency")
|
handler_version=2)
|
||||||
self.mocker.result(settings.PER_INSTANCE)
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
||||||
getattr(mod_mock, "handler_version")
|
self.frequency, self.headers)
|
||||||
self.mocker.result(2)
|
# Assert that the handle_part() method of the mock object got
|
||||||
mod_mock.handle_part(self.data, self.ctype, self.filename,
|
# called with the expected arguments.
|
||||||
self.payload, self.frequency)
|
mod_mock.handle_part.assert_called_with_once(
|
||||||
self.mocker.replay()
|
self.data, self.ctype, self.filename, self.payload)
|
||||||
|
|
||||||
handlers.run_part(mod_mock, self.data, self.filename,
|
|
||||||
self.payload, self.frequency, self.headers)
|
|
||||||
|
|
||||||
def test_modfreq_per_always(self):
|
def test_modfreq_per_always(self):
|
||||||
"""
|
"""
|
||||||
C{handle_part} is called regardless of frequency if nofreq is always.
|
C{handle_part} is called regardless of frequency if nofreq is always.
|
||||||
"""
|
"""
|
||||||
self.frequency = "once"
|
self.frequency = "once"
|
||||||
mod_mock = self.mocker.mock()
|
mod_mock = mock.Mock(frequency=settings.PER_ALWAYS,
|
||||||
getattr(mod_mock, "frequency")
|
handler_version=1)
|
||||||
self.mocker.result(settings.PER_ALWAYS)
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
||||||
getattr(mod_mock, "handler_version")
|
self.frequency, self.headers)
|
||||||
self.mocker.result(1)
|
# Assert that the handle_part() method of the mock object got
|
||||||
mod_mock.handle_part(self.data, self.ctype, self.filename,
|
# called with the expected arguments.
|
||||||
self.payload)
|
mod_mock.handle_part.assert_called_with_once(
|
||||||
self.mocker.replay()
|
self.data, self.ctype, self.filename, self.payload)
|
||||||
|
|
||||||
handlers.run_part(mod_mock, self.data, self.filename,
|
|
||||||
self.payload, self.frequency, self.headers)
|
|
||||||
|
|
||||||
def test_no_handle_when_modfreq_once(self):
|
def test_no_handle_when_modfreq_once(self):
|
||||||
"""C{handle_part} is not called if frequency is once."""
|
"""C{handle_part} is not called if frequency is once."""
|
||||||
self.frequency = "once"
|
self.frequency = "once"
|
||||||
mod_mock = self.mocker.mock()
|
mod_mock = mock.Mock(frequency=settings.PER_ONCE)
|
||||||
getattr(mod_mock, "frequency")
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
||||||
self.mocker.result(settings.PER_ONCE)
|
self.frequency, self.headers)
|
||||||
self.mocker.replay()
|
# Assert that the handle_part() method of the mock object got
|
||||||
|
# called with the expected arguments.
|
||||||
handlers.run_part(mod_mock, self.data, self.filename,
|
mod_mock.handle_part.assert_called_with_once(
|
||||||
self.payload, self.frequency, self.headers)
|
self.data, self.ctype, self.filename, self.payload)
|
||||||
|
|
||||||
def test_exception_is_caught(self):
|
def test_exception_is_caught(self):
|
||||||
"""Exceptions within C{handle_part} are caught and logged."""
|
"""Exceptions within C{handle_part} are caught and logged."""
|
||||||
mod_mock = self.mocker.mock()
|
mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
|
||||||
getattr(mod_mock, "frequency")
|
handler_version=1)
|
||||||
self.mocker.result(settings.PER_INSTANCE)
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
||||||
getattr(mod_mock, "handler_version")
|
self.frequency, self.headers)
|
||||||
self.mocker.result(1)
|
mod_mock.handle_part.side_effect = Exception
|
||||||
mod_mock.handle_part(self.data, self.ctype, self.filename,
|
handlers.run_part(mod_mock, self.data, self.filename, self.payload,
|
||||||
self.payload)
|
self.frequency, self.headers)
|
||||||
self.mocker.throw(Exception())
|
mod_mock.handle_part.assert_called_with_once(
|
||||||
self.mocker.replay()
|
self.data, self.ctype, self.filename, self.payload)
|
||||||
|
|
||||||
handlers.run_part(mod_mock, self.data, self.filename,
|
|
||||||
self.payload, self.frequency, self.headers)
|
|
||||||
|
|
||||||
|
|
||||||
class TestCmdlineUrl(MockerTestCase):
|
class TestCmdlineUrl(unittest.TestCase):
|
||||||
def test_invalid_content(self):
|
def test_invalid_content(self):
|
||||||
url = "http://example.com/foo"
|
url = "http://example.com/foo"
|
||||||
key = "mykey"
|
key = "mykey"
|
||||||
payload = "0"
|
payload = "0"
|
||||||
cmdline = "ro %s=%s bar=1" % (key, url)
|
cmdline = "ro %s=%s bar=1" % (key, url)
|
||||||
|
|
||||||
mock_readurl = self.mocker.replace(url_helper.readurl,
|
with mock.patch('cloudinit.url_helper.readurl',
|
||||||
passthrough=False)
|
return_value=url_helper.StringResponse(payload)):
|
||||||
mock_readurl(url, ARGS, KWARGS)
|
self.assertEqual(
|
||||||
self.mocker.result(url_helper.StringResponse(payload))
|
util.get_cmdline_url(names=[key], starts="xxxxxx",
|
||||||
self.mocker.replay()
|
cmdline=cmdline),
|
||||||
|
(key, url, None))
|
||||||
self.assertEqual((key, url, None),
|
|
||||||
util.get_cmdline_url(names=[key], starts="xxxxxx",
|
|
||||||
cmdline=cmdline))
|
|
||||||
|
|
||||||
def test_valid_content(self):
|
def test_valid_content(self):
|
||||||
url = "http://example.com/foo"
|
url = "http://example.com/foo"
|
||||||
@ -204,27 +198,24 @@ class TestCmdlineUrl(MockerTestCase):
|
|||||||
payload = "xcloud-config\nmydata: foo\nbar: wark\n"
|
payload = "xcloud-config\nmydata: foo\nbar: wark\n"
|
||||||
cmdline = "ro %s=%s bar=1" % (key, url)
|
cmdline = "ro %s=%s bar=1" % (key, url)
|
||||||
|
|
||||||
mock_readurl = self.mocker.replace(url_helper.readurl,
|
with mock.patch('cloudinit.url_helper.readurl',
|
||||||
passthrough=False)
|
return_value=url_helper.StringResponse(payload)):
|
||||||
mock_readurl(url, ARGS, KWARGS)
|
self.assertEqual(
|
||||||
self.mocker.result(url_helper.StringResponse(payload))
|
util.get_cmdline_url(names=[key], starts="xcloud-config",
|
||||||
self.mocker.replay()
|
cmdline=cmdline),
|
||||||
|
(key, url, payload))
|
||||||
self.assertEqual((key, url, payload),
|
|
||||||
util.get_cmdline_url(names=[key], starts="xcloud-config",
|
|
||||||
cmdline=cmdline))
|
|
||||||
|
|
||||||
def test_no_key_found(self):
|
def test_no_key_found(self):
|
||||||
url = "http://example.com/foo"
|
url = "http://example.com/foo"
|
||||||
key = "mykey"
|
key = "mykey"
|
||||||
cmdline = "ro %s=%s bar=1" % (key, url)
|
cmdline = "ro %s=%s bar=1" % (key, url)
|
||||||
|
|
||||||
self.mocker.replace(url_helper.readurl, passthrough=False)
|
with mock.patch('cloudinit.url_helper.readurl',
|
||||||
self.mocker.result(url_helper.StringResponse(""))
|
return_value=url_helper.StringResponse('')):
|
||||||
self.mocker.replay()
|
self.assertEqual(
|
||||||
|
util.get_cmdline_url(names=["does-not-appear"],
|
||||||
|
starts="#cloud-config", cmdline=cmdline),
|
||||||
|
(None, None, None))
|
||||||
|
|
||||||
self.assertEqual((None, None, None),
|
|
||||||
util.get_cmdline_url(names=["does-not-appear"],
|
|
||||||
starts="#cloud-config", cmdline=cmdline))
|
|
||||||
|
|
||||||
# vi: ts=4 expandtab
|
# vi: ts=4 expandtab
|
||||||
|
@ -2,7 +2,6 @@ import os
|
|||||||
import stat
|
import stat
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from mocker import MockerTestCase
|
|
||||||
from . import helpers
|
from . import helpers
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
@ -61,7 +60,7 @@ class TestGetCfgOptionListOrStr(unittest.TestCase):
|
|||||||
self.assertEqual([], result)
|
self.assertEqual([], result)
|
||||||
|
|
||||||
|
|
||||||
class TestWriteFile(MockerTestCase):
|
class TestWriteFile(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestWriteFile, self).setUp()
|
super(TestWriteFile, self).setUp()
|
||||||
self.tmp = self.makeDir(prefix="unittest_")
|
self.tmp = self.makeDir(prefix="unittest_")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user