readurl, read_file_or_url returns bytes, user must convert as necessary
* explicitly test compressed user-data. * userdata_raw is now bytes * add load_tfile_or_url for loading text file or url * ec2_utils: all meta-data is text, remove non-obvious string translations * DigitalOcean: adjust for ec2_utils * DataSourceGCE, DataSourceMAAS: user-data is binary other fields are text. * openstack.py: read paths without decoding to text. This is ok as paths other than user-data are json, and load_json will handle * load_file still returns text, and that is what most things use.
This commit is contained in:
commit
af5c00b27a
@ -26,6 +26,7 @@
|
||||
- Azure: utilze gpt support for ephemeral formating [Daniel Watkins]
|
||||
- CloudStack: support fetching password from virtual router [Daniel Watkins]
|
||||
(LP: #1422388)
|
||||
- readurl, read_file_or_url returns bytes, user must convert as necessary
|
||||
0.7.6:
|
||||
- open 0.7.6
|
||||
- Enable vendordata on CloudSigma datasource (LP: #1303986)
|
||||
|
@ -505,6 +505,8 @@ def status_wrapper(name, args, data_d=None, link_d=None):
|
||||
v1[mode]['errors'] = [str(e) for e in errors]
|
||||
|
||||
except Exception as e:
|
||||
util.logexc(LOG, "failed of stage %s", mode)
|
||||
print_exc("failed run of stage %s", mode)
|
||||
v1[mode]['errors'] = [str(e)]
|
||||
|
||||
v1[mode]['finished'] = time.time()
|
||||
|
@ -41,6 +41,10 @@ class MetadataLeafDecoder(object):
|
||||
def __call__(self, field, blob):
|
||||
if not blob:
|
||||
return blob
|
||||
try:
|
||||
blob = util.decode_binary(blob)
|
||||
except UnicodeDecodeError:
|
||||
return blob
|
||||
if self._maybe_json_object(blob):
|
||||
try:
|
||||
# Assume it's json, unless it fails parsing...
|
||||
@ -69,6 +73,8 @@ class MetadataMaterializer(object):
|
||||
def _parse(self, blob):
|
||||
leaves = {}
|
||||
children = []
|
||||
blob = util.decode_binary(blob)
|
||||
|
||||
if not blob:
|
||||
return (leaves, children)
|
||||
|
||||
@ -117,12 +123,12 @@ class MetadataMaterializer(object):
|
||||
child_url = url_helper.combine_url(base_url, c)
|
||||
if not child_url.endswith("/"):
|
||||
child_url += "/"
|
||||
child_blob = str(self._caller(child_url))
|
||||
child_blob = self._caller(child_url)
|
||||
child_contents[c] = self._materialize(child_blob, child_url)
|
||||
leaf_contents = {}
|
||||
for (field, resource) in leaves.items():
|
||||
leaf_url = url_helper.combine_url(base_url, resource)
|
||||
leaf_blob = self._caller(leaf_url).contents
|
||||
leaf_blob = self._caller(leaf_url)
|
||||
leaf_contents[field] = self._leaf_decoder(field, leaf_blob)
|
||||
joined = {}
|
||||
joined.update(child_contents)
|
||||
@ -179,11 +185,13 @@ def get_instance_metadata(api_version='latest',
|
||||
caller = functools.partial(util.read_file_or_url,
|
||||
ssl_details=ssl_details, timeout=timeout,
|
||||
retries=retries)
|
||||
def mcaller(url):
|
||||
return caller(url).contents
|
||||
|
||||
try:
|
||||
response = caller(md_url)
|
||||
materializer = MetadataMaterializer(response.contents,
|
||||
md_url, caller,
|
||||
md_url, mcaller,
|
||||
leaf_decoder=leaf_decoder)
|
||||
md = materializer.materialize()
|
||||
if not isinstance(md, (dict)):
|
||||
|
@ -54,9 +54,13 @@ class DataSourceDigitalOcean(sources.DataSource):
|
||||
def get_data(self):
|
||||
caller = functools.partial(util.read_file_or_url,
|
||||
timeout=self.timeout, retries=self.retries)
|
||||
md = ec2_utils.MetadataMaterializer(str(caller(self.metadata_address)),
|
||||
|
||||
def mcaller(url):
|
||||
return caller(url).contents
|
||||
|
||||
md = ec2_utils.MetadataMaterializer(mcaller(self.metadata_address),
|
||||
base_url=self.metadata_address,
|
||||
caller=caller)
|
||||
caller=mcaller)
|
||||
|
||||
self.metadata = md.materialize()
|
||||
|
||||
|
@ -53,15 +53,15 @@ class DataSourceGCE(sources.DataSource):
|
||||
# GCE metadata server requires a custom header since v1
|
||||
headers = {'X-Google-Metadata-Request': True}
|
||||
|
||||
# url_map: (our-key, path, required)
|
||||
# url_map: (our-key, path, required, is_text)
|
||||
url_map = [
|
||||
('instance-id', 'instance/id', True),
|
||||
('availability-zone', 'instance/zone', True),
|
||||
('local-hostname', 'instance/hostname', True),
|
||||
('public-keys', 'project/attributes/sshKeys', False),
|
||||
('user-data', 'instance/attributes/user-data', False),
|
||||
('instance-id', 'instance/id', True, True),
|
||||
('availability-zone', 'instance/zone', True, True),
|
||||
('local-hostname', 'instance/hostname', True, True),
|
||||
('public-keys', 'project/attributes/sshKeys', False, True),
|
||||
('user-data', 'instance/attributes/user-data', False, False),
|
||||
('user-data-encoding', 'instance/attributes/user-data-encoding',
|
||||
False),
|
||||
False, True),
|
||||
]
|
||||
|
||||
# if we cannot resolve the metadata server, then no point in trying
|
||||
@ -71,12 +71,15 @@ class DataSourceGCE(sources.DataSource):
|
||||
|
||||
# iterate over url_map keys to get metadata items
|
||||
found = False
|
||||
for (mkey, path, required) in url_map:
|
||||
for (mkey, path, required, is_text) in url_map:
|
||||
try:
|
||||
resp = url_helper.readurl(url=self.metadata_address + path,
|
||||
headers=headers)
|
||||
if resp.code == 200:
|
||||
found = True
|
||||
if is_text:
|
||||
self.metadata[mkey] = util.decode_binary(resp.contents)
|
||||
else:
|
||||
self.metadata[mkey] = resp.contents
|
||||
else:
|
||||
if required:
|
||||
|
@ -36,6 +36,8 @@ from cloudinit import util
|
||||
LOG = logging.getLogger(__name__)
|
||||
MD_VERSION = "2012-03-01"
|
||||
|
||||
BINARY_FIELDS = ('user-data',)
|
||||
|
||||
|
||||
class DataSourceMAAS(sources.DataSource):
|
||||
"""
|
||||
@ -185,7 +187,9 @@ def read_maas_seed_dir(seed_d):
|
||||
md = {}
|
||||
for fname in files:
|
||||
try:
|
||||
md[fname] = util.load_file(os.path.join(seed_d, fname))
|
||||
print("fname: %s / %s" % (fname, fname not in BINARY_FIELDS))
|
||||
md[fname] = util.load_file(os.path.join(seed_d, fname),
|
||||
decode=fname not in BINARY_FIELDS)
|
||||
except IOError as e:
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
@ -218,6 +222,7 @@ def read_maas_seed_url(seed_url, header_cb=None, timeout=None,
|
||||
'public-keys': "%s/%s" % (base_url, 'meta-data/public-keys'),
|
||||
'user-data': "%s/%s" % (base_url, 'user-data'),
|
||||
}
|
||||
|
||||
md = {}
|
||||
for name in file_order:
|
||||
url = files.get(name)
|
||||
@ -238,7 +243,10 @@ def read_maas_seed_url(seed_url, header_cb=None, timeout=None,
|
||||
timeout=timeout,
|
||||
ssl_details=ssl_details)
|
||||
if resp.ok():
|
||||
md[name] = str(resp)
|
||||
if name in BINARY_FIELDS:
|
||||
md[name] = resp.contents
|
||||
else:
|
||||
md[name] = util.decode_binary(resp.contents)
|
||||
else:
|
||||
LOG.warn(("Fetching from %s resulted in"
|
||||
" an invalid http code %s"), url, resp.code)
|
||||
@ -263,7 +271,7 @@ def check_seed_contents(content, seed):
|
||||
if len(missing):
|
||||
raise MAASSeedDirMalformed("%s: missing files %s" % (seed, missing))
|
||||
|
||||
userdata = content.get('user-data', "")
|
||||
userdata = content.get('user-data', b"")
|
||||
md = {}
|
||||
for (key, val) in content.items():
|
||||
if key == 'user-data':
|
||||
|
@ -327,7 +327,7 @@ class ConfigDriveReader(BaseReader):
|
||||
return os.path.join(*components)
|
||||
|
||||
def _path_read(self, path):
|
||||
return util.load_file(path)
|
||||
return util.load_file(path, decode=False)
|
||||
|
||||
def _fetch_available_versions(self):
|
||||
if self._versions is None:
|
||||
|
@ -119,7 +119,7 @@ class UrlResponse(object):
|
||||
|
||||
@property
|
||||
def contents(self):
|
||||
return self._response.text
|
||||
return self._response.content
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
|
@ -237,9 +237,9 @@ class UserDataProcessor(object):
|
||||
resp = util.read_file_or_url(include_url,
|
||||
ssl_details=self.ssl_details)
|
||||
if include_once_on and resp.ok():
|
||||
util.write_file(include_once_fn, resp, mode=0o600)
|
||||
util.write_file(include_once_fn, resp.contents, mode=0o600)
|
||||
if resp.ok():
|
||||
content = str(resp)
|
||||
content = resp.contents
|
||||
else:
|
||||
LOG.warn(("Fetching from %s resulted in"
|
||||
" a invalid http code of %s"),
|
||||
|
@ -739,6 +739,10 @@ def fetch_ssl_details(paths=None):
|
||||
return ssl_details
|
||||
|
||||
|
||||
def load_tfile_or_url(*args, **kwargs):
|
||||
return(decode_binary(read_file_or_url(*args, **kwargs).contents))
|
||||
|
||||
|
||||
def read_file_or_url(url, timeout=5, retries=10,
|
||||
headers=None, data=None, sec_between=1, ssl_details=None,
|
||||
headers_cb=None, exception_cb=None):
|
||||
@ -750,7 +754,7 @@ def read_file_or_url(url, timeout=5, retries=10,
|
||||
LOG.warn("Unable to post data to file resource %s", url)
|
||||
file_path = url[len("file://"):]
|
||||
try:
|
||||
contents = load_file(file_path)
|
||||
contents = load_file(file_path, decode=False)
|
||||
except IOError as e:
|
||||
code = e.errno
|
||||
if e.errno == errno.ENOENT:
|
||||
@ -806,7 +810,7 @@ def read_seeded(base="", ext="", timeout=5, retries=10, file_retries=0):
|
||||
ud_url = "%s%s%s" % (base, "user-data", ext)
|
||||
md_url = "%s%s%s" % (base, "meta-data", ext)
|
||||
|
||||
md_resp = read_file_or_url(md_url, timeout, retries, file_retries)
|
||||
md_resp = load_tfile_or_url(md_url, timeout, retries, file_retries)
|
||||
md = None
|
||||
if md_resp.ok():
|
||||
md = load_yaml(md_resp.contents, default={})
|
||||
@ -815,6 +819,7 @@ def read_seeded(base="", ext="", timeout=5, retries=10, file_retries=0):
|
||||
ud = None
|
||||
if ud_resp.ok():
|
||||
ud = ud_resp.contents
|
||||
print("returning %s (%s)" % (ud_resp.contents.__class__, ud_resp.contents))
|
||||
|
||||
return (md, ud)
|
||||
|
||||
@ -2030,7 +2035,7 @@ def pathprefix2dict(base, required=None, optional=None, delim=os.path.sep):
|
||||
ret = {}
|
||||
for f in required + optional:
|
||||
try:
|
||||
ret[f] = load_file(base + delim + f, quiet=False)
|
||||
ret[f] = load_file(base + delim + f, quiet=False, decode=False)
|
||||
except IOError as e:
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
|
@ -288,6 +288,9 @@ def populate_dir(path, files):
|
||||
os.makedirs(path)
|
||||
for (name, content) in files.items():
|
||||
with open(os.path.join(path, name), "wb") as fp:
|
||||
if isinstance(content, six.binary_type):
|
||||
fp.write(content)
|
||||
else:
|
||||
fp.write(content.encode('utf-8'))
|
||||
fp.close()
|
||||
|
||||
|
@ -2,6 +2,7 @@ from copy import copy
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import six
|
||||
import tempfile
|
||||
|
||||
try:
|
||||
@ -45,7 +46,7 @@ EC2_META = {
|
||||
'reservation-id': 'r-iru5qm4m',
|
||||
'security-groups': ['default']
|
||||
}
|
||||
USER_DATA = '#!/bin/sh\necho This is user data\n'
|
||||
USER_DATA = b'#!/bin/sh\necho This is user data\n'
|
||||
OSTACK_META = {
|
||||
'availability_zone': 'nova',
|
||||
'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'},
|
||||
@ -56,8 +57,8 @@ OSTACK_META = {
|
||||
'public_keys': {'mykey': PUBKEY},
|
||||
'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
|
||||
|
||||
CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
|
||||
CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
|
||||
CONTENT_0 = b'This is contents of /etc/foo.cfg\n'
|
||||
CONTENT_1 = b'# this is /etc/bar/bar.cfg\n'
|
||||
|
||||
CFG_DRIVE_FILES_V2 = {
|
||||
'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
|
||||
@ -346,8 +347,12 @@ def populate_dir(seed_dir, files):
|
||||
dirname = os.path.dirname(path)
|
||||
if not os.path.isdir(dirname):
|
||||
os.makedirs(dirname)
|
||||
with open(path, "w") as fp:
|
||||
if isinstance(content, six.text_type):
|
||||
mode = "w"
|
||||
else:
|
||||
mode = "wb"
|
||||
|
||||
with open(path, mode) as fp:
|
||||
fp.write(content)
|
||||
fp.close()
|
||||
|
||||
# vi: ts=4 expandtab
|
||||
|
@ -32,7 +32,7 @@ GCE_META = {
|
||||
'instance/zone': 'foo/bar',
|
||||
'project/attributes/sshKeys': 'user:ssh-rsa AA2..+aRD0fyVw== root@server',
|
||||
'instance/hostname': 'server.project-foo.local',
|
||||
'instance/attributes/user-data': '/bin/echo foo\n',
|
||||
'instance/attributes/user-data': b'/bin/echo foo\n',
|
||||
}
|
||||
|
||||
GCE_META_PARTIAL = {
|
||||
|
@ -26,7 +26,7 @@ class TestMAASDataSource(TestCase):
|
||||
|
||||
data = {'instance-id': 'i-valid01',
|
||||
'local-hostname': 'valid01-hostname',
|
||||
'user-data': 'valid01-userdata',
|
||||
'user-data': b'valid01-userdata',
|
||||
'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'}
|
||||
|
||||
my_d = os.path.join(self.tmp, "valid")
|
||||
@ -46,7 +46,7 @@ class TestMAASDataSource(TestCase):
|
||||
|
||||
data = {'instance-id': 'i-valid-extra',
|
||||
'local-hostname': 'valid-extra-hostname',
|
||||
'user-data': 'valid-extra-userdata', 'foo': 'bar'}
|
||||
'user-data': b'valid-extra-userdata', 'foo': 'bar'}
|
||||
|
||||
my_d = os.path.join(self.tmp, "valid_extra")
|
||||
populate_dir(my_d, data)
|
||||
@ -103,7 +103,7 @@ class TestMAASDataSource(TestCase):
|
||||
'meta-data/instance-id': 'i-instanceid',
|
||||
'meta-data/local-hostname': 'test-hostname',
|
||||
'meta-data/public-keys': 'test-hostname',
|
||||
'user-data': 'foodata',
|
||||
'user-data': b'foodata',
|
||||
}
|
||||
valid_order = [
|
||||
'meta-data/local-hostname',
|
||||
@ -143,7 +143,7 @@ class TestMAASDataSource(TestCase):
|
||||
userdata, metadata = DataSourceMAAS.read_maas_seed_url(
|
||||
my_seed, header_cb=my_headers_cb, version=my_ver)
|
||||
|
||||
self.assertEqual("foodata", userdata)
|
||||
self.assertEqual(b"foodata", userdata)
|
||||
self.assertEqual(metadata['instance-id'],
|
||||
valid['meta-data/instance-id'])
|
||||
self.assertEqual(metadata['local-hostname'],
|
||||
|
@ -37,7 +37,7 @@ class TestNoCloudDataSource(TestCase):
|
||||
|
||||
def test_nocloud_seed_dir(self):
|
||||
md = {'instance-id': 'IID', 'dsmode': 'local'}
|
||||
ud = "USER_DATA_HERE"
|
||||
ud = b"USER_DATA_HERE"
|
||||
populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
|
||||
{'user-data': ud, 'meta-data': yaml.safe_dump(md)})
|
||||
|
||||
@ -92,20 +92,20 @@ class TestNoCloudDataSource(TestCase):
|
||||
data = {
|
||||
'fs_label': None,
|
||||
'meta-data': yaml.safe_dump({'instance-id': 'IID'}),
|
||||
'user-data': "USER_DATA_RAW",
|
||||
'user-data': b"USER_DATA_RAW",
|
||||
}
|
||||
|
||||
sys_cfg = {'datasource': {'NoCloud': data}}
|
||||
dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
|
||||
ret = dsrc.get_data()
|
||||
self.assertEqual(dsrc.userdata_raw, "USER_DATA_RAW")
|
||||
self.assertEqual(dsrc.userdata_raw, b"USER_DATA_RAW")
|
||||
self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
|
||||
self.assertTrue(ret)
|
||||
|
||||
def test_nocloud_seed_with_vendordata(self):
|
||||
md = {'instance-id': 'IID', 'dsmode': 'local'}
|
||||
ud = "USER_DATA_HERE"
|
||||
vd = "THIS IS MY VENDOR_DATA"
|
||||
ud = b"USER_DATA_HERE"
|
||||
vd = b"THIS IS MY VENDOR_DATA"
|
||||
|
||||
populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
|
||||
{'user-data': ud, 'meta-data': yaml.safe_dump(md),
|
||||
@ -126,7 +126,7 @@ class TestNoCloudDataSource(TestCase):
|
||||
|
||||
def test_nocloud_no_vendordata(self):
|
||||
populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
|
||||
{'user-data': "ud", 'meta-data': "instance-id: IID\n"})
|
||||
{'user-data': b"ud", 'meta-data': "instance-id: IID\n"})
|
||||
|
||||
sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
|
||||
|
||||
@ -134,7 +134,7 @@ class TestNoCloudDataSource(TestCase):
|
||||
|
||||
dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
|
||||
ret = dsrc.get_data()
|
||||
self.assertEqual(dsrc.userdata_raw, "ud")
|
||||
self.assertEqual(dsrc.userdata_raw, b"ud")
|
||||
self.assertFalse(dsrc.vendordata)
|
||||
self.assertTrue(ret)
|
||||
|
||||
|
@ -49,7 +49,7 @@ EC2_META = {
|
||||
'public-ipv4': '0.0.0.1',
|
||||
'reservation-id': 'r-iru5qm4m',
|
||||
}
|
||||
USER_DATA = '#!/bin/sh\necho This is user data\n'
|
||||
USER_DATA = b'#!/bin/sh\necho This is user data\n'
|
||||
VENDOR_DATA = {
|
||||
'magic': '',
|
||||
}
|
||||
@ -63,8 +63,8 @@ OSTACK_META = {
|
||||
'public_keys': {'mykey': PUBKEY},
|
||||
'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c',
|
||||
}
|
||||
CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
|
||||
CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
|
||||
CONTENT_0 = b'This is contents of /etc/foo.cfg\n'
|
||||
CONTENT_1 = b'# this is /etc/bar/bar.cfg\n'
|
||||
OS_FILES = {
|
||||
'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
|
||||
'openstack/latest/user_data': USER_DATA,
|
||||
|
@ -16,7 +16,7 @@ class TestEc2Util(helpers.HttprettyTestCase):
|
||||
body='stuff',
|
||||
status=200)
|
||||
userdata = eu.get_instance_userdata(self.VERSION)
|
||||
self.assertEquals('stuff', userdata)
|
||||
self.assertEquals('stuff', userdata.decode('utf-8'))
|
||||
|
||||
@hp.activate
|
||||
def test_userdata_fetch_fail_not_found(self):
|
||||
|
@ -30,7 +30,7 @@ class TestAptProxyConfig(TestCase):
|
||||
self.assertTrue(os.path.isfile(self.pfile))
|
||||
self.assertFalse(os.path.isfile(self.cfile))
|
||||
|
||||
contents = str(util.read_file_or_url(self.pfile))
|
||||
contents = util.load_tfile_or_url(self.pfile)
|
||||
self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
|
||||
|
||||
def test_apt_http_proxy_written(self):
|
||||
@ -40,7 +40,7 @@ class TestAptProxyConfig(TestCase):
|
||||
self.assertTrue(os.path.isfile(self.pfile))
|
||||
self.assertFalse(os.path.isfile(self.cfile))
|
||||
|
||||
contents = str(util.read_file_or_url(self.pfile))
|
||||
contents = util.load_tfile_or_url(self.pfile)
|
||||
self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
|
||||
|
||||
def test_apt_all_proxy_written(self):
|
||||
@ -58,7 +58,7 @@ class TestAptProxyConfig(TestCase):
|
||||
self.assertTrue(os.path.isfile(self.pfile))
|
||||
self.assertFalse(os.path.isfile(self.cfile))
|
||||
|
||||
contents = str(util.read_file_or_url(self.pfile))
|
||||
contents = util.load_tfile_or_url(self.pfile)
|
||||
|
||||
for ptype, pval in values.items():
|
||||
self.assertTrue(self._search_apt_config(contents, ptype, pval))
|
||||
@ -74,7 +74,7 @@ class TestAptProxyConfig(TestCase):
|
||||
cc_apt_configure.apply_apt_config({'apt_proxy': "foo"},
|
||||
self.pfile, self.cfile)
|
||||
self.assertTrue(os.path.isfile(self.pfile))
|
||||
contents = str(util.read_file_or_url(self.pfile))
|
||||
contents = util.load_tfile_or_url(self.pfile)
|
||||
self.assertTrue(self._search_apt_config(contents, "http", "foo"))
|
||||
|
||||
def test_config_written(self):
|
||||
@ -86,14 +86,14 @@ class TestAptProxyConfig(TestCase):
|
||||
self.assertTrue(os.path.isfile(self.cfile))
|
||||
self.assertFalse(os.path.isfile(self.pfile))
|
||||
|
||||
self.assertEqual(str(util.read_file_or_url(self.cfile)), payload)
|
||||
self.assertEqual(util.load_tfile_or_url(self.cfile), payload)
|
||||
|
||||
def test_config_replaced(self):
|
||||
util.write_file(self.pfile, "content doesnt matter")
|
||||
cc_apt_configure.apply_apt_config({'apt_config': "foo"},
|
||||
self.pfile, self.cfile)
|
||||
self.assertTrue(os.path.isfile(self.cfile))
|
||||
self.assertEqual(str(util.read_file_or_url(self.cfile)), "foo")
|
||||
self.assertEqual(util.load_tfile_or_url(self.cfile), "foo")
|
||||
|
||||
def test_config_deleted(self):
|
||||
# if no 'apt_config' is provided, delete any previously written file
|
||||
|
@ -14,20 +14,20 @@ class TestPathPrefix2Dict(TestCase):
|
||||
self.addCleanup(shutil.rmtree, self.tmp)
|
||||
|
||||
def test_required_only(self):
|
||||
dirdata = {'f1': 'f1content', 'f2': 'f2content'}
|
||||
dirdata = {'f1': b'f1content', 'f2': b'f2content'}
|
||||
populate_dir(self.tmp, dirdata)
|
||||
|
||||
ret = util.pathprefix2dict(self.tmp, required=['f1', 'f2'])
|
||||
self.assertEqual(dirdata, ret)
|
||||
|
||||
def test_required_missing(self):
|
||||
dirdata = {'f1': 'f1content'}
|
||||
dirdata = {'f1': b'f1content'}
|
||||
populate_dir(self.tmp, dirdata)
|
||||
kwargs = {'required': ['f1', 'f2']}
|
||||
self.assertRaises(ValueError, util.pathprefix2dict, self.tmp, **kwargs)
|
||||
|
||||
def test_no_required_and_optional(self):
|
||||
dirdata = {'f1': 'f1c', 'f2': 'f2c'}
|
||||
dirdata = {'f1': b'f1c', 'f2': b'f2c'}
|
||||
populate_dir(self.tmp, dirdata)
|
||||
|
||||
ret = util.pathprefix2dict(self.tmp, required=None,
|
||||
@ -35,7 +35,7 @@ class TestPathPrefix2Dict(TestCase):
|
||||
self.assertEqual(dirdata, ret)
|
||||
|
||||
def test_required_and_optional(self):
|
||||
dirdata = {'f1': 'f1c', 'f2': 'f2c'}
|
||||
dirdata = {'f1': b'f1c', 'f2': b'f2c'}
|
||||
populate_dir(self.tmp, dirdata)
|
||||
|
||||
ret = util.pathprefix2dict(self.tmp, required=['f1'], optional=['f2'])
|
||||
|
Loading…
x
Reference in New Issue
Block a user