2012-10-05 15:46:20 -07:00

304 lines
11 KiB
Python

from copy import copy
import json
import os
import os.path
import mocker
from mocker import MockerTestCase
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit import settings
from cloudinit import util
from cloudinit import helpers
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
EC2_META = {
'ami-id': 'ami-00000001',
'ami-launch-index': 0,
'ami-manifest-path': 'FIXME',
'block-device-mapping': {
'ami': 'sda1',
'ephemeral0': 'sda2',
'root': '/dev/sda1',
'swap': 'sda3'},
'hostname': 'sm-foo-test.novalocal',
'instance-action': 'none',
'instance-id': 'i-00000001',
'instance-type': 'm1.tiny',
'local-hostname': 'sm-foo-test.novalocal',
'local-ipv4': None,
'placement': {'availability-zone': 'nova'},
'public-hostname': 'sm-foo-test.novalocal',
'public-ipv4': '',
'public-keys': {'0': {'openssh-key': PUBKEY}},
'reservation-id': 'r-iru5qm4m',
'security-groups': ['default']
}
USER_DATA = '#!/bin/sh\necho This is user data\n'
OSTACK_META = {
'availability_zone': 'nova',
'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'},
{'content_path': '/content/0001', 'path': '/etc/bar/bar.cfg'}],
'hostname': 'sm-foo-test.novalocal',
'meta': {'dsmode': 'local', 'my-meta': 'my-value'},
'name': 'sm-foo-test',
'public_keys': {'mykey': PUBKEY},
'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
CFG_DRIVE_FILES_V2 = {
'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
'ec2/2009-04-04/user-data': USER_DATA,
'ec2/latest/meta-data.json': json.dumps(EC2_META),
'ec2/latest/user-data': USER_DATA,
'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
'openstack/2012-08-10/user_data': USER_DATA,
'openstack/content/0000': CONTENT_0,
'openstack/content/0001': CONTENT_1,
'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
'openstack/latest/user_data': USER_DATA}
class TestConfigDriveDataSource(MockerTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
self.tmp = self.makeDir()
def test_ec2_metadata(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
found = ds.read_config_drive_dir(self.tmp)
self.assertTrue('ec2-metadata' in found)
ec2_md = found['ec2-metadata']
self.assertEqual(EC2_META, ec2_md)
def test_dev_os_remap(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
None,
helpers.Paths({}))
found = ds.read_config_drive_dir(self.tmp)
cfg_ds.metadata = found['metadata']
name_tests = {
'ami': '/dev/vda1',
'root': '/dev/vda1',
'ephemeral0': '/dev/vda2',
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
my_mock = mocker.Mocker()
find_mock = my_mock.replace(util.find_devs_with,
spec=False, passthrough=False)
provided_name = dev_name[len('/dev/'):]
provided_name = "s" + provided_name[1:]
find_mock(mocker.ARGS)
my_mock.result([provided_name])
exists_mock = my_mock.replace(os.path.exists,
spec=False, passthrough=False)
exists_mock(mocker.ARGS)
my_mock.result(False)
exists_mock(mocker.ARGS)
my_mock.result(True)
my_mock.replay()
device = cfg_ds.device_name_to_device(name)
my_mock.restore()
self.assertEquals(dev_name, device)
def test_dev_os_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
None,
helpers.Paths({}))
found = ds.read_config_drive_dir(self.tmp)
os_md = found['metadata']
cfg_ds.metadata = os_md
name_tests = {
'ami': '/dev/vda1',
'root': '/dev/vda1',
'ephemeral0': '/dev/vda2',
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
my_mock = mocker.Mocker()
find_mock = my_mock.replace(util.find_devs_with,
spec=False, passthrough=False)
find_mock(mocker.ARGS)
my_mock.result([dev_name])
exists_mock = my_mock.replace(os.path.exists,
spec=False, passthrough=False)
exists_mock(mocker.ARGS)
my_mock.result(True)
my_mock.replay()
device = cfg_ds.device_name_to_device(name)
my_mock.restore()
self.assertEquals(dev_name, device)
def test_dev_ec2_remap(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
None,
helpers.Paths({}))
found = ds.read_config_drive_dir(self.tmp)
ec2_md = found['ec2-metadata']
os_md = found['metadata']
cfg_ds.ec2_metadata = ec2_md
cfg_ds.metadata = os_md
name_tests = {
'ami': '/dev/vda1',
'root': '/dev/vda1',
'ephemeral0': '/dev/vda2',
'swap': '/dev/vda3',
None: None,
'bob': None,
'root2k': None,
}
for name, dev_name in name_tests.items():
my_mock = mocker.Mocker()
exists_mock = my_mock.replace(os.path.exists,
spec=False, passthrough=False)
exists_mock(mocker.ARGS)
my_mock.result(False)
exists_mock(mocker.ARGS)
my_mock.result(True)
my_mock.replay()
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
my_mock.restore()
def test_dev_ec2_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
None,
helpers.Paths({}))
found = ds.read_config_drive_dir(self.tmp)
exists_mock = self.mocker.replace(os.path.exists,
spec=False, passthrough=False)
exists_mock(mocker.ARGS)
self.mocker.count(0, None)
self.mocker.result(True)
self.mocker.replay()
ec2_md = found['ec2-metadata']
os_md = found['metadata']
cfg_ds.ec2_metadata = ec2_md
cfg_ds.metadata = os_md
name_tests = {
'ami': '/dev/sda1',
'root': '/dev/sda1',
'ephemeral0': '/dev/sda2',
'swap': '/dev/sda3',
None: None,
'bob': None,
'root2k': None,
}
for name, dev_name in name_tests.items():
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
def test_dir_valid(self):
"""Verify a dir is read as such."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
found = ds.read_config_drive_dir(self.tmp)
expected_md = copy(OSTACK_META)
expected_md['instance-id'] = expected_md['uuid']
self.assertEqual(USER_DATA, found['userdata'])
self.assertEqual(expected_md, found['metadata'])
self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0)
self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1)
def test_seed_dir_valid_extra(self):
"""Verify extra files do not affect datasource validity."""
data = copy(CFG_DRIVE_FILES_V2)
data["myfoofile.txt"] = "myfoocontent"
data["openstack/latest/random-file.txt"] = "random-content"
populate_dir(self.tmp, data)
found = ds.read_config_drive_dir(self.tmp)
expected_md = copy(OSTACK_META)
expected_md['instance-id'] = expected_md['uuid']
self.assertEqual(expected_md, found['metadata'])
def test_seed_dir_bad_json_metadata(self):
"""Verify that bad json in metadata raises BrokenConfigDriveDir."""
data = copy(CFG_DRIVE_FILES_V2)
data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}"
data["openstack/latest/meta_data.json"] = "non-json garbage {}"
populate_dir(self.tmp, data)
self.assertRaises(ds.BrokenConfigDriveDir,
ds.read_config_drive_dir, self.tmp)
def test_seed_dir_no_configdrive(self):
"""Verify that no metadata raises NonConfigDriveDir."""
my_d = os.path.join(self.tmp, "non-configdrive")
data = copy(CFG_DRIVE_FILES_V2)
data["myfoofile.txt"] = "myfoocontent"
data["openstack/latest/random-file.txt"] = "random-content"
data["content/foo"] = "foocontent"
self.assertRaises(ds.NonConfigDriveDir,
ds.read_config_drive_dir, my_d)
def test_seed_dir_missing(self):
"""Verify that missing seed_dir raises NonConfigDriveDir."""
my_d = os.path.join(self.tmp, "nonexistantdirectory")
self.assertRaises(ds.NonConfigDriveDir,
ds.read_config_drive_dir, my_d)
def test_find_candidates(self):
devs_with_answers = {
"TYPE=vfat": [],
"TYPE=iso9660": ["/dev/vdb"],
"LABEL=config-2": ["/dev/vdb"],
}
def my_devs_with(criteria):
return devs_with_answers[criteria]
try:
orig_find_devs_with = util.find_devs_with
util.find_devs_with = my_devs_with
self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
# add a vfat item
# zdd reverse sorts after vdb, but config-2 label is preferred
devs_with_answers['TYPE=vfat'] = ["/dev/zdd"]
self.assertEqual(["/dev/vdb", "/dev/zdd"],
ds.find_candidate_devs())
# verify that partitions are not considered
devs_with_answers = {"TYPE=vfat": ["/dev/sda1"],
"TYPE=iso9660": [], "LABEL=config-2": ["/dev/vdb3"]}
self.assertEqual([], ds.find_candidate_devs())
finally:
util.find_devs_with = orig_find_devs_with
def populate_dir(seed_dir, files):
for (name, content) in files.iteritems():
path = os.path.join(seed_dir, name)
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
os.makedirs(dirname)
with open(path, "w") as fp:
fp.write(content)
fp.close()
# vi: ts=4 expandtab