Add test + remove jsonschema (for now)

This commit is contained in:
Joshua Harlow 2013-09-08 22:36:28 -07:00
parent 2206cdb92d
commit ef88386038
3 changed files with 153 additions and 54 deletions

View File

@ -34,7 +34,3 @@ boto
# For patching pieces of cloud-config together
jsonpatch
# For validating that a config modules needed configuration specified
# in a correct format that the module can understand
jsonschema

View File

@ -19,64 +19,18 @@
import base64
from StringIO import StringIO
import jsonschema
from jsonschema import exceptions as js_exc
from cloudinit import exceptions as exc
from cloudinit.settings import PER_INSTANCE
from cloudinit import util
frequency = PER_INSTANCE
schema = {
'type': 'object',
'properties': {
"random_seed": {
"type": "object",
"oneOf": [
{"$ref": "#/definitions/random_seed"},
],
},
},
"required": ["random_seed"],
"additionalProperties": True,
"definitions": {
'random_seed': {
'type': 'object',
"properties" : {
'data': {
'type': "string",
},
'file': {
'type': 'string',
},
'encoding': {
"enum": ["base64", 'gzip', 'b64', 'gz', ''],
},
},
"additionalProperties": True,
},
},
}
def validate(cfg):
"""Method that can be used to ask if the given configuration will be
accepted as valid by this module, without having to actually activate this
module."""
if not cfg or "random_seed" not in cfg:
return
try:
jsonschema.validate(cfg, schema)
except js_exc.ValidationError as e:
raise exc.FormatValidationError("Invalid configuration: %s" % str(e))
def _decode(data, encoding=None):
if not encoding:
return data
if not data:
return ''
if encoding.lower() in ['base64', 'b64']:
if not encoding or encoding.lower() in ['raw']:
return data
elif encoding.lower() in ['base64', 'b64']:
return base64.b64decode(data)
elif encoding.lower() in ['gzip', 'gz']:
return util.decomp_gzip(data, quiet=False)
@ -90,7 +44,6 @@ def handle(name, cfg, cloud, log, _args):
"no 'random_seed' configuration found"), name)
return
validate(cfg)
my_cfg = cfg['random_seed']
seed_path = my_cfg.get('file', '/dev/urandom')
seed_buf = StringIO()

View File

@ -0,0 +1,150 @@
# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Juerg Haefliger <juerg.haefliger@hp.com>
#
# Based on test_handler_set_hostname.py
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3, as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from cloudinit.config import cc_seed_random
import base64
import tempfile
import gzip
from StringIO import StringIO
from cloudinit import cloud
from cloudinit import distros
from cloudinit import helpers
from cloudinit import util
from cloudinit.sources import DataSourceNone
from tests.unittests import helpers as t_help
import logging
LOG = logging.getLogger(__name__)
class TestRandomSeed(t_help.TestCase):
def setUp(self):
super(TestRandomSeed, self).setUp()
self._seed_file = tempfile.mktemp()
def tearDown(self):
util.del_file(self._seed_file)
def _compress(self, text):
contents = StringIO()
gz_fh = gzip.GzipFile(mode='wb', fileobj=contents)
gz_fh.write(text)
gz_fh.close()
return contents.getvalue()
def _get_cloud(self, distro, metadata=None):
paths = helpers.Paths({})
cls = distros.fetch(distro)
ubuntu_distro = cls(distro, {}, paths)
ds = DataSourceNone.DataSourceNone({}, ubuntu_distro, paths)
if metadata:
ds.metadata = metadata
return cloud.Cloud(ds, paths, {}, ubuntu_distro, None)
def test_append_random(self):
cfg = {
'random_seed': {
'file': self._seed_file,
'data': 'tiny-tim-was-here',
}
}
cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, [])
contents = util.load_file(self._seed_file)
self.assertEquals("tiny-tim-was-here", contents)
def test_append_random_unknown_encoding(self):
data = self._compress("tiny-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
'data': data,
'encoding': 'special_encoding',
}
}
self.assertRaises(IOError, cc_seed_random.handle, 'test', cfg,
self._get_cloud('ubuntu'), LOG, [])
def test_append_random_gzip(self):
data = self._compress("tiny-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
'data': data,
'encoding': 'gzip',
}
}
cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, [])
contents = util.load_file(self._seed_file)
self.assertEquals("tiny-toe", contents)
def test_append_random_gz(self):
data = self._compress("big-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
'data': data,
'encoding': 'gz',
}
}
cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, [])
contents = util.load_file(self._seed_file)
self.assertEquals("big-toe", contents)
def test_append_random_base64(self):
data = base64.b64encode('bubbles')
cfg = {
'random_seed': {
'file': self._seed_file,
'data': data,
'encoding': 'base64',
}
}
cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, [])
contents = util.load_file(self._seed_file)
self.assertEquals("bubbles", contents)
def test_append_random_b64(self):
data = base64.b64encode('kit-kat')
cfg = {
'random_seed': {
'file': self._seed_file,
'data': data,
'encoding': 'b64',
}
}
cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, [])
contents = util.load_file(self._seed_file)
self.assertEquals("kit-kat", contents)
def test_append_random_metadata(self):
cfg = {
'random_seed': {
'file': self._seed_file,
'data': 'tiny-tim-was-here',
}
}
c = self._get_cloud('ubuntu', {'random_seed': '-so-was-josh'})
cc_seed_random.handle('test', cfg, c, LOG, [])
contents = util.load_file(self._seed_file)
self.assertEquals('tiny-tim-was-here-so-was-josh', contents)