328 lines
12 KiB
Python
328 lines
12 KiB
Python
from cloudinit import helpers
|
|
from cloudinit.sources import DataSourceAzure
|
|
from tests.unittests.helpers import populate_dir
|
|
|
|
import base64
|
|
from mocker import MockerTestCase
|
|
import os
|
|
import yaml
|
|
|
|
|
|
def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
|
|
if data is None:
|
|
data = {'HostName': 'FOOHOST'}
|
|
if pubkeys is None:
|
|
pubkeys = {}
|
|
|
|
content = """<?xml version="1.0" encoding="utf-8"?>
|
|
<Environment xmlns="http://schemas.dmtf.org/ovf/environment/1"
|
|
xmlns:oe="http://schemas.dmtf.org/ovf/environment/1"
|
|
xmlns:wa="http://schemas.microsoft.com/windowsazure"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
|
|
|
<wa:ProvisioningSection><wa:Version>1.0</wa:Version>
|
|
<LinuxProvisioningConfigurationSet
|
|
xmlns="http://schemas.microsoft.com/windowsazure"
|
|
xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
|
|
<ConfigurationSetType>LinuxProvisioningConfiguration</ConfigurationSetType>
|
|
"""
|
|
for key, dval in data.items():
|
|
if isinstance(dval, dict):
|
|
val = dval.get('text')
|
|
attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v in dval.items()
|
|
if k != 'text'])
|
|
else:
|
|
val = dval
|
|
attrs = ""
|
|
content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
|
|
|
|
if userdata:
|
|
content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata))
|
|
|
|
if pubkeys:
|
|
content += "<SSH><PublicKeys>\n"
|
|
for fp, path in pubkeys:
|
|
content += " <PublicKey>"
|
|
content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" %
|
|
(fp, path))
|
|
content += "</PublicKey>\n"
|
|
content += "</PublicKeys></SSH>"
|
|
content += """
|
|
</LinuxProvisioningConfigurationSet>
|
|
</wa:ProvisioningSection>
|
|
<wa:PlatformSettingsSection><wa:Version>1.0</wa:Version>
|
|
<PlatformSettings xmlns="http://schemas.microsoft.com/windowsazure"
|
|
xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
|
|
<KmsServerHostname>kms.core.windows.net</KmsServerHostname>
|
|
<ProvisionGuestAgent>false</ProvisionGuestAgent>
|
|
<GuestAgentPackageName i:nil="true" />
|
|
</PlatformSettings></wa:PlatformSettingsSection>
|
|
</Environment>
|
|
"""
|
|
|
|
return content
|
|
|
|
|
|
class TestAzureDataSource(MockerTestCase):
|
|
|
|
def setUp(self):
|
|
# makeDir comes from MockerTestCase
|
|
self.tmp = self.makeDir()
|
|
|
|
# patch cloud_dir, so our 'seed_dir' is guaranteed empty
|
|
self.paths = helpers.Paths({'cloud_dir': self.tmp})
|
|
|
|
self.unapply = []
|
|
super(TestAzureDataSource, self).setUp()
|
|
|
|
def tearDown(self):
|
|
apply_patches([i for i in reversed(self.unapply)])
|
|
super(TestAzureDataSource, self).tearDown()
|
|
|
|
def apply_patches(self, patches):
|
|
ret = apply_patches(patches)
|
|
self.unapply += ret
|
|
|
|
def _get_ds(self, data):
|
|
|
|
def dsdevs():
|
|
return data.get('dsdevs', [])
|
|
|
|
def _invoke_agent(cmd):
|
|
data['agent_invoked'] = cmd
|
|
|
|
def _write_files(datadir, files, dirmode):
|
|
data['files'] = {}
|
|
data['datadir'] = datadir
|
|
data['datadir_mode'] = dirmode
|
|
for (fname, content) in files.items():
|
|
data['files'][fname] = content
|
|
|
|
def _wait_for_files(flist, _maxwait=None, _naplen=None):
|
|
data['waited'] = flist
|
|
return []
|
|
|
|
def _pubkeys_from_crt_files(flist):
|
|
data['pubkey_files'] = flist
|
|
return ["pubkey_from: %s" % f for f in flist]
|
|
|
|
def _iid_from_shared_config(path):
|
|
data['iid_from_shared_cfg'] = path
|
|
return 'i-my-azure-id'
|
|
|
|
def _apply_hostname_bounce(**kwargs):
|
|
data['apply_hostname_bounce'] = kwargs
|
|
|
|
if data.get('ovfcontent') is not None:
|
|
populate_dir(os.path.join(self.paths.seed_dir, "azure"),
|
|
{'ovf-env.xml': data['ovfcontent']})
|
|
|
|
mod = DataSourceAzure
|
|
|
|
if data.get('dsdevs'):
|
|
self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)])
|
|
|
|
self.apply_patches([(mod, 'invoke_agent', _invoke_agent),
|
|
(mod, 'write_files', _write_files),
|
|
(mod, 'wait_for_files', _wait_for_files),
|
|
(mod, 'pubkeys_from_crt_files',
|
|
_pubkeys_from_crt_files),
|
|
(mod, 'iid_from_shared_config',
|
|
_iid_from_shared_config),
|
|
(mod, 'apply_hostname_bounce',
|
|
_apply_hostname_bounce), ])
|
|
|
|
dsrc = mod.DataSourceAzureNet(
|
|
data.get('sys_cfg', {}), distro=None, paths=self.paths)
|
|
|
|
return dsrc
|
|
|
|
def test_basic_seed_dir(self):
|
|
odata = {'HostName': "myhost", 'UserName': "myuser"}
|
|
data = {'ovfcontent': construct_valid_ovf_env(data=odata),
|
|
'sys_cfg': {}}
|
|
|
|
dsrc = self._get_ds(data)
|
|
ret = dsrc.get_data()
|
|
self.assertTrue(ret)
|
|
self.assertEqual(dsrc.userdata_raw, "")
|
|
self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName'])
|
|
self.assertTrue('ovf-env.xml' in data['files'])
|
|
self.assertEqual(0700, data['datadir_mode'])
|
|
self.assertEqual(dsrc.metadata['instance-id'], 'i-my-azure-id')
|
|
|
|
def test_user_cfg_set_agent_command_plain(self):
|
|
# set dscfg in via plaintext
|
|
cfg = {'agent_command': "my_command"}
|
|
odata = {'HostName': "myhost", 'UserName': "myuser",
|
|
'dscfg': {'text': yaml.dump(cfg), 'encoding': 'plain'}}
|
|
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
|
|
|
|
dsrc = self._get_ds(data)
|
|
ret = dsrc.get_data()
|
|
self.assertTrue(ret)
|
|
self.assertEqual(data['agent_invoked'], cfg['agent_command'])
|
|
|
|
def test_user_cfg_set_agent_command(self):
|
|
# set dscfg in via base64 encoded yaml
|
|
cfg = {'agent_command': "my_command"}
|
|
odata = {'HostName': "myhost", 'UserName': "myuser",
|
|
'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
|
|
'encoding': 'base64'}}
|
|
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
|
|
|
|
dsrc = self._get_ds(data)
|
|
ret = dsrc.get_data()
|
|
self.assertTrue(ret)
|
|
self.assertEqual(data['agent_invoked'], cfg['agent_command'])
|
|
|
|
def test_sys_cfg_set_agent_command(self):
|
|
sys_cfg = {'datasource': {'Azure': {'agent_command': '_COMMAND'}}}
|
|
data = {'ovfcontent': construct_valid_ovf_env(data={}),
|
|
'sys_cfg': sys_cfg}
|
|
|
|
dsrc = self._get_ds(data)
|
|
ret = dsrc.get_data()
|
|
self.assertTrue(ret)
|
|
self.assertEqual(data['agent_invoked'], '_COMMAND')
|
|
|
|
def test_username_used(self):
|
|
odata = {'HostName': "myhost", 'UserName': "myuser"}
|
|
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
|
|
|
|
dsrc = self._get_ds(data)
|
|
ret = dsrc.get_data()
|
|
self.assertTrue(ret)
|
|
self.assertEqual(dsrc.cfg['system_info']['default_user']['name'],
|
|
"myuser")
|
|
|
|
def test_password_given(self):
|
|
odata = {'HostName': "myhost", 'UserName': "myuser",
|
|
'UserPassword': "mypass"}
|
|
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
|
|
|
|
dsrc = self._get_ds(data)
|
|
ret = dsrc.get_data()
|
|
self.assertTrue(ret)
|
|
self.assertTrue('default_user' in dsrc.cfg['system_info'])
|
|
defuser = dsrc.cfg['system_info']['default_user']
|
|
|
|
# default user shoudl be updated for password and username
|
|
# and should not be locked.
|
|
self.assertEqual(defuser['name'], odata['UserName'])
|
|
self.assertEqual(defuser['password'], odata['UserPassword'])
|
|
self.assertFalse(defuser['lock_passwd'])
|
|
|
|
def test_userdata_found(self):
|
|
mydata = "FOOBAR"
|
|
odata = {'UserData': base64.b64encode(mydata)}
|
|
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
|
|
|
|
dsrc = self._get_ds(data)
|
|
ret = dsrc.get_data()
|
|
self.assertTrue(ret)
|
|
self.assertEqual(dsrc.userdata_raw, mydata)
|
|
|
|
def test_no_datasource_expected(self):
|
|
#no source should be found if no seed_dir and no devs
|
|
data = {}
|
|
dsrc = self._get_ds({})
|
|
ret = dsrc.get_data()
|
|
self.assertFalse(ret)
|
|
self.assertFalse('agent_invoked' in data)
|
|
|
|
def test_cfg_has_pubkeys(self):
|
|
odata = {'HostName': "myhost", 'UserName': "myuser"}
|
|
mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
|
|
pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
|
|
data = {'ovfcontent': construct_valid_ovf_env(data=odata,
|
|
pubkeys=pubkeys)}
|
|
|
|
dsrc = self._get_ds(data)
|
|
ret = dsrc.get_data()
|
|
self.assertTrue(ret)
|
|
for mypk in mypklist:
|
|
self.assertIn(mypk, dsrc.cfg['_pubkeys'])
|
|
|
|
def test_disabled_bounce(self):
|
|
pass
|
|
|
|
def test_apply_bounce_call_1(self):
|
|
# hostname needs to get through to apply_hostname_bounce
|
|
mydata = "FOOBAR"
|
|
odata = {'HostName': 'my-random-hostname'}
|
|
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
|
|
|
|
self._get_ds(data).get_data()
|
|
self.assertIn('hostname', data['apply_hostname_bounce'])
|
|
self.assertEqual(data['apply_hostname_bounce']['hostname'],
|
|
odata['HostName'])
|
|
|
|
def test_apply_bounce_call_configurable(self):
|
|
# hostname_bounce should be configurable in datasource cfg
|
|
cfg = {'hostname_bounce': {'interface': 'eth1', 'policy': 'off',
|
|
'command': 'my-bounce-command',
|
|
'hostname_command': 'my-hostname-command'}}
|
|
odata = {'HostName': "xhost",
|
|
'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
|
|
'encoding': 'base64'}}
|
|
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
|
|
self._get_ds(data).get_data()
|
|
|
|
for k in cfg['hostname_bounce']:
|
|
self.assertIn(k, data['apply_hostname_bounce'])
|
|
|
|
for k, v in cfg['hostname_bounce'].items():
|
|
self.assertEqual(data['apply_hostname_bounce'][k], v)
|
|
|
|
def test_set_hostname_disabled(self):
|
|
# config specifying set_hostname off should not bounce
|
|
cfg = {'set_hostname': False}
|
|
odata = {'HostName': "xhost",
|
|
'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
|
|
'encoding': 'base64'}}
|
|
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
|
|
self._get_ds(data).get_data()
|
|
|
|
self.assertEqual(data.get('apply_hostname_bounce', "N/A"), "N/A")
|
|
|
|
|
|
class TestReadAzureOvf(MockerTestCase):
|
|
def test_invalid_xml_raises_non_azure_ds(self):
|
|
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
|
|
self.assertRaises(DataSourceAzure.NonAzureDataSource,
|
|
DataSourceAzure.read_azure_ovf, invalid_xml)
|
|
|
|
def test_load_with_pubkeys(self):
|
|
mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
|
|
pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
|
|
content = construct_valid_ovf_env(pubkeys=pubkeys)
|
|
(_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content)
|
|
for mypk in mypklist:
|
|
self.assertIn(mypk, cfg['_pubkeys'])
|
|
|
|
|
|
class TestReadAzureSharedConfig(MockerTestCase):
|
|
def test_valid_content(self):
|
|
xml = """<?xml version="1.0" encoding="utf-8"?>
|
|
<SharedConfig>
|
|
<Deployment name="MY_INSTANCE_ID">
|
|
<Service name="myservice"/>
|
|
<ServiceInstance name="INSTANCE_ID.0" guid="{abcd-uuid}" />
|
|
</Deployment>
|
|
<Incarnation number="1"/>
|
|
</SharedConfig>"""
|
|
ret = DataSourceAzure.iid_from_shared_config_content(xml)
|
|
self.assertEqual("MY_INSTANCE_ID", ret)
|
|
|
|
|
|
def apply_patches(patches):
|
|
ret = []
|
|
for (ref, name, replace) in patches:
|
|
if replace is None:
|
|
continue
|
|
orig = getattr(ref, name)
|
|
setattr(ref, name, replace)
|
|
ret.append((ref, name, orig))
|
|
return ret
|