Add a bunch of new tests

This commit is contained in:
Joshua Harlow 2014-02-07 16:34:04 -08:00
parent f6c2b3e6b5
commit 16be954cd6
3 changed files with 190 additions and 16 deletions

View File

@ -121,7 +121,8 @@ class DataSourceOpenStack(openstack.SourceMixin, sources.DataSource):
'Crawl of openstack metadata service', 'Crawl of openstack metadata service',
read_metadata_service, read_metadata_service,
args=[self.metadata_address], args=[self.metadata_address],
kwargs={'ssl_details': self.ssl_details}) kwargs={'ssl_details': self.ssl_details,
'version': openstack.OS_LATEST})
except openstack.NonReadable: except openstack.NonReadable:
return False return False
except openstack.BrokenMetadata: except openstack.BrokenMetadata:

View File

@ -29,6 +29,12 @@ if (_PY_MAJOR, _PY_MINOR) <= (2, 6):
standardMsg = standardMsg % (member, container) standardMsg = standardMsg % (member, container)
self.fail(self._formatMessage(msg, standardMsg)) self.fail(self._formatMessage(msg, standardMsg))
def assertIsNone(self, value, msg=None):
if value is not None:
standardMsg = '%r is not None'
standardMsg = standardMsg % (value)
self.fail(self._formatMessage(msg, standardMsg))
else: else:
class TestCase(unittest.TestCase): class TestCase(unittest.TestCase):
pass pass

View File

@ -1,12 +1,33 @@
import re # vi: ts=4 expandtab
#
# Copyright (C) 2014 Yahoo! Inc.
#
# Author: Joshua Harlow <harlowja@yahoo-inc.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3, as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import json import json
import re
from StringIO import StringIO from StringIO import StringIO
from urlparse import urlparse from urlparse import urlparse
from tests.unittests import helpers from tests.unittests import helpers as test_helpers
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceOpenStack as ds from cloudinit.sources import DataSourceOpenStack as ds
from cloudinit.sources.helpers import openstack from cloudinit.sources.helpers import openstack
from cloudinit import util from cloudinit import util
@ -17,7 +38,7 @@ BASE_URL = "http://169.254.169.254"
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n' PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
EC2_META = { EC2_META = {
'ami-id': 'ami-00000001', 'ami-id': 'ami-00000001',
'ami-launch-index': 0, 'ami-launch-index': '0',
'ami-manifest-path': 'FIXME', 'ami-manifest-path': 'FIXME',
'hostname': 'sm-foo-test.novalocal', 'hostname': 'sm-foo-test.novalocal',
'instance-action': 'none', 'instance-action': 'none',
@ -59,15 +80,17 @@ EC2_FILES = {
} }
def _register_uris(version): def _register_uris(version, ec2_files, ec2_meta, os_files):
"""Registers a set of url patterns into httpretty that will mimic the
same data returned by the openstack metadata service (and ec2 service)."""
def match_ec2_url(uri, headers): def match_ec2_url(uri, headers):
path = uri.path.lstrip("/") path = uri.path.lstrip("/")
if path in EC2_FILES: if path in ec2_files:
return (200, headers, EC2_FILES.get(path)) return (200, headers, ec2_files.get(path))
if path == 'latest/meta-data': if path == 'latest/meta-data':
buf = StringIO() buf = StringIO()
for (k, v) in EC2_META.items(): for (k, v) in ec2_meta.items():
if isinstance(v, (list, tuple)): if isinstance(v, (list, tuple)):
buf.write("%s/" % (k)) buf.write("%s/" % (k))
else: else:
@ -79,10 +102,10 @@ def _register_uris(version):
pieces = path.split("/") pieces = path.split("/")
if path.endswith("/"): if path.endswith("/"):
pieces = pieces[2:-1] pieces = pieces[2:-1]
value = util.get_cfg_by_path(EC2_META, pieces) value = util.get_cfg_by_path(ec2_meta, pieces)
else: else:
pieces = pieces[2:] pieces = pieces[2:]
value = util.get_cfg_by_path(EC2_META, pieces) value = util.get_cfg_by_path(ec2_meta, pieces)
if value is not None: if value is not None:
return (200, headers, str(value)) return (200, headers, str(value))
return (404, headers, '') return (404, headers, '')
@ -90,14 +113,14 @@ def _register_uris(version):
def get_request_callback(method, uri, headers): def get_request_callback(method, uri, headers):
uri = urlparse(uri) uri = urlparse(uri)
path = uri.path.lstrip("/") path = uri.path.lstrip("/")
if path in OS_FILES: if path in os_files:
return (200, headers, OS_FILES.get(path)) return (200, headers, os_files.get(path))
return match_ec2_url(uri, headers) return match_ec2_url(uri, headers)
def head_request_callback(method, uri, headers): def head_request_callback(method, uri, headers):
uri = urlparse(uri) uri = urlparse(uri)
path = uri.path.lstrip("/") path = uri.path.lstrip("/")
for key in OS_FILES.keys(): for key in os_files.keys():
if key.startswith(path): if key.startswith(path):
return (200, headers, '') return (200, headers, '')
return (404, headers, '') return (404, headers, '')
@ -109,14 +132,158 @@ def _register_uris(version):
body=head_request_callback) body=head_request_callback)
class TestOpenStackDataSource(helpers.TestCase): class TestOpenStackDataSource(test_helpers.TestCase):
VERSION = 'latest' VERSION = 'latest'
@hp.activate @hp.activate
def test_fetch(self): def test_successful(self):
_register_uris(self.VERSION) _register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES)
f = ds.read_metadata_service(BASE_URL, version=self.VERSION)
self.assertEquals(VENDOR_DATA, f.get('vendordata'))
self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
self.assertEquals(2, len(f['files']))
self.assertEquals(USER_DATA, f.get('userdata'))
self.assertEquals(EC2_META, f.get('ec2-metadata'))
self.assertEquals(2, f.get('version'))
metadata = f['metadata']
self.assertEquals('nova', metadata.get('availability_zone'))
self.assertEquals('sm-foo-test.novalocal', metadata.get('hostname'))
self.assertEquals('sm-foo-test.novalocal',
metadata.get('local-hostname'))
self.assertEquals('sm-foo-test', metadata.get('name'))
self.assertEquals('b0fa911b-69d4-4476-bbe2-1c92bff6535c',
metadata.get('uuid'))
self.assertEquals('b0fa911b-69d4-4476-bbe2-1c92bff6535c',
metadata.get('instance-id'))
@hp.activate
def test_no_ec2(self):
_register_uris(self.VERSION, {}, {}, OS_FILES)
f = ds.read_metadata_service(BASE_URL, version=self.VERSION) f = ds.read_metadata_service(BASE_URL, version=self.VERSION)
self.assertEquals(VENDOR_DATA, f.get('vendordata')) self.assertEquals(VENDOR_DATA, f.get('vendordata'))
self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg']) self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg']) self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
self.assertEquals(USER_DATA, f.get('userdata')) self.assertEquals(USER_DATA, f.get('userdata'))
self.assertEquals({}, f.get('ec2-metadata'))
self.assertEquals(2, f.get('version'))
@hp.activate
def test_bad_metadata(self):
os_files = copy.deepcopy(OS_FILES)
for k in list(os_files.keys()):
if k.endswith('meta_data.json'):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
self.assertRaises(openstack.NonReadable, ds.read_metadata_service,
BASE_URL, version=self.VERSION)
@hp.activate
def test_bad_uuid(self):
os_files = copy.deepcopy(OS_FILES)
os_meta = copy.deepcopy(OSTACK_META)
os_meta.pop('uuid')
for k in list(os_files.keys()):
if k.endswith('meta_data.json'):
os_files[k] = json.dumps(os_meta)
_register_uris(self.VERSION, {}, {}, os_files)
self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
BASE_URL, version=self.VERSION)
@hp.activate
def test_userdata_empty(self):
os_files = copy.deepcopy(OS_FILES)
for k in list(os_files.keys()):
if k.endswith('user_data'):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
f = ds.read_metadata_service(BASE_URL, version=self.VERSION)
self.assertEquals(VENDOR_DATA, f.get('vendordata'))
self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
self.assertFalse(f.get('userdata'))
@hp.activate
def test_vendordata_empty(self):
os_files = copy.deepcopy(OS_FILES)
for k in list(os_files.keys()):
if k.endswith('vendor_data.json'):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
f = ds.read_metadata_service(BASE_URL, version=self.VERSION)
self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
self.assertFalse(f.get('vendordata'))
@hp.activate
def test_vendordata_invalid(self):
os_files = copy.deepcopy(OS_FILES)
for k in list(os_files.keys()):
if k.endswith('vendor_data.json'):
os_files[k] = '{' # some invalid json
_register_uris(self.VERSION, {}, {}, os_files)
self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
BASE_URL, version=self.VERSION)
@hp.activate
def test_metadata_invalid(self):
os_files = copy.deepcopy(OS_FILES)
for k in list(os_files.keys()):
if k.endswith('meta_data.json'):
os_files[k] = '{' # some invalid json
_register_uris(self.VERSION, {}, {}, os_files)
self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
BASE_URL, version=self.VERSION)
@hp.activate
def test_datasource(self):
_register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES)
ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
None,
helpers.Paths({}))
self.assertIsNone(ds_os.version)
found = ds_os.get_data()
self.assertTrue(found)
self.assertEquals(2, ds_os.version)
md = dict(ds_os.metadata)
md.pop('instance-id', None)
md.pop('local-hostname', None)
self.assertEquals(OSTACK_META, md)
self.assertEquals(EC2_META, ds_os.ec2_metadata)
self.assertEquals(USER_DATA, ds_os.userdata_raw)
self.assertEquals(2, len(ds_os.files))
self.assertEquals(VENDOR_DATA, ds_os.vendordata_raw)
@hp.activate
def test_bad_datasource_meta(self):
os_files = copy.deepcopy(OS_FILES)
for k in list(os_files.keys()):
if k.endswith('meta_data.json'):
os_files[k] = '{' # some invalid json
_register_uris(self.VERSION, {}, {}, os_files)
ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
None,
helpers.Paths({}))
self.assertIsNone(ds_os.version)
found = ds_os.get_data()
self.assertFalse(found)
self.assertIsNone(ds_os.version)
@hp.activate
def test_no_datasource(self):
os_files = copy.deepcopy(OS_FILES)
for k in list(os_files.keys()):
if k.endswith('meta_data.json'):
os_files.pop(k)
_register_uris(self.VERSION, {}, {}, os_files)
ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
None,
helpers.Paths({}))
ds_os.ds_cfg = {
'max_wait': 0,
'timeout': 0,
}
self.assertIsNone(ds_os.version)
found = ds_os.get_data()
self.assertFalse(found)
self.assertIsNone(ds_os.version)