
This patch fixes issues in Fedora 18 (and upcoming RHEL 7) which are present due to their use of systemd: - store locale configuration in /etc/locale.conf - store hostname in /etc/hostname - use a symlink for /etc/localtime (prior code would set the timezone but corrupt data in /usr/share/zoneinfo due to presence of symlink) It also contains fixes for issues unrelated to systemd adoption: - explicitly scan /dev/sr0 with blkid in order to get the optical drive in the blkid cache. This prevents an issue on systems running 2.6 kernels (such as RHEL 6) in which config disks on some devices won't be detected unless the device has previously been queried. (For reference, see https://patchwork.kernel.org/patch/1770241/) - append a newline when rewriting sysconfig files, as this is customary text configuration file formatting and is expected by some parsers (such as the ifcfg-rh plugin for NetworkManager)
335 lines
12 KiB
Python
335 lines
12 KiB
Python
from copy import copy
|
|
import json
|
|
import os
|
|
import os.path
|
|
|
|
import mocker
|
|
from mocker import MockerTestCase
|
|
|
|
from cloudinit import helpers
|
|
from cloudinit import settings
|
|
from cloudinit.sources import DataSourceConfigDrive as ds
|
|
from cloudinit import util
|
|
|
|
from tests.unittests import helpers as unit_helpers
|
|
|
|
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
|
|
EC2_META = {
|
|
'ami-id': 'ami-00000001',
|
|
'ami-launch-index': 0,
|
|
'ami-manifest-path': 'FIXME',
|
|
'block-device-mapping': {
|
|
'ami': 'sda1',
|
|
'ephemeral0': 'sda2',
|
|
'root': '/dev/sda1',
|
|
'swap': 'sda3'},
|
|
'hostname': 'sm-foo-test.novalocal',
|
|
'instance-action': 'none',
|
|
'instance-id': 'i-00000001',
|
|
'instance-type': 'm1.tiny',
|
|
'local-hostname': 'sm-foo-test.novalocal',
|
|
'local-ipv4': None,
|
|
'placement': {'availability-zone': 'nova'},
|
|
'public-hostname': 'sm-foo-test.novalocal',
|
|
'public-ipv4': '',
|
|
'public-keys': {'0': {'openssh-key': PUBKEY}},
|
|
'reservation-id': 'r-iru5qm4m',
|
|
'security-groups': ['default']
|
|
}
|
|
USER_DATA = '#!/bin/sh\necho This is user data\n'
|
|
OSTACK_META = {
|
|
'availability_zone': 'nova',
|
|
'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'},
|
|
{'content_path': '/content/0001', 'path': '/etc/bar/bar.cfg'}],
|
|
'hostname': 'sm-foo-test.novalocal',
|
|
'meta': {'dsmode': 'local', 'my-meta': 'my-value'},
|
|
'name': 'sm-foo-test',
|
|
'public_keys': {'mykey': PUBKEY},
|
|
'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
|
|
|
|
CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
|
|
CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
|
|
|
|
CFG_DRIVE_FILES_V2 = {
|
|
'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
|
|
'ec2/2009-04-04/user-data': USER_DATA,
|
|
'ec2/latest/meta-data.json': json.dumps(EC2_META),
|
|
'ec2/latest/user-data': USER_DATA,
|
|
'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
|
|
'openstack/2012-08-10/user_data': USER_DATA,
|
|
'openstack/content/0000': CONTENT_0,
|
|
'openstack/content/0001': CONTENT_1,
|
|
'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
|
|
'openstack/latest/user_data': USER_DATA}
|
|
|
|
|
|
class TestConfigDriveDataSource(MockerTestCase):
|
|
|
|
def setUp(self):
|
|
super(TestConfigDriveDataSource, self).setUp()
|
|
self.tmp = self.makeDir()
|
|
|
|
def test_ec2_metadata(self):
|
|
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
|
|
found = ds.read_config_drive_dir(self.tmp)
|
|
self.assertTrue('ec2-metadata' in found)
|
|
ec2_md = found['ec2-metadata']
|
|
self.assertEqual(EC2_META, ec2_md)
|
|
|
|
def test_dev_os_remap(self):
|
|
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
|
|
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
|
|
None,
|
|
helpers.Paths({}))
|
|
found = ds.read_config_drive_dir(self.tmp)
|
|
cfg_ds.metadata = found['metadata']
|
|
name_tests = {
|
|
'ami': '/dev/vda1',
|
|
'root': '/dev/vda1',
|
|
'ephemeral0': '/dev/vda2',
|
|
'swap': '/dev/vda3',
|
|
}
|
|
for name, dev_name in name_tests.items():
|
|
with unit_helpers.mocker() as my_mock:
|
|
find_mock = my_mock.replace(util.find_devs_with,
|
|
spec=False, passthrough=False)
|
|
provided_name = dev_name[len('/dev/'):]
|
|
provided_name = "s" + provided_name[1:]
|
|
find_mock(mocker.ARGS)
|
|
my_mock.result([provided_name])
|
|
exists_mock = my_mock.replace(os.path.exists,
|
|
spec=False, passthrough=False)
|
|
exists_mock(mocker.ARGS)
|
|
my_mock.result(False)
|
|
exists_mock(mocker.ARGS)
|
|
my_mock.result(True)
|
|
my_mock.replay()
|
|
device = cfg_ds.device_name_to_device(name)
|
|
self.assertEquals(dev_name, device)
|
|
|
|
def test_dev_os_map(self):
|
|
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
|
|
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
|
|
None,
|
|
helpers.Paths({}))
|
|
found = ds.read_config_drive_dir(self.tmp)
|
|
os_md = found['metadata']
|
|
cfg_ds.metadata = os_md
|
|
name_tests = {
|
|
'ami': '/dev/vda1',
|
|
'root': '/dev/vda1',
|
|
'ephemeral0': '/dev/vda2',
|
|
'swap': '/dev/vda3',
|
|
}
|
|
for name, dev_name in name_tests.items():
|
|
with unit_helpers.mocker() as my_mock:
|
|
find_mock = my_mock.replace(util.find_devs_with,
|
|
spec=False, passthrough=False)
|
|
find_mock(mocker.ARGS)
|
|
my_mock.result([dev_name])
|
|
exists_mock = my_mock.replace(os.path.exists,
|
|
spec=False, passthrough=False)
|
|
exists_mock(mocker.ARGS)
|
|
my_mock.result(True)
|
|
my_mock.replay()
|
|
device = cfg_ds.device_name_to_device(name)
|
|
self.assertEquals(dev_name, device)
|
|
|
|
def test_dev_ec2_remap(self):
|
|
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
|
|
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
|
|
None,
|
|
helpers.Paths({}))
|
|
found = ds.read_config_drive_dir(self.tmp)
|
|
ec2_md = found['ec2-metadata']
|
|
os_md = found['metadata']
|
|
cfg_ds.ec2_metadata = ec2_md
|
|
cfg_ds.metadata = os_md
|
|
name_tests = {
|
|
'ami': '/dev/vda1',
|
|
'root': '/dev/vda1',
|
|
'ephemeral0': '/dev/vda2',
|
|
'swap': '/dev/vda3',
|
|
None: None,
|
|
'bob': None,
|
|
'root2k': None,
|
|
}
|
|
for name, dev_name in name_tests.items():
|
|
with unit_helpers.mocker(verify_calls=False) as my_mock:
|
|
exists_mock = my_mock.replace(os.path.exists,
|
|
spec=False, passthrough=False)
|
|
exists_mock(mocker.ARGS)
|
|
my_mock.result(False)
|
|
exists_mock(mocker.ARGS)
|
|
my_mock.result(True)
|
|
my_mock.replay()
|
|
device = cfg_ds.device_name_to_device(name)
|
|
self.assertEquals(dev_name, device)
|
|
|
|
def test_dev_ec2_map(self):
|
|
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
|
|
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
|
|
None,
|
|
helpers.Paths({}))
|
|
found = ds.read_config_drive_dir(self.tmp)
|
|
exists_mock = self.mocker.replace(os.path.exists,
|
|
spec=False, passthrough=False)
|
|
exists_mock(mocker.ARGS)
|
|
self.mocker.count(0, None)
|
|
self.mocker.result(True)
|
|
self.mocker.replay()
|
|
ec2_md = found['ec2-metadata']
|
|
os_md = found['metadata']
|
|
cfg_ds.ec2_metadata = ec2_md
|
|
cfg_ds.metadata = os_md
|
|
name_tests = {
|
|
'ami': '/dev/sda1',
|
|
'root': '/dev/sda1',
|
|
'ephemeral0': '/dev/sda2',
|
|
'swap': '/dev/sda3',
|
|
None: None,
|
|
'bob': None,
|
|
'root2k': None,
|
|
}
|
|
for name, dev_name in name_tests.items():
|
|
device = cfg_ds.device_name_to_device(name)
|
|
self.assertEquals(dev_name, device)
|
|
|
|
def test_dir_valid(self):
|
|
"""Verify a dir is read as such."""
|
|
|
|
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
|
|
|
|
found = ds.read_config_drive_dir(self.tmp)
|
|
|
|
expected_md = copy(OSTACK_META)
|
|
expected_md['instance-id'] = expected_md['uuid']
|
|
|
|
self.assertEqual(USER_DATA, found['userdata'])
|
|
self.assertEqual(expected_md, found['metadata'])
|
|
self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0)
|
|
self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1)
|
|
|
|
def test_seed_dir_valid_extra(self):
|
|
"""Verify extra files do not affect datasource validity."""
|
|
|
|
data = copy(CFG_DRIVE_FILES_V2)
|
|
data["myfoofile.txt"] = "myfoocontent"
|
|
data["openstack/latest/random-file.txt"] = "random-content"
|
|
|
|
populate_dir(self.tmp, data)
|
|
|
|
found = ds.read_config_drive_dir(self.tmp)
|
|
|
|
expected_md = copy(OSTACK_META)
|
|
expected_md['instance-id'] = expected_md['uuid']
|
|
|
|
self.assertEqual(expected_md, found['metadata'])
|
|
|
|
def test_seed_dir_bad_json_metadata(self):
|
|
"""Verify that bad json in metadata raises BrokenConfigDriveDir."""
|
|
data = copy(CFG_DRIVE_FILES_V2)
|
|
|
|
data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}"
|
|
data["openstack/latest/meta_data.json"] = "non-json garbage {}"
|
|
|
|
populate_dir(self.tmp, data)
|
|
|
|
self.assertRaises(ds.BrokenConfigDriveDir,
|
|
ds.read_config_drive_dir, self.tmp)
|
|
|
|
def test_seed_dir_no_configdrive(self):
|
|
"""Verify that no metadata raises NonConfigDriveDir."""
|
|
|
|
my_d = os.path.join(self.tmp, "non-configdrive")
|
|
data = copy(CFG_DRIVE_FILES_V2)
|
|
data["myfoofile.txt"] = "myfoocontent"
|
|
data["openstack/latest/random-file.txt"] = "random-content"
|
|
data["content/foo"] = "foocontent"
|
|
|
|
self.assertRaises(ds.NonConfigDriveDir,
|
|
ds.read_config_drive_dir, my_d)
|
|
|
|
def test_seed_dir_missing(self):
|
|
"""Verify that missing seed_dir raises NonConfigDriveDir."""
|
|
my_d = os.path.join(self.tmp, "nonexistantdirectory")
|
|
self.assertRaises(ds.NonConfigDriveDir,
|
|
ds.read_config_drive_dir, my_d)
|
|
|
|
def test_find_candidates(self):
|
|
devs_with_answers = {}
|
|
|
|
def my_devs_with(*args, **kwargs):
|
|
criteria = args[0] if len(args) else kwargs.pop('criteria', None)
|
|
return devs_with_answers.get(criteria, [])
|
|
|
|
def my_is_partition(dev):
|
|
return dev[-1] in "0123456789" and not dev.startswith("sr")
|
|
|
|
try:
|
|
orig_find_devs_with = util.find_devs_with
|
|
util.find_devs_with = my_devs_with
|
|
|
|
orig_is_partition = util.is_partition
|
|
util.is_partition = my_is_partition
|
|
|
|
devs_with_answers = {"TYPE=vfat": [],
|
|
"TYPE=iso9660": ["/dev/vdb"],
|
|
"LABEL=config-2": ["/dev/vdb"],
|
|
}
|
|
self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
|
|
|
|
# add a vfat item
|
|
# zdd reverse sorts after vdb, but config-2 label is preferred
|
|
devs_with_answers['TYPE=vfat'] = ["/dev/zdd"]
|
|
self.assertEqual(["/dev/vdb", "/dev/zdd"],
|
|
ds.find_candidate_devs())
|
|
|
|
# verify that partitions are not considered
|
|
devs_with_answers = {"TYPE=vfat": ["/dev/sda1"],
|
|
"TYPE=iso9660": [], "LABEL=config-2": ["/dev/vdb3"]}
|
|
self.assertEqual([], ds.find_candidate_devs())
|
|
|
|
finally:
|
|
util.find_devs_with = orig_find_devs_with
|
|
util.is_partition = orig_is_partition
|
|
|
|
def test_pubkeys_v2(self):
|
|
"""Verify that public-keys work in config-drive-v2."""
|
|
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
|
|
myds = cfg_ds_from_dir(self.tmp)
|
|
self.assertEqual(myds.get_public_ssh_keys(),
|
|
[OSTACK_META['public_keys']['mykey']])
|
|
|
|
|
|
def cfg_ds_from_dir(seed_d):
|
|
found = ds.read_config_drive_dir(seed_d)
|
|
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None,
|
|
helpers.Paths({}))
|
|
populate_ds_from_read_config(cfg_ds, seed_d, found)
|
|
return cfg_ds
|
|
|
|
|
|
def populate_ds_from_read_config(cfg_ds, source, results):
|
|
"""Patch the DataSourceConfigDrive from the results of
|
|
read_config_drive_dir hopefully in line with what it would have
|
|
if cfg_ds.get_data had been successfully called"""
|
|
cfg_ds.source = source
|
|
cfg_ds.metadata = results.get('metadata')
|
|
cfg_ds.ec2_metadata = results.get('ec2-metadata')
|
|
cfg_ds.userdata_raw = results.get('userdata')
|
|
cfg_ds.version = results.get('cfgdrive_ver')
|
|
|
|
|
|
def populate_dir(seed_dir, files):
|
|
for (name, content) in files.iteritems():
|
|
path = os.path.join(seed_dir, name)
|
|
dirname = os.path.dirname(path)
|
|
if not os.path.isdir(dirname):
|
|
os.makedirs(dirname)
|
|
with open(path, "w") as fp:
|
|
fp.write(content)
|
|
fp.close()
|
|
|
|
# vi: ts=4 expandtab
|