Get both of these tests back in working shape.

This commit is contained in:
Joshua Harlow 2012-06-21 17:37:00 -07:00
parent 236c293e53
commit 539a05e141
2 changed files with 131 additions and 132 deletions

View File

@ -1,18 +1,42 @@
from mocker import MockerTestCase, ANY, ARGS, KWARGS
import StringIO
import logging
import os
import sys
from cloudinit import (partwalker_handle_handler, handler_handle_part,
handler_register, get_cmdline_url)
from cloudinit.util import write_file, logexc, readurl
from mocker import MockerTestCase, ANY, ARGS, KWARGS
from cloudinit import handlers
from cloudinit import helpers
from cloudinit import importer
from cloudinit import log
from cloudinit import settings
from cloudinit import url_helper
from cloudinit import util
class TestPartwalkerHandleHandler(MockerTestCase):
class FakeModule(handlers.Handler):
def __init__(self):
handlers.Handler.__init__(self, settings.PER_ALWAYS)
self.types = []
def list_types(self):
return self.types
def _handle_part(self, data, ctype, filename, payload, frequency):
pass
class TestWalkerHandleHandler(MockerTestCase):
def setUp(self):
MockerTestCase.setUp(self)
self.data = {
"handlercount": 0,
"frequency": "?",
"handlerdir": "?",
"handlers": [],
"frequency": "",
"handlerdir": self.makeDir(),
"handlers": helpers.ContentHandlers(),
"data": None}
self.expected_module_name = "part-handler-%03d" % (
@ -20,179 +44,138 @@ class TestPartwalkerHandleHandler(MockerTestCase):
expected_file_name = "%s.py" % self.expected_module_name
expected_file_fullname = os.path.join(self.data["handlerdir"],
expected_file_name)
self.module_fake = "fake module handle"
self.module_fake = FakeModule()
self.ctype = None
self.filename = None
self.payload = "dummy payload"
# Mock the write_file function
write_file_mock = self.mocker.replace(write_file, passthrough=False)
write_file_mock = self.mocker.replace(util.write_file, passthrough=False)
write_file_mock(expected_file_fullname, self.payload, 0600)
def test_no_errors(self):
"""Payload gets written to file and added to C{pdata}."""
# Mock the __import__ builtin
import_mock = self.mocker.replace("__builtin__.__import__")
import_mock = self.mocker.replace(importer.import_module, passthrough=False)
import_mock(self.expected_module_name)
self.mocker.result(self.module_fake)
# Mock the handle_register function
handle_reg_mock = self.mocker.replace(handler_register,
passthrough=False)
handle_reg_mock(self.module_fake, self.data["handlers"],
self.data["data"], self.data["frequency"])
# Activate mocks
self.mocker.replay()
partwalker_handle_handler(self.data, self.ctype, self.filename,
self.payload)
handlers.walker_handle_handler(self.data, self.ctype, self.filename,
self.payload)
self.assertEqual(1, self.data["handlercount"])
def test_import_error(self):
"""Module import errors are logged. No handler added to C{pdata}"""
# Mock the __import__ builtin
import_mock = self.mocker.replace("__builtin__.__import__")
import_mock = self.mocker.replace(importer.import_module, passthrough=False)
import_mock(self.expected_module_name)
self.mocker.throw(ImportError())
# Mock log function
logexc_mock = self.mocker.replace(logexc, passthrough=False)
logexc_mock(ANY)
# Mock the print_exc function
print_exc_mock = self.mocker.replace("traceback.print_exc",
passthrough=False)
print_exc_mock(ARGS, KWARGS)
# Activate mocks
self.mocker.replay()
partwalker_handle_handler(self.data, self.ctype, self.filename,
self.payload)
handlers.walker_handle_handler(self.data, self.ctype, self.filename,
self.payload)
self.assertEqual(0, self.data["handlercount"])
def test_attribute_error(self):
"""Attribute errors are logged. No handler added to C{pdata}"""
# Mock the __import__ builtin
import_mock = self.mocker.replace("__builtin__.__import__")
import_mock = self.mocker.replace(importer.import_module, passthrough=False)
import_mock(self.expected_module_name)
self.mocker.result(self.module_fake)
# Mock the handle_register function
handle_reg_mock = self.mocker.replace(handler_register,
passthrough=False)
handle_reg_mock(self.module_fake, self.data["handlers"],
self.data["data"], self.data["frequency"])
self.mocker.throw(AttributeError())
# Mock log function
logexc_mock = self.mocker.replace(logexc, passthrough=False)
logexc_mock(ANY)
# Mock the print_exc function
print_exc_mock = self.mocker.replace("traceback.print_exc",
passthrough=False)
print_exc_mock(ARGS, KWARGS)
# Activate mocks
self.mocker.replay()
partwalker_handle_handler(self.data, self.ctype, self.filename,
self.payload)
handlers.walker_handle_handler(self.data, self.ctype, self.filename,
self.payload)
self.assertEqual(0, self.data["handlercount"])
class TestHandlerHandlePart(MockerTestCase):
def setUp(self):
self.data = "fake data"
self.ctype = "fake ctype"
self.filename = "fake filename"
self.payload = "fake payload"
self.frequency = "once-per-instance"
self.frequency = settings.PER_INSTANCE
def test_normal_version_1(self):
"""
C{handle_part} is called without C{frequency} for
C{handler_version} == 1.
"""
# Build a mock part-handler module
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
self.mocker.result("once-per-instance")
self.mocker.result(settings.PER_INSTANCE)
getattr(mod_mock, "handler_version")
self.mocker.result(1)
mod_mock.handle_part(self.data, self.ctype, self.filename,
self.payload)
self.mocker.replay()
handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
self.payload, self.frequency)
handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
self.payload, self.frequency)
def test_normal_version_2(self):
"""
C{handle_part} is called with C{frequency} for
C{handler_version} == 2.
"""
# Build a mock part-handler module
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
self.mocker.result("once-per-instance")
self.mocker.result(settings.PER_INSTANCE)
getattr(mod_mock, "handler_version")
self.mocker.result(2)
mod_mock.handle_part(self.data, self.ctype, self.filename,
self.payload, self.frequency)
self.mocker.replay()
handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
self.payload, self.frequency)
handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
self.payload, self.frequency)
def test_modfreq_per_always(self):
"""
C{handle_part} is called regardless of frequency if nofreq is always.
"""
self.frequency = "once"
# Build a mock part-handler module
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
self.mocker.result("always")
self.mocker.result(settings.PER_ALWAYS)
getattr(mod_mock, "handler_version")
self.mocker.result(1)
mod_mock.handle_part(self.data, self.ctype, self.filename,
self.payload)
self.mocker.replay()
handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
self.payload, self.frequency)
handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
self.payload, self.frequency)
def test_no_handle_when_modfreq_once(self):
"""C{handle_part} is not called if frequency is once"""
self.frequency = "once"
# Build a mock part-handler module
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
self.mocker.result("once-per-instance")
self.mocker.result(settings.PER_ONCE)
self.mocker.replay()
handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
self.payload, self.frequency)
handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
self.payload, self.frequency)
def test_exception_is_caught(self):
"""Exceptions within C{handle_part} are caught and logged."""
# Build a mock part-handler module
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
self.mocker.result("once-per-instance")
self.mocker.result(settings.PER_INSTANCE)
getattr(mod_mock, "handler_version")
self.mocker.result(1)
mod_mock.handle_part(self.data, self.ctype, self.filename,
self.payload)
self.mocker.throw(Exception())
# Mock log function
logexc_mock = self.mocker.replace(logexc, passthrough=False)
logexc_mock(ANY)
# Mock the print_exc function
print_exc_mock = self.mocker.replace("traceback.print_exc",
passthrough=False)
print_exc_mock(ARGS, KWARGS)
self.mocker.replay()
handler_handle_part(mod_mock, self.data, self.ctype, self.filename,
self.payload, self.frequency)
handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
self.payload, self.frequency)
class TestCmdlineUrl(MockerTestCase):
@ -202,14 +185,13 @@ class TestCmdlineUrl(MockerTestCase):
payload = "0"
cmdline = "ro %s=%s bar=1" % (key, url)
mock_readurl = self.mocker.replace(readurl, passthrough=False)
mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False)
mock_readurl(url)
self.mocker.result(payload)
self.mocker.result(url_helper.UrlResponse(200, payload))
self.mocker.replay()
self.assertEqual((key, url, None),
get_cmdline_url(names=[key], starts="xxxxxx", cmdline=cmdline))
util.get_cmdline_url(names=[key], starts="xxxxxx", cmdline=cmdline))
def test_valid_content(self):
url = "http://example.com/foo"
@ -217,14 +199,13 @@ class TestCmdlineUrl(MockerTestCase):
payload = "xcloud-config\nmydata: foo\nbar: wark\n"
cmdline = "ro %s=%s bar=1" % (key, url)
mock_readurl = self.mocker.replace(readurl, passthrough=False)
mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False)
mock_readurl(url)
self.mocker.result(payload)
self.mocker.result(url_helper.UrlResponse(200, payload))
self.mocker.replay()
self.assertEqual((key, url, payload),
get_cmdline_url(names=[key], starts="xcloud-config",
util.get_cmdline_url(names=[key], starts="xcloud-config",
cmdline=cmdline))
def test_no_key_found(self):
@ -232,11 +213,12 @@ class TestCmdlineUrl(MockerTestCase):
key = "mykey"
cmdline = "ro %s=%s bar=1" % (key, url)
self.mocker.replace(readurl, passthrough=False)
self.mocker.replace(url_helper.readurl, passthrough=False)
self.mocker.result(url_helper.UrlResponse(400))
self.mocker.replay()
self.assertEqual((None, None, None),
get_cmdline_url(names=["does-not-appear"],
util.get_cmdline_url(names=["does-not-appear"],
starts="#cloud-config", cmdline=cmdline))
# vi: ts=4 expandtab

View File

@ -1,9 +1,12 @@
from mocker import MockerTestCase
from cloudinit.util import write_file, delete_dir_contents
from cloudinit.CloudConfig.cc_ca_certs import (
handle, update_ca_certs, add_ca_certs, remove_default_ca_certs)
from logging import getLogger
from cloudinit import util
from cloudinit import cloud
from cloudinit import helpers
from cloudinit.config import cc_ca_certs
import logging
class TestNoConfig(MockerTestCase):
@ -11,36 +14,37 @@ class TestNoConfig(MockerTestCase):
super(TestNoConfig, self).setUp()
self.name = "ca-certs"
self.cloud_init = None
self.log = getLogger("TestNoConfig")
self.log = logging.getLogger("TestNoConfig")
self.args = []
def test_no_config(self):
"""
Test that nothing is done if no ca-certs configuration is provided.
"""
config = {"unknown-key": "value"}
self.mocker.replace(write_file, passthrough=False)
self.mocker.replace(update_ca_certs, passthrough=False)
config = util.get_builtin_cfg()
self.mocker.replace(util.write_file, passthrough=False)
self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False)
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, self.args)
class TestConfig(MockerTestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.name = "ca-certs"
self.cloud_init = None
self.log = getLogger("TestNoConfig")
self.paths = None
self.cloud = cloud.Cloud(None, self.paths, None, None, None)
self.log = logging.getLogger("TestNoConfig")
self.args = []
# Mock out the functions that actually modify the system
self.mock_add = self.mocker.replace(add_ca_certs, passthrough=False)
self.mock_update = self.mocker.replace(update_ca_certs,
self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs, passthrough=False)
self.mock_update = self.mocker.replace(cc_ca_certs.update_ca_certs,
passthrough=False)
self.mock_remove = self.mocker.replace(remove_default_ca_certs,
self.mock_remove = self.mocker.replace(cc_ca_certs.remove_default_ca_certs,
passthrough=False)
# Order must be correct
self.mocker.order()
@ -55,7 +59,7 @@ class TestConfig(MockerTestCase):
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_empty_trusted_list(self):
"""Test that no certificate are written if 'trusted' list is empty"""
@ -65,37 +69,37 @@ class TestConfig(MockerTestCase):
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_single_trusted(self):
"""Test that a single cert gets passed to add_ca_certs"""
config = {"ca-certs": {"trusted": ["CERT1"]}}
self.mock_add(["CERT1"])
self.mock_add(self.paths, ["CERT1"])
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_multiple_trusted(self):
"""Test that multiple certs get passed to add_ca_certs"""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
self.mock_add(["CERT1", "CERT2"])
self.mock_add(self.paths, ["CERT1", "CERT2"])
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_remove_default_ca_certs(self):
"""Test remove_defaults works as expected"""
config = {"ca-certs": {"remove-defaults": True}}
self.mock_remove()
self.mock_remove(self.paths)
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_no_remove_defaults_if_false(self):
"""Test remove_defaults is not called when config value is False"""
@ -104,72 +108,85 @@ class TestConfig(MockerTestCase):
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_correct_order_for_remove_then_add(self):
"""Test remove_defaults is not called when config value is False"""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
self.mock_remove()
self.mock_add(["CERT1"])
self.mock_remove(self.paths)
self.mock_add(self.paths, ["CERT1"])
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
class TestAddCaCerts(MockerTestCase):
def setUp(self):
super(TestAddCaCerts, self).setUp()
self.paths = helpers.Paths({
'cloud_dir': self.makeDir()
})
def test_no_certs_in_list(self):
"""Test that no certificate are written if not provided."""
self.mocker.replace(write_file, passthrough=False)
self.mocker.replace(util.write_file, passthrough=False)
self.mocker.replay()
add_ca_certs([])
cc_ca_certs.add_ca_certs(self.paths, [])
def test_single_cert(self):
"""Test adding a single certificate to the trusted CAs"""
cert = "CERT1\nLINE2\nLINE3"
mock_write = self.mocker.replace(write_file, passthrough=False)
mock_write = self.mocker.replace(util.write_file, passthrough=False)
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
cert, mode=0644)
mock_write("/etc/ca-certificates.conf",
"\ncloud-init-ca-certs.crt", omode="a")
"\ncloud-init-ca-certs.crt", omode="ab")
self.mocker.replay()
add_ca_certs([cert])
cc_ca_certs.add_ca_certs(self.paths, [cert])
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs"""
certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
expected_cert_file = "\n".join(certs)
mock_write = self.mocker.replace(write_file, passthrough=False)
mock_write = self.mocker.replace(util.write_file, passthrough=False)
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
expected_cert_file, mode=0644)
mock_write("/etc/ca-certificates.conf",
"\ncloud-init-ca-certs.crt", omode="a")
"\ncloud-init-ca-certs.crt", omode="ab")
self.mocker.replay()
add_ca_certs(certs)
cc_ca_certs.add_ca_certs(self.paths, certs)
class TestUpdateCaCerts(MockerTestCase):
def test_commands(self):
mock_check_call = self.mocker.replace("subprocess.check_call",
mock_check_call = self.mocker.replace(util.subp,
passthrough=False)
mock_check_call(["update-ca-certificates"])
self.mocker.replay()
update_ca_certs()
cc_ca_certs.update_ca_certs()
class TestRemoveDefaultCaCerts(MockerTestCase):
def setUp(self):
super(TestRemoveDefaultCaCerts, self).setUp()
self.paths = helpers.Paths({
'cloud_dir': self.makeDir()
})
def test_commands(self):
mock_delete_dir_contents = self.mocker.replace(delete_dir_contents,
mock_delete_dir_contents = self.mocker.replace(util.delete_dir_contents,
passthrough=False)
mock_write = self.mocker.replace(write_file, passthrough=False)
mock_subp = self.mocker.replace("cloudinit.util.subp",
mock_write = self.mocker.replace(util.write_file, passthrough=False)
mock_subp = self.mocker.replace(util.subp,
passthrough=False)
mock_delete_dir_contents("/usr/share/ca-certificates/")
@ -179,4 +196,4 @@ class TestRemoveDefaultCaCerts(MockerTestCase):
"ca-certificates ca-certificates/trust_new_crts select no")
self.mocker.replay()
remove_default_ca_certs()
cc_ca_certs.remove_default_ca_certs(self.paths)