Add OpenNebula datasource.

This reads the context disk from OpenNebula.
This commit is contained in:
Scott Moser 2013-09-11 08:35:06 -04:00
commit 843d099752
8 changed files with 882 additions and 2 deletions

View File

@ -17,6 +17,7 @@
- prefer growpart resizer to 'parted resizepart' (LP: #1212492)
- support random data seed from config drive or azure, and a module
'seed_random' to read that and write it to /dev/urandom.
- add OpenNebula Datasource [Vlastimil Holer]
0.7.2:
- add a debian watch file
- add 'sudo' entry to ubuntu's default user (LP: #1080717)

View File

@ -31,6 +31,7 @@ CFG_BUILTIN = {
'datasource_list': [
'NoCloud',
'ConfigDrive',
'OpenNebula',
'Azure',
'AltCloud',
'OVF',

View File

@ -0,0 +1,442 @@
# vi: ts=4 expandtab
#
# Copyright (C) 2012 Canonical Ltd.
# Copyright (C) 2012 Yahoo! Inc.
# Copyright (C) 2012-2013 CERIT Scientific Cloud
# Copyright (C) 2012-2013 OpenNebula.org
#
# Author: Scott Moser <scott.moser@canonical.com>
# Author: Joshua Harlow <harlowja@yahoo-inc.com>
# Author: Vlastimil Holer <xholer@mail.muni.cz>
# Author: Javier Fontan <jfontan@opennebula.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3, as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import pwd
import re
import string # pylint: disable=W0402
from cloudinit import log as logging
from cloudinit import sources
from cloudinit import util
LOG = logging.getLogger(__name__)
DEFAULT_IID = "iid-dsopennebula"
DEFAULT_MODE = 'net'
DEFAULT_PARSEUSER = 'nobody'
CONTEXT_DISK_FILES = ["context.sh"]
VALID_DSMODES = ("local", "net", "disabled")
class DataSourceOpenNebula(sources.DataSource):
def __init__(self, sys_cfg, distro, paths):
sources.DataSource.__init__(self, sys_cfg, distro, paths)
self.dsmode = 'local'
self.seed = None
self.seed_dir = os.path.join(paths.seed_dir, 'opennebula')
def __str__(self):
root = sources.DataSource.__str__(self)
return "%s [seed=%s][dsmode=%s]" % (root, self.seed, self.dsmode)
def get_data(self):
defaults = {"instance-id": DEFAULT_IID}
results = None
seed = None
# decide parseuser for context.sh shell reader
parseuser = DEFAULT_PARSEUSER
if 'parseuser' in self.ds_cfg:
parseuser = self.ds_cfg.get('parseuser')
candidates = [self.seed_dir]
candidates.extend(find_candidate_devs())
for cdev in candidates:
try:
if os.path.isdir(self.seed_dir):
results = read_context_disk_dir(cdev, asuser=parseuser)
elif cdev.startswith("/dev"):
results = util.mount_cb(cdev, read_context_disk_dir,
data=parseuser)
except NonContextDiskDir:
continue
except BrokenContextDiskDir as exc:
raise exc
except util.MountFailedError:
LOG.warn("%s was not mountable" % cdev)
if results:
seed = cdev
LOG.debug("found datasource in %s", cdev)
break
if not seed:
return False
# merge fetched metadata with datasource defaults
md = results['metadata']
md = util.mergemanydict([md, defaults])
# check for valid user specified dsmode
user_dsmode = results['metadata'].get('DSMODE', None)
if user_dsmode not in VALID_DSMODES + (None,):
LOG.warn("user specified invalid mode: %s", user_dsmode)
user_dsmode = None
# decide dsmode
if user_dsmode:
dsmode = user_dsmode
elif self.ds_cfg.get('dsmode'):
dsmode = self.ds_cfg.get('dsmode')
else:
dsmode = DEFAULT_MODE
if dsmode == "disabled":
# most likely user specified
return False
# apply static network configuration only in 'local' dsmode
if ('network-interfaces' in results and self.dsmode == "local"):
LOG.debug("Updating network interfaces from %s", self)
self.distro.apply_network(results['network-interfaces'])
if dsmode != self.dsmode:
LOG.debug("%s: not claiming datasource, dsmode=%s", self, dsmode)
return False
self.seed = seed
self.metadata = md
self.userdata_raw = results.get('userdata')
return True
def get_hostname(self, fqdn=False, resolve_ip=None):
if resolve_ip is None:
if self.dsmode == 'net':
resolve_ip = True
else:
resolve_ip = False
return sources.DataSource.get_hostname(self, fqdn, resolve_ip)
class DataSourceOpenNebulaNet(DataSourceOpenNebula):
def __init__(self, sys_cfg, distro, paths):
DataSourceOpenNebula.__init__(self, sys_cfg, distro, paths)
self.dsmode = 'net'
class NonContextDiskDir(Exception):
pass
class BrokenContextDiskDir(Exception):
pass
class OpenNebulaNetwork(object):
REG_DEV_MAC = re.compile(
r'^\d+: (eth\d+):.*?link\/ether (..:..:..:..:..:..) ?',
re.MULTILINE | re.DOTALL)
def __init__(self, ip, context):
self.ip = ip
self.context = context
self.ifaces = self.get_ifaces()
def get_ifaces(self):
return self.REG_DEV_MAC.findall(self.ip)
def mac2ip(self, mac):
components = mac.split(':')[2:]
return [str(int(c, 16)) for c in components]
def get_ip(self, dev, components):
var_name = dev.upper() + '_IP'
if var_name in self.context:
return self.context[var_name]
else:
return '.'.join(components)
def get_mask(self, dev):
var_name = dev.upper() + '_MASK'
if var_name in self.context:
return self.context[var_name]
else:
return '255.255.255.0'
def get_network(self, dev, components):
var_name = dev.upper() + '_NETWORK'
if var_name in self.context:
return self.context[var_name]
else:
return '.'.join(components[:-1]) + '.0'
def get_gateway(self, dev):
var_name = dev.upper() + '_GATEWAY'
if var_name in self.context:
return self.context[var_name]
else:
return None
def get_dns(self, dev):
var_name = dev.upper() + '_DNS'
if var_name in self.context:
return self.context[var_name]
else:
return None
def get_domain(self, dev):
var_name = dev.upper() + '_DOMAIN'
if var_name in self.context:
return self.context[var_name]
else:
return None
def gen_conf(self):
global_dns = []
if 'DNS' in self.context:
global_dns.append(self.context['DNS'])
conf = []
conf.append('auto lo')
conf.append('iface lo inet loopback')
conf.append('')
for i in self.ifaces:
dev = i[0]
mac = i[1]
ip_components = self.mac2ip(mac)
conf.append('auto ' + dev)
conf.append('iface ' + dev + ' inet static')
conf.append(' address ' + self.get_ip(dev, ip_components))
conf.append(' network ' + self.get_network(dev, ip_components))
conf.append(' netmask ' + self.get_mask(dev))
gateway = self.get_gateway(dev)
if gateway:
conf.append(' gateway ' + gateway)
domain = self.get_domain(dev)
if domain:
conf.append(' dns-search ' + domain)
# add global DNS servers to all interfaces
dns = self.get_dns(dev)
if global_dns or dns:
all_dns = global_dns
if dns:
all_dns.append(dns)
conf.append(' dns-nameservers ' + ' '.join(all_dns))
conf.append('')
return "\n".join(conf)
def find_candidate_devs():
"""
Return a list of devices that may contain the context disk.
"""
combined = []
for f in ('LABEL=CONTEXT', 'LABEL=CDROM', 'TYPE=iso9660'):
devs = util.find_devs_with(f)
devs.sort()
for d in devs:
if d not in combined:
combined.append(d)
return combined
def switch_user_cmd(user):
return ['sudo', '-u', user]
def parse_shell_config(content, keylist=None, bash=None, asuser=None,
switch_user_cb=None):
if isinstance(bash, str):
bash = [bash]
elif bash is None:
bash = ['bash', '-e']
if switch_user_cb is None:
switch_user_cb = switch_user_cmd
# allvars expands to all existing variables by using '${!x*}' notation
# where x is lower or upper case letters or '_'
allvars = ["${!%s*}" % x for x in string.letters + "_"]
keylist_in = keylist
if keylist is None:
keylist = allvars
keylist_in = []
setup = '\n'.join(('__v="";', '',))
def varprinter(vlist):
# output '\0'.join(['_start_', key=value NULL for vars in vlist]
return '\n'.join((
'printf "%s\\0" _start_',
'for __v in %s; do' % ' '.join(vlist),
' printf "%s=%s\\0" "$__v" "${!__v}";',
'done',
''
))
# the rendered 'bcmd' is bash syntax that does
# setup: declare variables we use (so they show up in 'all')
# varprinter(allvars): print all variables known at beginning
# content: execute the provided content
# varprinter(keylist): print all variables known after content
#
# output is then a null terminated array of:
# literal '_start_'
# key=value (for each preset variable)
# literal '_start_'
# key=value (for each post set variable)
bcmd = ('unset IFS\n' +
setup +
varprinter(allvars) +
'{\n%s\n\n:\n} > /dev/null\n' % content +
'unset IFS\n' +
varprinter(keylist) + "\n")
cmd = []
if asuser is not None:
cmd = switch_user_cb(asuser)
cmd.extend(bash)
(output, _error) = util.subp(cmd, data=bcmd)
# exclude vars in bash that change on their own or that we used
excluded = ("RANDOM", "LINENO", "_", "__v")
preset = {}
ret = {}
target = None
output = output[0:-1] # remove trailing null
# go through output. First _start_ is for 'preset', second for 'target'.
# Add to target only things were changed and not in volitile
for line in output.split("\x00"):
try:
(key, val) = line.split("=", 1)
if target is preset:
target[key] = val
elif (key not in excluded and
(key in keylist_in or preset.get(key) != val)):
ret[key] = val
except ValueError:
if line != "_start_":
raise
if target is None:
target = preset
elif target is preset:
target = ret
return ret
def read_context_disk_dir(source_dir, asuser=None):
"""
read_context_disk_dir(source_dir):
read source_dir and return a tuple with metadata dict and user-data
string populated. If not a valid dir, raise a NonContextDiskDir
"""
found = {}
for af in CONTEXT_DISK_FILES:
fn = os.path.join(source_dir, af)
if os.path.isfile(fn):
found[af] = fn
if not found:
raise NonContextDiskDir("%s: %s" % (source_dir, "no files found"))
context = {}
results = {'userdata': None, 'metadata': {}}
if "context.sh" in found:
if asuser is not None:
try:
pwd.getpwnam(asuser)
except KeyError as e:
raise BrokenContextDiskDir("configured user '%s' "
"does not exist", asuser)
try:
with open(os.path.join(source_dir, 'context.sh'), 'r') as f:
content = f.read().strip()
context = parse_shell_config(content, asuser=asuser)
except util.ProcessExecutionError as e:
raise BrokenContextDiskDir("Error processing context.sh: %s" % (e))
except IOError as e:
raise NonContextDiskDir("Error reading context.sh: %s" % (e))
else:
raise NonContextDiskDir("Missing context.sh")
if not context:
return results
results['metadata'] = context
# process single or multiple SSH keys
ssh_key_var = None
if "SSH_KEY" in context:
ssh_key_var = "SSH_KEY"
elif "SSH_PUBLIC_KEY" in context:
ssh_key_var = "SSH_PUBLIC_KEY"
if ssh_key_var:
lines = context.get(ssh_key_var).splitlines()
results['metadata']['public-keys'] = [l for l in lines
if len(l) and not l.startswith("#")]
# custom hostname -- try hostname or leave cloud-init
# itself create hostname from IP address later
for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
if k in context:
results['metadata']['local-hostname'] = context[k]
break
# raw user data
if "USER_DATA" in context:
results['userdata'] = context["USER_DATA"]
elif "USERDATA" in context:
results['userdata'] = context["USERDATA"]
# generate static /etc/network/interfaces
# only if there are any required context variables
# http://opennebula.org/documentation:rel3.8:cong#network_configuration
for k in context.keys():
if re.match(r'^ETH\d+_IP$', k):
(out, _) = util.subp(['/sbin/ip', 'link'])
net = OpenNebulaNetwork(out, context)
results['network-interfaces'] = net.gen_conf()
break
return results
# Used to match classes to dependencies
datasources = [
(DataSourceOpenNebula, (sources.DEP_FILESYSTEM, )),
(DataSourceOpenNebulaNet, (sources.DEP_FILESYSTEM, sources.DEP_NETWORK)),
]
# Return a list of data sources that match this set of dependencies
def get_datasource_list(depends):
return sources.list_from_depends(depends, datasources)

View File

@ -53,9 +53,16 @@ class DataSource(object):
self.userdata = None
self.metadata = None
self.userdata_raw = None
# find the datasource config name.
# remove 'DataSource' from classname on front, and remove 'Net' on end.
# Both Foo and FooNet sources expect config in cfg['sources']['Foo']
name = type_utils.obj_name(self)
if name.startswith(DS_PREFIX):
name = name[len(DS_PREFIX):]
if name.endswith('Net'):
name = name[0:-3]
self.ds_cfg = util.get_cfg_by_path(self.sys_cfg,
("datasource", name), {})
if not ud_proc:
@ -144,7 +151,7 @@ class DataSource(object):
return "iid-datasource"
return str(self.metadata['instance-id'])
def get_hostname(self, fqdn=False):
def get_hostname(self, fqdn=False, resolve_ip=False):
defdomain = "localdomain"
defhost = "localhost"
domain = defdomain
@ -168,7 +175,14 @@ class DataSource(object):
# make up a hostname (LP: #475354) in format ip-xx.xx.xx.xx
lhost = self.metadata['local-hostname']
if util.is_ipv4(lhost):
toks = ["ip-%s" % lhost.replace(".", "-")]
toks = []
if resolve_ip:
toks = util.gethostbyaddr(lhost)
if toks:
toks = str(toks).split('.')
else:
toks = ["ip-%s" % lhost.replace(".", "-")]
else:
toks = lhost.split(".")

View File

@ -955,6 +955,13 @@ def get_hostname():
return hostname
def gethostbyaddr(ip):
try:
return socket.gethostbyaddr(ip)[0]
except socket.herror:
return None
def is_resolvable_url(url):
"""determine if this url is resolvable (existing or ip)."""
return (is_resolvable(urlparse.urlparse(url).hostname))

View File

@ -140,6 +140,12 @@ Config Drive
.. include:: ../../sources/configdrive/README.rst
---------------------------
OpenNebula
---------------------------
.. include:: ../../sources/opennebula/README.rst
---------------------------
Alt cloud
---------------------------

View File

@ -0,0 +1,142 @@
The `OpenNebula`_ (ON) datasource supports the contextualization disk.
See `contextualization overview`_, `contextualizing VMs`_ and
`network configuration`_ in the public documentation for
more information.
OpenNebula's virtual machines are contextualized (parametrized) by
CD-ROM image, which contains a shell script *context.sh* with
custom variables defined on virtual machine start. There are no
fixed contextualization variables, but the datasource accepts
many used and recommended across the documentation.
Datasource configuration
~~~~~~~~~~~~~~~~~~~~~~~~~
Datasource accepts following configuration options.
::
dsmode:
values: local, net, disabled
default: net
Tells if this datasource will be processed in 'local' (pre-networking) or
'net' (post-networking) stage or even completely 'disabled'.
::
parseuser:
default: nobody
Unprivileged system user used for contextualization script
processing.
Contextualization disk
~~~~~~~~~~~~~~~~~~~~~~
The following criteria are required:
1. Must be formatted with `iso9660`_ filesystem
or have a *filesystem* label of **CONTEXT** or **CDROM**
2. Must contain file *context.sh* with contextualization variables.
File is generated by OpenNebula, it has a KEY='VALUE' format and
can be easily read by bash
Contextualization variables
~~~~~~~~~~~~~~~~~~~~~~~~~~~
There are no fixed contextualization variables in OpenNebula, no standard.
Following variables were found on various places and revisions of
the OpenNebula documentation. Where multiple similar variables are
specified, only first found is taken.
::
DSMODE
Datasource mode configuration override. Values: local, net, disabled.
::
DNS
ETH<x>_IP
ETH<x>_NETWORK
ETH<x>_MASK
ETH<x>_GATEWAY
ETH<x>_DOMAIN
ETH<x>_DNS
Static `network configuration`_.
::
HOSTNAME
Instance hostname.
::
PUBLIC_IP
IP_PUBLIC
ETH0_IP
If no hostname has been specified, cloud-init will try to create hostname
from instance's IP address in 'local' dsmode. In 'net' dsmode, cloud-init
tries to resolve one of its IP addresses to get hostname.
::
SSH_KEY
SSH_PUBLIC_KEY
One or multiple SSH keys (separated by newlines) can be specified.
::
USER_DATA
USERDATA
cloud-init user data.
Example configuration
~~~~~~~~~~~~~~~~~~~~~
This example cloud-init configuration (*cloud.cfg*) enables
OpenNebula datasource only in 'net' mode.
::
disable_ec2_metadata: True
datasource_list: ['OpenNebula']
datasource:
OpenNebula:
dsmode: net
parseuser: nobody
Example VM's context section
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
::
CONTEXT=[
PUBLIC_IP="$NIC[IP]",
SSH_KEY="$USER[SSH_KEY]
$USER[SSH_KEY1]
$USER[SSH_KEY2] ",
USER_DATA="#cloud-config
# see https://help.ubuntu.com/community/CloudInit
packages: []
mounts:
- [vdc,none,swap,sw,0,0]
runcmd:
- echo 'Instance has been configured by cloud-init.' | wall
" ]
.. _OpenNebula: http://opennebula.org/
.. _contextualization overview: http://opennebula.org/documentation:documentation:context_overview
.. _contextualizing VMs: http://opennebula.org/documentation:documentation:cong
.. _network configuration: http://opennebula.org/documentation:documentation:cong#network_configuration
.. _iso9660: https://en.wikipedia.org/wiki/ISO_9660

View File

@ -0,0 +1,267 @@
from cloudinit.sources import DataSourceOpenNebula as ds
from cloudinit import helpers
from cloudinit import util
from mocker import MockerTestCase
from tests.unittests.helpers import populate_dir
import os
import pwd
TEST_VARS = {
'VAR1': 'single',
'VAR2': 'double word',
'VAR3': 'multi\nline\n',
'VAR4': "'single'",
'VAR5': "'double word'",
'VAR6': "'multi\nline\n'",
'VAR7': 'single\\t',
'VAR8': 'double\\tword',
'VAR9': 'multi\\t\nline\n',
'VAR10': '\\', # expect \
'VAR11': '\'', # expect '
'VAR12': '$', # expect $
}
INVALID_CONTEXT = ';'
USER_DATA = '#cloud-config\napt_upgrade: true'
SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
HOSTNAME = 'foo.example.com'
PUBLIC_IP = '10.0.0.3'
CMD_IP_OUT = '''\
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff
'''
class TestOpenNebulaDataSource(MockerTestCase):
parsed_user = None
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
self.tmp = self.makeDir()
self.paths = helpers.Paths({'cloud_dir': self.tmp})
# defaults for few tests
self.ds = ds.DataSourceOpenNebula
self.seed_dir = os.path.join(self.paths.seed_dir, "opennebula")
self.sys_cfg = {'datasource': {'OpenNebula': {'dsmode': 'local'}}}
# we don't want 'sudo' called in tests. so we patch switch_user_cmd
def my_switch_user_cmd(user):
self.parsed_user = user
return []
self.switch_user_cmd_real = ds.switch_user_cmd
ds.switch_user_cmd = my_switch_user_cmd
def tearDown(self):
ds.switch_user_cmd = self.switch_user_cmd_real
super(TestOpenNebulaDataSource, self).tearDown()
def test_get_data_non_contextdisk(self):
orig_find_devs_with = util.find_devs_with
try:
# dont' try to lookup for CDs
util.find_devs_with = lambda n: []
dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
self.assertFalse(ret)
finally:
util.find_devs_with = orig_find_devs_with
def test_get_data_broken_contextdisk(self):
orig_find_devs_with = util.find_devs_with
try:
# dont' try to lookup for CDs
util.find_devs_with = lambda n: []
populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
finally:
util.find_devs_with = orig_find_devs_with
def test_get_data_invalid_identity(self):
orig_find_devs_with = util.find_devs_with
try:
# generate non-existing system user name
sys_cfg = self.sys_cfg
invalid_user = 'invalid'
while not sys_cfg['datasource']['OpenNebula'].get('parseuser'):
try:
pwd.getpwnam(invalid_user)
invalid_user += 'X'
except KeyError:
sys_cfg['datasource']['OpenNebula']['parseuser'] = \
invalid_user
# dont' try to lookup for CDs
util.find_devs_with = lambda n: []
populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
dsrc = self.ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
finally:
util.find_devs_with = orig_find_devs_with
def test_get_data(self):
orig_find_devs_with = util.find_devs_with
try:
# dont' try to lookup for CDs
util.find_devs_with = lambda n: []
populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
self.assertTrue(ret)
finally:
util.find_devs_with = orig_find_devs_with
def test_seed_dir_non_contextdisk(self):
self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir,
self.seed_dir)
def test_seed_dir_empty1_context(self):
populate_dir(self.seed_dir, {'context.sh': ''})
results = ds.read_context_disk_dir(self.seed_dir)
self.assertEqual(results['userdata'], None)
self.assertEqual(results['metadata'], {})
def test_seed_dir_empty2_context(self):
populate_context_dir(self.seed_dir, {})
results = ds.read_context_disk_dir(self.seed_dir)
self.assertEqual(results['userdata'], None)
self.assertEqual(results['metadata'], {})
def test_seed_dir_broken_context(self):
populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
self.assertRaises(ds.BrokenContextDiskDir,
ds.read_context_disk_dir,
self.seed_dir)
def test_context_parser(self):
populate_context_dir(self.seed_dir, TEST_VARS)
results = ds.read_context_disk_dir(self.seed_dir)
self.assertTrue('metadata' in results)
self.assertEqual(TEST_VARS, results['metadata'])
def test_ssh_key(self):
public_keys = ['first key', 'second key']
for c in range(4):
for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'):
my_d = os.path.join(self.tmp, "%s-%i" % (k, c))
populate_context_dir(my_d, {k: '\n'.join(public_keys)})
results = ds.read_context_disk_dir(my_d)
self.assertTrue('metadata' in results)
self.assertTrue('public-keys' in results['metadata'])
self.assertEqual(public_keys,
results['metadata']['public-keys'])
public_keys.append(SSH_KEY % (c + 1,))
def test_user_data(self):
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
populate_context_dir(my_d, {k: USER_DATA})
results = ds.read_context_disk_dir(my_d)
self.assertTrue('userdata' in results)
self.assertEqual(USER_DATA, results['userdata'])
def test_hostname(self):
for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
my_d = os.path.join(self.tmp, k)
populate_context_dir(my_d, {k: PUBLIC_IP})
results = ds.read_context_disk_dir(my_d)
self.assertTrue('metadata' in results)
self.assertTrue('local-hostname' in results['metadata'])
self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname'])
def test_network_interfaces(self):
populate_context_dir(self.seed_dir, {'ETH0_IP': '1.2.3.4'})
results = ds.read_context_disk_dir(self.seed_dir)
self.assertTrue('network-interfaces' in results)
def test_find_candidates(self):
def my_devs_with(criteria):
return {
"LABEL=CONTEXT": ["/dev/sdb"],
"LABEL=CDROM": ["/dev/sr0"],
"TYPE=iso9660": ["/dev/vdb"],
}.get(criteria, [])
orig_find_devs_with = util.find_devs_with
try:
util.find_devs_with = my_devs_with
self.assertEqual(["/dev/sdb", "/dev/sr0", "/dev/vdb"],
ds.find_candidate_devs())
finally:
util.find_devs_with = orig_find_devs_with
class TestOpenNebulaNetwork(MockerTestCase):
def setUp(self):
super(TestOpenNebulaNetwork, self).setUp()
def test_lo(self):
net = ds.OpenNebulaNetwork('', {})
self.assertEqual(net.gen_conf(), u'''\
auto lo
iface lo inet loopback
''')
def test_eth0(self):
net = ds.OpenNebulaNetwork(CMD_IP_OUT, {})
self.assertEqual(net.gen_conf(), u'''\
auto lo
iface lo inet loopback
auto eth0
iface eth0 inet static
address 10.18.1.1
network 10.18.1.0
netmask 255.255.255.0
''')
def test_eth0_override(self):
context = {
'DNS': '1.2.3.8',
'ETH0_IP': '1.2.3.4',
'ETH0_NETWORK': '1.2.3.0',
'ETH0_MASK': '255.255.0.0',
'ETH0_GATEWAY': '1.2.3.5',
'ETH0_DOMAIN': 'example.com',
'ETH0_DNS': '1.2.3.6 1.2.3.7'
}
net = ds.OpenNebulaNetwork(CMD_IP_OUT, context)
self.assertEqual(net.gen_conf(), u'''\
auto lo
iface lo inet loopback
auto eth0
iface eth0 inet static
address 1.2.3.4
network 1.2.3.0
netmask 255.255.0.0
gateway 1.2.3.5
dns-search example.com
dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7
''')
def populate_context_dir(path, variables):
data = "# Context variables generated by OpenNebula\n"
for (k, v) in variables.iteritems():
data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''")))
populate_dir(path, {'context.sh': data})
# vi: ts=4 expandtab