
functions so that they can be 'retargeted' to a temporary directory, which lets us run a full set of cloud-init stages. Neat things: 1. All cloud-init code is unchanged (as long as it goes through the utils functions for most functionality). 2. Allows for a natural way to set up a temporary directory, patch that directory in as the new 'root', run the cloud-init stages, and then check that the contents were placed as desired.
149 lines
4.6 KiB
Python
149 lines
4.6 KiB
Python
import os
|
|
|
|
import mocker
|
|
|
|
from mocker import MockerTestCase
|
|
|
|
from cloudinit import helpers as ch
|
|
from cloudinit import util
|
|
|
|
import shutil
|
|
|
|
# Makes the old path start
|
|
# with new base instead of whatever
|
|
# it previously had
|
|
def rebase_path(old_path, new_base):
    """Return old_path re-rooted underneath new_base.

    A path that already is new_base, or that lives inside it, is returned
    untouched.  Any other path has its leading slashes dropped, is joined
    onto new_base and is then made absolute.

    An empty new_base leaves old_path as it was given.
    """
    if not new_base:
        # Nothing to retarget to; pass the path through unchanged.
        return old_path
    # Compare on a path-component boundary: a plain prefix test would
    # treat '/tmp/rootfs' as being inside '/tmp/root'.
    base = new_base.rstrip(os.sep) or os.sep
    if base.endswith(os.sep):
        prefix = base
    else:
        prefix = base + os.sep
    if old_path == base or old_path.startswith(prefix):
        # Already handled...
        return old_path
    # Retarget the base of that path to the new base
    # instead of the old one...
    path = os.path.join(new_base, old_path.lstrip("/"))
    return os.path.abspath(path)
|
|
|
|
|
|
# Can work on anything that takes a path
|
|
# as first argument
|
|
def retarget_many_wrapper(new_base, am, old_func):
    """Wrap old_func so its leading positional path arguments are rebased.

    new_base: directory the paths get re-rooted under (via rebase_path).
    am: how many leading positional arguments are paths; -1 means
        every positional argument is a path.
    old_func: the callable invoked with the rewritten arguments.

    Returns the wrapping function.  Keyword arguments are handed to
    old_func untouched, so a path supplied by keyword is not rebased.
    """
    def wrapper(*args, **kwds):
        n_args = list(args)
        if am == -1:
            nam = len(n_args)
        else:
            # Never look past the positionals actually supplied; the
            # caller may have given the path by keyword instead.
            nam = min(am, len(n_args))
        for i in range(nam):
            n_args[i] = rebase_path(n_args[i], new_base)
        return old_func(*n_args, **kwds)
    return wrapper
|
|
|
|
|
|
class ResourceUsingTestCase(MockerTestCase):
    """Test case base that can locate and read the test data files."""

    def __init__(self, methodName="runTest"):
        MockerTestCase.__init__(self, methodName)
        # Discovered lazily by resourceLocation() and then reused.
        self.resource_path = None

    def resourceLocation(self, subname=None):
        """Return the test data directory, or a path inside of it.

        The first call probes a fixed list of relative directories and
        remembers the first one that exists.  The test fails if no data
        directory can be found.  With a subname, that name is joined
        onto the data directory; without one the directory is returned.
        """
        if self.resource_path is None:
            candidates = (
                os.path.join('tests', 'data'),
                os.path.join('data'),
                os.path.join(os.pardir, 'tests', 'data'),
                os.path.join(os.pardir, 'data'),
            )
            for candidate in candidates:
                if not os.path.isdir(candidate):
                    continue
                self.resource_path = candidate
                break
        located = (self.resource_path and
                   os.path.isdir(self.resource_path))
        self.assertTrue(located,
                        msg="Unable to locate test resource data path!")
        if subname:
            return os.path.join(self.resource_path, subname)
        return self.resource_path

    def readResource(self, name):
        """Return the full text of the named file in the data directory."""
        with open(self.resourceLocation(name), 'r') as handle:
            contents = handle.read()
        return contents

    def getCloudPaths(self):
        """Build a helpers.Paths object for use by a test.

        'cloud_dir' comes from the inherited makeDir() and
        'templates_dir' is the test data directory.
        """
        path_cfg = {
            'cloud_dir': self.makeDir(),
            'templates_dir': self.resourceLocation(),
        }
        return ch.Paths(path_cfg)
|
|
|
|
|
|
class FilesystemMockingTestCase(ResourceUsingTestCase):
    """Test case that redirects filesystem helpers into another root.

    patchUtils() and patchOS() swap selected functions on the util
    module and on os.path for replacements, recording every original
    in self.patched_funcs.  restore(), which tearDown() runs, puts the
    originals back.
    """

    def __init__(self, methodName="runTest"):
        ResourceUsingTestCase.__init__(self, methodName)
        # (module, attribute name, replaced function), in patch order.
        self.patched_funcs = []

    def replicateTestRoot(self, example_root, target_root):
        """Copy the example tree 'roots/<example_root>' into target_root.

        The example tree is looked up underneath the test data
        directory; each of its directories is recreated below
        target_root and each of its files is copied across.
        """
        real_root = self.resourceLocation()
        real_root = os.path.join(real_root, 'roots', example_root)
        for (dir_path, _dirnames, filenames) in os.walk(real_root):
            # The directory's path relative to the example root,
            # re-anchored under the target root.
            make_dir = rebase_path(dir_path[len(real_root):], target_root)
            util.ensure_dir(make_dir)
            for f in filenames:
                # Always join onto the directory itself; building on the
                # previous file's path would nest every later file
                # underneath the first one.
                real_path = util.abs_join(dir_path, f)
                make_path = util.abs_join(make_dir, f)
                shutil.copy(real_path, make_path)

    def tearDown(self):
        """Undo every patch before the base class cleans up."""
        self.restore()
        ResourceUsingTestCase.tearDown(self)

    def restore(self):
        """Put back every function replaced by patchUtils()/patchOS()."""
        # Newest first: when a function was patched more than once the
        # later records hold earlier wrappers, so the true original has
        # to be the last thing written back.
        for (mod, f, func) in reversed(self.patched_funcs):
            setattr(mod, f, func)
        self.patched_funcs = []

    def patchUtils(self, new_root):
        """Retarget or stub out functions on the util module.

        Path-taking functions get their leading positional path
        arguments rebased onto new_root, 'subp' is replaced by a stub
        that returns ('', '') and the chown helpers become no-ops.
        """
        # Function name -> number of leading path arguments (-1: all).
        patch_funcs = {
            util: [('write_file', 1),
                   ('load_file', 1),
                   ('ensure_dir', 1),
                   ('chmod', 1),
                   ('delete_dir_contents', 1),
                   ('del_file', 1),
                   ('sym_link', -1)],
        }
        for (mod, funcs) in patch_funcs.items():
            for (f, am) in funcs:
                func = getattr(mod, f)
                trap_func = retarget_many_wrapper(new_root, am, func)
                setattr(mod, f, trap_func)
                self.patched_funcs.append((mod, f, func))

        # Handle subprocess calls
        func = getattr(util, 'subp')

        def nsubp(*args, **kwargs):
            return ('', '')

        setattr(util, 'subp', nsubp)
        self.patched_funcs.append((util, 'subp', func))

        def null_func(*args, **kwargs):
            return None

        for f in ['chownbyid', 'chownbyname']:
            func = getattr(util, f)
            setattr(util, f, null_func)
            self.patched_funcs.append((util, f, func))

    def patchOS(self, new_root):
        """Make the os.path existence checks look underneath new_root."""
        patch_funcs = {
            os.path: ['isfile', 'exists', 'islink', 'isdir'],
        }
        for (mod, funcs) in patch_funcs.items():
            for f in funcs:
                func = getattr(mod, f)
                # Each of these takes a single path argument.
                trap_func = retarget_many_wrapper(new_root, 1, func)
                setattr(mod, f, trap_func)
                self.patched_funcs.append((mod, f, func))
|