Remove oslo namespace package

Blueprint remove-namespace-packages

Depends-on: Id86b74fb492e1b86d29197b4e127d8368affa3fd
for openstack/congress

Change-Id: I3db0b8da385cb4ca475129260725a66413b25854
This commit is contained in:
Doug Hellmann 2015-05-05 19:20:28 +00:00
parent dd139b84c0
commit 7ebc48cf9d
29 changed files with 1 additions and 3164 deletions

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__import__('pkg_resources').declare_namespace(__name__)

View File

@ -1,26 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import warnings
def deprecated():
new_name = __name__.replace('.', '_')
warnings.warn(
('The oslo namespace package is deprecated. Please use %s instead.' %
new_name),
DeprecationWarning,
stacklevel=3,
)
deprecated()

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_vmware.api import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_vmware.constants import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_vmware.exceptions import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_vmware.image_transfer import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_vmware.objects.datacenter import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_vmware.objects.datastore import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_vmware.pbm import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_vmware.rw_handles import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_vmware.service import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_vmware.vim import * # noqa

View File

@ -1,13 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_vmware.vim_util import * # noqa

View File

@ -23,7 +23,7 @@ import mock
from oslo_i18n import fixture as i18n_fixture
import suds
from oslo.vmware import exceptions
from oslo_vmware import exceptions
from oslo_vmware.tests import base
from oslo_vmware import vim

View File

@ -21,11 +21,7 @@ classifier =
[files]
packages =
oslo
oslo.vmware
oslo_vmware
namespace_packages =
oslo
[build_sphinx]
source-dir = doc/source

View File

@ -1,11 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -1,53 +0,0 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import fixtures
import testtools
_TRUE_VALUES = ('true', '1', 'yes')
# FIXME(dhellmann) Update this to use oslo.test library
class TestCase(testtools.TestCase):
"""Test case base class for all unit tests."""
def setUp(self):
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
test_timeout = int(test_timeout)
except ValueError:
# If timeout value is invalid do not set a timeout.
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.log_fixture = self.useFixture(fixtures.FakeLogger())

View File

@ -1,30 +0,0 @@
# Copyright (c) 2014 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo.vmware.objects import datacenter
from tests import base
class DatacenterTestCase(base.TestCase):
"""Test the Datacenter object."""
def test_dc(self):
self.assertRaises(ValueError, datacenter.Datacenter, None, 'dc-1')
self.assertRaises(ValueError, datacenter.Datacenter, mock.Mock(), None)
dc = datacenter.Datacenter('ref', 'name')
self.assertEqual('ref', dc.ref)
self.assertEqual('name', dc.name)

View File

@ -1,385 +0,0 @@
# Copyright (c) 2014 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import units
import six.moves.urllib.parse as urlparse
from oslo.vmware import constants
from oslo.vmware.objects import datastore
from oslo.vmware import vim_util
from oslo_vmware import vim_util as new_vim_util
from tests import base
class HostMount(object):
def __init__(self, key, mountInfo):
self.key = key
self.mountInfo = mountInfo
class MountInfo(object):
def __init__(self, accessMode, mounted, accessible):
self.accessMode = accessMode
self.mounted = mounted
self.accessible = accessible
class DatastoreTestCase(base.TestCase):
"""Test the Datastore object."""
def test_ds(self):
ds = datastore.Datastore(
"fake_ref", "ds_name", 2 * units.Gi, 1 * units.Gi)
self.assertEqual('ds_name', ds.name)
self.assertEqual('fake_ref', ds.ref)
self.assertEqual(2 * units.Gi, ds.capacity)
self.assertEqual(1 * units.Gi, ds.freespace)
def test_ds_invalid_space(self):
self.assertRaises(ValueError, datastore.Datastore,
"fake_ref", "ds_name", 1 * units.Gi, 2 * units.Gi)
self.assertRaises(ValueError, datastore.Datastore,
"fake_ref", "ds_name", None, 2 * units.Gi)
def test_ds_no_capacity_no_freespace(self):
ds = datastore.Datastore("fake_ref", "ds_name")
self.assertIsNone(ds.capacity)
self.assertIsNone(ds.freespace)
def test_ds_invalid(self):
self.assertRaises(ValueError, datastore.Datastore, None, "ds_name")
self.assertRaises(ValueError, datastore.Datastore, "fake_ref", None)
def test_build_path(self):
ds = datastore.Datastore("fake_ref", "ds_name")
ds_path = ds.build_path("some_dir", "foo.vmdk")
self.assertEqual('[ds_name] some_dir/foo.vmdk', str(ds_path))
def test_build_url(self):
ds = datastore.Datastore("fake_ref", "ds_name")
path = 'images/ubuntu.vmdk'
self.assertRaises(ValueError, ds.build_url, 'https', '10.0.0.2', path)
ds.datacenter = mock.Mock()
ds.datacenter.name = "dc_path"
ds_url = ds.build_url('https', '10.0.0.2', path)
self.assertEqual(ds_url.datastore_name, "ds_name")
self.assertEqual(ds_url.datacenter_path, "dc_path")
self.assertEqual(ds_url.path, path)
def test_get_summary(self):
ds_ref = vim_util.get_moref('ds-0', 'Datastore')
ds = datastore.Datastore(ds_ref, 'ds-name')
summary = mock.sentinel.summary
session = mock.Mock()
session.invoke_api = mock.Mock()
session.invoke_api.return_value = summary
ret = ds.get_summary(session)
self.assertEqual(summary, ret)
session.invoke_api.assert_called_once_with(new_vim_util,
'get_object_property',
session.vim,
ds.ref, 'summary')
def test_get_connected_hosts(self):
session = mock.Mock()
ds_ref = vim_util.get_moref('ds-0', 'Datastore')
ds = datastore.Datastore(ds_ref, 'ds-name')
ds.get_summary = mock.Mock()
ds.get_summary.return_value.accessible = False
self.assertEqual([], ds.get_connected_hosts(session))
ds.get_summary.return_value.accessible = True
m1 = HostMount("m1", MountInfo('readWrite', True, True))
m2 = HostMount("m2", MountInfo('read', True, True))
m3 = HostMount("m3", MountInfo('readWrite', False, True))
m4 = HostMount("m4", MountInfo('readWrite', True, False))
ds.get_summary.assert_called_once_with(session)
class Prop(object):
DatastoreHostMount = [m1, m2, m3, m4]
session.invoke_api = mock.Mock()
session.invoke_api.return_value = Prop()
hosts = ds.get_connected_hosts(session)
self.assertEqual(1, len(hosts))
self.assertEqual("m1", hosts.pop())
def test_is_datastore_mount_usable(self):
m = MountInfo('readWrite', True, True)
self.assertTrue(datastore.Datastore.is_datastore_mount_usable(m))
m = MountInfo('read', True, True)
self.assertFalse(datastore.Datastore.is_datastore_mount_usable(m))
m = MountInfo('readWrite', False, True)
self.assertFalse(datastore.Datastore.is_datastore_mount_usable(m))
m = MountInfo('readWrite', True, False)
self.assertFalse(datastore.Datastore.is_datastore_mount_usable(m))
m = MountInfo('readWrite', False, False)
self.assertFalse(datastore.Datastore.is_datastore_mount_usable(m))
m = MountInfo('readWrite', None, None)
self.assertFalse(datastore.Datastore.is_datastore_mount_usable(m))
m = MountInfo('readWrite', None, True)
self.assertFalse(datastore.Datastore.is_datastore_mount_usable(m))
class DatastorePathTestCase(base.TestCase):
"""Test the DatastorePath object."""
def test_ds_path(self):
p = datastore.DatastorePath('dsname', 'a/b/c', 'file.iso')
self.assertEqual('[dsname] a/b/c/file.iso', str(p))
self.assertEqual('a/b/c/file.iso', p.rel_path)
self.assertEqual('a/b/c', p.parent.rel_path)
self.assertEqual('[dsname] a/b/c', str(p.parent))
self.assertEqual('dsname', p.datastore)
self.assertEqual('file.iso', p.basename)
self.assertEqual('a/b/c', p.dirname)
def test_ds_path_no_ds_name(self):
bad_args = [
('', ['a/b/c', 'file.iso']),
(None, ['a/b/c', 'file.iso'])]
for t in bad_args:
self.assertRaises(
ValueError, datastore.DatastorePath,
t[0], *t[1])
def test_ds_path_invalid_path_components(self):
bad_args = [
('dsname', [None]),
('dsname', ['', None]),
('dsname', ['a', None]),
('dsname', ['a', None, 'b']),
('dsname', [None, '']),
('dsname', [None, 'b'])]
for t in bad_args:
self.assertRaises(
ValueError, datastore.DatastorePath,
t[0], *t[1])
def test_ds_path_no_subdir(self):
args = [
('dsname', ['', 'x.vmdk']),
('dsname', ['x.vmdk'])]
canonical_p = datastore.DatastorePath('dsname', 'x.vmdk')
self.assertEqual('[dsname] x.vmdk', str(canonical_p))
self.assertEqual('', canonical_p.dirname)
self.assertEqual('x.vmdk', canonical_p.basename)
self.assertEqual('x.vmdk', canonical_p.rel_path)
for t in args:
p = datastore.DatastorePath(t[0], *t[1])
self.assertEqual(str(canonical_p), str(p))
def test_ds_path_ds_only(self):
args = [
('dsname', []),
('dsname', ['']),
('dsname', ['', ''])]
canonical_p = datastore.DatastorePath('dsname')
self.assertEqual('[dsname]', str(canonical_p))
self.assertEqual('', canonical_p.rel_path)
self.assertEqual('', canonical_p.basename)
self.assertEqual('', canonical_p.dirname)
for t in args:
p = datastore.DatastorePath(t[0], *t[1])
self.assertEqual(str(canonical_p), str(p))
self.assertEqual(canonical_p.rel_path, p.rel_path)
def test_ds_path_equivalence(self):
args = [
('dsname', ['a/b/c/', 'x.vmdk']),
('dsname', ['a/', 'b/c/', 'x.vmdk']),
('dsname', ['a', 'b', 'c', 'x.vmdk']),
('dsname', ['a/b/c', 'x.vmdk'])]
canonical_p = datastore.DatastorePath('dsname', 'a/b/c', 'x.vmdk')
for t in args:
p = datastore.DatastorePath(t[0], *t[1])
self.assertEqual(str(canonical_p), str(p))
self.assertEqual(canonical_p.datastore, p.datastore)
self.assertEqual(canonical_p.rel_path, p.rel_path)
self.assertEqual(str(canonical_p.parent), str(p.parent))
def test_ds_path_non_equivalence(self):
args = [
# leading slash
('dsname', ['/a', 'b', 'c', 'x.vmdk']),
('dsname', ['/a/b/c/', 'x.vmdk']),
('dsname', ['a/b/c', '/x.vmdk']),
# leading space
('dsname', ['a/b/c/', ' x.vmdk']),
('dsname', ['a/', ' b/c/', 'x.vmdk']),
('dsname', [' a', 'b', 'c', 'x.vmdk']),
# trailing space
('dsname', ['/a/b/c/', 'x.vmdk ']),
('dsname', ['a/b/c/ ', 'x.vmdk'])]
canonical_p = datastore.DatastorePath('dsname', 'a/b/c', 'x.vmdk')
for t in args:
p = datastore.DatastorePath(t[0], *t[1])
self.assertNotEqual(str(canonical_p), str(p))
def test_equal(self):
a = datastore.DatastorePath('ds_name', 'a')
b = datastore.DatastorePath('ds_name', 'a')
self.assertEqual(a, b)
def test_join(self):
p = datastore.DatastorePath('ds_name', 'a')
ds_path = p.join('b')
self.assertEqual('[ds_name] a/b', str(ds_path))
p = datastore.DatastorePath('ds_name', 'a')
ds_path = p.join()
bad_args = [
[None],
['', None],
['a', None],
['a', None, 'b']]
for arg in bad_args:
self.assertRaises(ValueError, p.join, *arg)
def test_ds_path_parse(self):
p = datastore.DatastorePath.parse('[dsname]')
self.assertEqual('dsname', p.datastore)
self.assertEqual('', p.rel_path)
p = datastore.DatastorePath.parse('[dsname] folder')
self.assertEqual('dsname', p.datastore)
self.assertEqual('folder', p.rel_path)
p = datastore.DatastorePath.parse('[dsname] folder/file')
self.assertEqual('dsname', p.datastore)
self.assertEqual('folder/file', p.rel_path)
for p in [None, '']:
self.assertRaises(ValueError, datastore.DatastorePath.parse, p)
for p in ['bad path', '/a/b/c', 'a/b/c']:
self.assertRaises(IndexError, datastore.DatastorePath.parse, p)
class DatastoreURLTestCase(base.TestCase):
"""Test the DatastoreURL object."""
def test_path_strip(self):
scheme = 'https'
server = '13.37.73.31'
path = 'images/ubuntu-14.04.vmdk'
dc_path = 'datacenter-1'
ds_name = 'datastore-1'
params = {'dcPath': dc_path, 'dsName': ds_name}
query = urlparse.urlencode(params)
url = datastore.DatastoreURL(scheme, server, path, dc_path, ds_name)
expected_url = '%s://%s/folder/%s?%s' % (
scheme, server, path, query)
self.assertEqual(expected_url, str(url))
def test_path_lstrip(self):
scheme = 'https'
server = '13.37.73.31'
path = '/images/ubuntu-14.04.vmdk'
dc_path = 'datacenter-1'
ds_name = 'datastore-1'
params = {'dcPath': dc_path, 'dsName': ds_name}
query = urlparse.urlencode(params)
url = datastore.DatastoreURL(scheme, server, path, dc_path, ds_name)
expected_url = '%s://%s/folder/%s?%s' % (
scheme, server, path.lstrip('/'), query)
self.assertEqual(expected_url, str(url))
def test_path_rstrip(self):
scheme = 'https'
server = '13.37.73.31'
path = 'images/ubuntu-14.04.vmdk/'
dc_path = 'datacenter-1'
ds_name = 'datastore-1'
params = {'dcPath': dc_path, 'dsName': ds_name}
query = urlparse.urlencode(params)
url = datastore.DatastoreURL(scheme, server, path, dc_path, ds_name)
expected_url = '%s://%s/folder/%s?%s' % (
scheme, server, path.rstrip('/'), query)
self.assertEqual(expected_url, str(url))
def test_urlparse(self):
dc_path = 'datacenter-1'
ds_name = 'datastore-1'
params = {'dcPath': dc_path, 'dsName': ds_name}
query = urlparse.urlencode(params)
url = 'https://13.37.73.31/folder/images/aa.vmdk?%s' % query
ds_url = datastore.DatastoreURL.urlparse(url)
self.assertEqual(url, str(ds_url))
def test_datastore_name(self):
dc_path = 'datacenter-1'
ds_name = 'datastore-1'
params = {'dcPath': dc_path, 'dsName': ds_name}
query = urlparse.urlencode(params)
url = 'https://13.37.73.31/folder/images/aa.vmdk?%s' % query
ds_url = datastore.DatastoreURL.urlparse(url)
self.assertEqual(ds_name, ds_url.datastore_name)
def test_datacenter_path(self):
dc_path = 'datacenter-1'
ds_name = 'datastore-1'
params = {'dcPath': dc_path, 'dsName': ds_name}
query = urlparse.urlencode(params)
url = 'https://13.37.73.31/folder/images/aa.vmdk?%s' % query
ds_url = datastore.DatastoreURL.urlparse(url)
self.assertEqual(dc_path, ds_url.datacenter_path)
def test_path(self):
dc_path = 'datacenter-1'
ds_name = 'datastore-1'
params = {'dcPath': dc_path, 'dsName': ds_name}
path = 'images/aa.vmdk'
query = urlparse.urlencode(params)
url = 'https://13.37.73.31/folder/%s?%s' % (path, query)
ds_url = datastore.DatastoreURL.urlparse(url)
self.assertEqual(path, ds_url.path)
@mock.patch('six.moves.http_client.HTTPSConnection')
def test_connect(self, mock_conn):
dc_path = 'datacenter-1'
ds_name = 'datastore-1'
params = {'dcPath': dc_path, 'dsName': ds_name}
query = urlparse.urlencode(params)
url = 'https://13.37.73.31/folder/images/aa.vmdk?%s' % query
ds_url = datastore.DatastoreURL.urlparse(url)
cookie = mock.Mock()
ds_url.connect('PUT', 128, cookie)
mock_conn.assert_called_once_with('13.37.73.31')
def test_get_transfer_ticket(self):
dc_path = 'datacenter-1'
ds_name = 'datastore-1'
params = {'dcPath': dc_path, 'dsName': ds_name}
query = urlparse.urlencode(params)
url = 'https://13.37.73.31/folder/images/aa.vmdk?%s' % query
session = mock.Mock()
session.invoke_api = mock.Mock()
class Ticket(object):
id = 'fake_id'
session.invoke_api.return_value = Ticket()
ds_url = datastore.DatastoreURL.urlparse(url)
ticket = ds_url.get_transfer_ticket(session, 'PUT')
self.assertEqual('%s="%s"' % (constants.CGI_COOKIE_KEY, 'fake_id'),
ticket)

View File

@ -1,563 +0,0 @@
# coding=utf-8
# Copyright (c) 2014 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for session management and API invocation classes.
"""
from eventlet import greenthread
import mock
import six
import suds
from oslo.vmware import api
from oslo.vmware import exceptions
from oslo_vmware import vim_util as new_vim_util
from tests import base
class RetryDecoratorTest(base.TestCase):
"""Tests for retry decorator class."""
def test_retry(self):
result = "RESULT"
@api.RetryDecorator()
def func(*args, **kwargs):
return result
self.assertEqual(result, func())
def func2(*args, **kwargs):
return result
retry = api.RetryDecorator()
self.assertEqual(result, retry(func2)())
self.assertTrue(retry._retry_count == 0)
def test_retry_with_expected_exceptions(self):
result = "RESULT"
responses = [exceptions.VimSessionOverLoadException(None),
exceptions.VimSessionOverLoadException(None),
result]
def func(*args, **kwargs):
response = responses.pop(0)
if isinstance(response, Exception):
raise response
return response
sleep_time_incr = 0.01
retry_count = 2
retry = api.RetryDecorator(10, sleep_time_incr, 10,
(exceptions.VimSessionOverLoadException,))
self.assertEqual(result, retry(func)())
self.assertTrue(retry._retry_count == retry_count)
self.assertEqual(retry_count * sleep_time_incr, retry._sleep_time)
def test_retry_with_max_retries(self):
responses = [exceptions.VimSessionOverLoadException(None),
exceptions.VimSessionOverLoadException(None),
exceptions.VimSessionOverLoadException(None)]
def func(*args, **kwargs):
response = responses.pop(0)
if isinstance(response, Exception):
raise response
return response
retry = api.RetryDecorator(2, 0, 0,
(exceptions.VimSessionOverLoadException,))
self.assertRaises(exceptions.VimSessionOverLoadException, retry(func))
self.assertTrue(retry._retry_count == 2)
def test_retry_with_unexpected_exception(self):
def func(*args, **kwargs):
raise exceptions.VimException(None)
retry = api.RetryDecorator()
self.assertRaises(exceptions.VimException, retry(func))
self.assertTrue(retry._retry_count == 0)
class VMwareAPISessionTest(base.TestCase):
"""Tests for VMwareAPISession."""
SERVER_IP = '10.1.2.3'
PORT = 443
USERNAME = 'admin'
PASSWORD = 'password'
def setUp(self):
super(VMwareAPISessionTest, self).setUp()
patcher = mock.patch('oslo_vmware.vim.Vim')
self.addCleanup(patcher.stop)
self.VimMock = patcher.start()
self.VimMock.side_effect = lambda *args, **kw: mock.MagicMock()
self.cert_mock = mock.Mock()
def _create_api_session(self, _create_session, retry_count=10,
task_poll_interval=1):
return api.VMwareAPISession(VMwareAPISessionTest.SERVER_IP,
VMwareAPISessionTest.USERNAME,
VMwareAPISessionTest.PASSWORD,
retry_count,
task_poll_interval,
'https',
_create_session,
port=VMwareAPISessionTest.PORT,
cacert=self.cert_mock,
insecure=False)
def test_vim(self):
api_session = self._create_api_session(False)
api_session.vim
self.VimMock.assert_called_with(protocol=api_session._scheme,
host=VMwareAPISessionTest.SERVER_IP,
port=VMwareAPISessionTest.PORT,
wsdl_url=api_session._vim_wsdl_loc,
cacert=self.cert_mock,
insecure=False)
@mock.patch('oslo_vmware.pbm.Pbm')
def test_pbm(self, pbm_mock):
api_session = self._create_api_session(True)
vim_obj = api_session.vim
cookie = mock.Mock()
vim_obj.get_http_cookie.return_value = cookie
api_session._pbm_wsdl_loc = mock.Mock()
pbm = mock.Mock()
pbm_mock.return_value = pbm
api_session._get_session_cookie = mock.Mock(return_value=cookie)
self.assertEqual(pbm, api_session.pbm)
pbm.set_soap_cookie.assert_called_once_with(cookie)
def test_create_session(self):
session = mock.Mock()
session.key = "12345"
api_session = self._create_api_session(False)
cookie = mock.Mock()
vim_obj = api_session.vim
vim_obj.Login.return_value = session
vim_obj.get_http_cookie.return_value = cookie
pbm = mock.Mock()
api_session._pbm = pbm
api_session._create_session()
session_manager = vim_obj.service_content.sessionManager
vim_obj.Login.assert_called_once_with(
session_manager, userName=VMwareAPISessionTest.USERNAME,
password=VMwareAPISessionTest.PASSWORD)
self.assertFalse(vim_obj.TerminateSession.called)
self.assertEqual(session.key, api_session._session_id)
pbm.set_soap_cookie.assert_called_once_with(cookie)
def test_create_session_with_existing_inactive_session(self):
old_session_key = '12345'
new_session_key = '67890'
session = mock.Mock()
session.key = new_session_key
api_session = self._create_api_session(False)
api_session._session_id = old_session_key
api_session._session_username = api_session._server_username
vim_obj = api_session.vim
vim_obj.Login.return_value = session
vim_obj.SessionIsActive.return_value = False
api_session._create_session()
session_manager = vim_obj.service_content.sessionManager
vim_obj.SessionIsActive.assert_called_once_with(
session_manager, sessionID=old_session_key,
userName=VMwareAPISessionTest.USERNAME)
vim_obj.Login.assert_called_once_with(
session_manager, userName=VMwareAPISessionTest.USERNAME,
password=VMwareAPISessionTest.PASSWORD)
self.assertEqual(new_session_key, api_session._session_id)
def test_create_session_with_existing_active_session(self):
old_session_key = '12345'
api_session = self._create_api_session(False)
api_session._session_id = old_session_key
api_session._session_username = api_session._server_username
vim_obj = api_session.vim
vim_obj.SessionIsActive.return_value = True
api_session._create_session()
session_manager = vim_obj.service_content.sessionManager
vim_obj.SessionIsActive.assert_called_once_with(
session_manager, sessionID=old_session_key,
userName=VMwareAPISessionTest.USERNAME)
self.assertFalse(vim_obj.Login.called)
self.assertEqual(old_session_key, api_session._session_id)
def test_invoke_api(self):
api_session = self._create_api_session(True)
response = mock.Mock()
def api(*args, **kwargs):
return response
module = mock.Mock()
module.api = api
ret = api_session.invoke_api(module, 'api')
self.assertEqual(response, ret)
def test_logout_with_exception(self):
session = mock.Mock()
session.key = "12345"
api_session = self._create_api_session(False)
vim_obj = api_session.vim
vim_obj.Login.return_value = session
vim_obj.Logout.side_effect = exceptions.VimFaultException([], None)
api_session._create_session()
api_session.logout()
self.assertEqual("12345", api_session._session_id)
def test_logout_no_session(self):
api_session = self._create_api_session(False)
vim_obj = api_session.vim
api_session.logout()
self.assertEqual(0, vim_obj.Logout.call_count)
def test_logout_calls_vim_logout(self):
session = mock.Mock()
session.key = "12345"
api_session = self._create_api_session(False)
vim_obj = api_session.vim
vim_obj.Login.return_value = session
vim_obj.Logout.return_value = None
api_session._create_session()
session_manager = vim_obj.service_content.sessionManager
vim_obj.Login.assert_called_once_with(
session_manager, userName=VMwareAPISessionTest.USERNAME,
password=VMwareAPISessionTest.PASSWORD)
api_session.logout()
vim_obj.Logout.assert_called_once_with(
session_manager)
self.assertIsNone(api_session._session_id)
def test_invoke_api_with_expected_exception(self):
api_session = self._create_api_session(True)
api_session._create_session = mock.Mock()
vim_obj = api_session.vim
vim_obj.SessionIsActive.return_value = False
ret = mock.Mock()
responses = [exceptions.VimConnectionException(None), ret]
def api(*args, **kwargs):
response = responses.pop(0)
if isinstance(response, Exception):
raise response
return response
module = mock.Mock()
module.api = api
with mock.patch.object(greenthread, 'sleep'):
self.assertEqual(ret, api_session.invoke_api(module, 'api'))
api_session._create_session.assert_called_once_with()
def test_invoke_api_not_recreate_session(self):
api_session = self._create_api_session(True)
api_session._create_session = mock.Mock()
vim_obj = api_session.vim
vim_obj.SessionIsActive.return_value = True
ret = mock.Mock()
responses = [exceptions.VimConnectionException(None), ret]
def api(*args, **kwargs):
response = responses.pop(0)
if isinstance(response, Exception):
raise response
return response
module = mock.Mock()
module.api = api
with mock.patch.object(greenthread, 'sleep'):
self.assertEqual(ret, api_session.invoke_api(module, 'api'))
self.assertFalse(api_session._create_session.called)
def test_invoke_api_with_vim_fault_exception(self):
api_session = self._create_api_session(True)
def api(*args, **kwargs):
raise exceptions.VimFaultException([], None)
module = mock.Mock()
module.api = api
self.assertRaises(exceptions.VimFaultException,
api_session.invoke_api,
module,
'api')
def test_invoke_api_with_vim_fault_exception_details(self):
api_session = self._create_api_session(True)
fault_string = 'Invalid property.'
fault_list = [exceptions.INVALID_PROPERTY]
details = {u'name': suds.sax.text.Text(u'фира')}
module = mock.Mock()
module.api.side_effect = exceptions.VimFaultException(fault_list,
fault_string,
details=details)
e = self.assertRaises(exceptions.InvalidPropertyException,
api_session.invoke_api,
module,
'api')
details_str = u"{'name': 'фира'}"
expected_str = "%s\nFaults: %s\nDetails: %s" % (fault_string,
fault_list,
details_str)
self.assertEqual(expected_str, six.text_type(e))
self.assertEqual(details, e.details)
def test_invoke_api_with_empty_response(self):
api_session = self._create_api_session(True)
vim_obj = api_session.vim
vim_obj.SessionIsActive.return_value = True
def api(*args, **kwargs):
raise exceptions.VimFaultException(
[exceptions.NOT_AUTHENTICATED], None)
module = mock.Mock()
module.api = api
ret = api_session.invoke_api(module, 'api')
self.assertEqual([], ret)
vim_obj.SessionIsActive.assert_called_once_with(
vim_obj.service_content.sessionManager,
sessionID=api_session._session_id,
userName=api_session._session_username)
def test_invoke_api_with_stale_session(self):
api_session = self._create_api_session(True)
api_session._create_session = mock.Mock()
vim_obj = api_session.vim
vim_obj.SessionIsActive.return_value = False
result = mock.Mock()
responses = [exceptions.VimFaultException(
[exceptions.NOT_AUTHENTICATED], None), result]
def api(*args, **kwargs):
response = responses.pop(0)
if isinstance(response, Exception):
raise response
return response
module = mock.Mock()
module.api = api
with mock.patch.object(greenthread, 'sleep'):
ret = api_session.invoke_api(module, 'api')
self.assertEqual(result, ret)
vim_obj.SessionIsActive.assert_called_once_with(
vim_obj.service_content.sessionManager,
sessionID=api_session._session_id,
userName=api_session._session_username)
api_session._create_session.assert_called_once_with()
def test_wait_for_task(self):
api_session = self._create_api_session(True)
task_info_list = [('queued', 0), ('running', 40), ('success', 100)]
task_info_list_size = len(task_info_list)
def invoke_api_side_effect(module, method, *args, **kwargs):
(state, progress) = task_info_list.pop(0)
task_info = mock.Mock()
task_info.progress = progress
task_info.state = state
return task_info
api_session.invoke_api = mock.Mock(side_effect=invoke_api_side_effect)
task = mock.Mock()
with mock.patch.object(greenthread, 'sleep'):
ret = api_session.wait_for_task(task)
self.assertEqual('success', ret.state)
self.assertEqual(100, ret.progress)
api_session.invoke_api.assert_called_with(new_vim_util,
'get_object_property',
api_session.vim, task,
'info')
self.assertEqual(task_info_list_size,
api_session.invoke_api.call_count)
def test_wait_for_task_with_error_state(self):
api_session = self._create_api_session(True)
task_info_list = [('queued', 0), ('running', 40), ('error', -1)]
task_info_list_size = len(task_info_list)
def invoke_api_side_effect(module, method, *args, **kwargs):
(state, progress) = task_info_list.pop(0)
task_info = mock.Mock()
task_info.progress = progress
task_info.state = state
return task_info
api_session.invoke_api = mock.Mock(side_effect=invoke_api_side_effect)
task = mock.Mock()
with mock.patch.object(greenthread, 'sleep'):
self.assertRaises(exceptions.VMwareDriverException,
api_session.wait_for_task,
task)
api_session.invoke_api.assert_called_with(new_vim_util,
'get_object_property',
api_session.vim, task,
'info')
self.assertEqual(task_info_list_size,
api_session.invoke_api.call_count)
def test_wait_for_task_with_invoke_api_exception(self):
api_session = self._create_api_session(True)
api_session.invoke_api = mock.Mock(
side_effect=exceptions.VimException(None))
task = mock.Mock()
with mock.patch.object(greenthread, 'sleep'):
self.assertRaises(exceptions.VimException,
api_session.wait_for_task,
task)
api_session.invoke_api.assert_called_once_with(new_vim_util,
'get_object_property',
api_session.vim, task,
'info')
def test_wait_for_lease_ready(self):
api_session = self._create_api_session(True)
lease_states = ['initializing', 'ready']
num_states = len(lease_states)
def invoke_api_side_effect(module, method, *args, **kwargs):
return lease_states.pop(0)
api_session.invoke_api = mock.Mock(side_effect=invoke_api_side_effect)
lease = mock.Mock()
with mock.patch.object(greenthread, 'sleep'):
api_session.wait_for_lease_ready(lease)
api_session.invoke_api.assert_called_with(new_vim_util,
'get_object_property',
api_session.vim, lease,
'state')
self.assertEqual(num_states, api_session.invoke_api.call_count)
def test_wait_for_lease_ready_with_error_state(self):
api_session = self._create_api_session(True)
responses = ['initializing', 'error', 'error_msg']
def invoke_api_side_effect(module, method, *args, **kwargs):
return responses.pop(0)
api_session.invoke_api = mock.Mock(side_effect=invoke_api_side_effect)
lease = mock.Mock()
with mock.patch.object(greenthread, 'sleep'):
self.assertRaises(exceptions.VimException,
api_session.wait_for_lease_ready,
lease)
exp_calls = [mock.call(new_vim_util, 'get_object_property',
api_session.vim, lease, 'state')] * 2
exp_calls.append(mock.call(new_vim_util, 'get_object_property',
api_session.vim, lease, 'error'))
self.assertEqual(exp_calls, api_session.invoke_api.call_args_list)
def test_wait_for_lease_ready_with_unknown_state(self):
api_session = self._create_api_session(True)
def invoke_api_side_effect(module, method, *args, **kwargs):
return 'unknown'
api_session.invoke_api = mock.Mock(side_effect=invoke_api_side_effect)
lease = mock.Mock()
self.assertRaises(exceptions.VimException,
api_session.wait_for_lease_ready,
lease)
api_session.invoke_api.assert_called_once_with(new_vim_util,
'get_object_property',
api_session.vim,
lease, 'state')
def test_wait_for_lease_ready_with_invoke_api_exception(self):
api_session = self._create_api_session(True)
api_session.invoke_api = mock.Mock(
side_effect=exceptions.VimException(None))
lease = mock.Mock()
self.assertRaises(exceptions.VimException,
api_session.wait_for_lease_ready,
lease)
api_session.invoke_api.assert_called_once_with(
new_vim_util, 'get_object_property', api_session.vim, lease,
'state')
def _poll_task_well_known_exceptions(self, fault,
expected_exception):
api_session = self._create_api_session(False)
def fake_invoke_api(self, module, method, *args, **kwargs):
task_info = mock.Mock()
task_info.progress = -1
task_info.state = 'error'
error = mock.Mock()
error.localizedMessage = "Error message"
error_fault = mock.Mock()
error_fault.__class__.__name__ = fault
error.fault = error_fault
task_info.error = error
return task_info
with (
mock.patch.object(api_session, 'invoke_api', fake_invoke_api)
):
self.assertRaises(expected_exception,
api_session._poll_task,
'fake-task')
def test_poll_task_unknown_exception(self):
_unknown_exceptions = {
'NoDiskSpace': exceptions.VMwareDriverException,
'RuntimeFault': exceptions.VMwareDriverException
}
for k, v in six.iteritems(_unknown_exceptions):
self._poll_task_well_known_exceptions(k, v)
def _create_subclass_exception(self):
class VimSubClass(exceptions.VMwareDriverException):
pass
return VimSubClass
def test_register_fault_class(self):
exc = self._create_subclass_exception()
exceptions.register_fault_class('ValueError', exc)
self.assertEqual(exc, exceptions.get_fault_class('ValueError'))
def test_register_fault_class_override(self):
exc = self._create_subclass_exception()
exceptions.register_fault_class(exceptions.ALREADY_EXISTS, exc)
self.assertEqual(exc,
exceptions.get_fault_class(exceptions.ALREADY_EXISTS))
def test_register_fault_classi_invalid(self):
self.assertRaises(TypeError,
exceptions.register_fault_class,
'ValueError', ValueError)
def test_update_pbm_wsdl_loc(self):
session = mock.Mock()
session.key = "12345"
api_session = self._create_api_session(False)
self.assertIsNone(api_session._pbm_wsdl_loc)
api_session.pbm_wsdl_loc_set('fake_wsdl')
self.assertEqual('fake_wsdl', api_session._pbm_wsdl_loc)

View File

@ -1,519 +0,0 @@
# Copyright (c) 2014 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for functions and classes for image transfer.
"""
import math
from eventlet import greenthread
from eventlet import timeout
import mock
from oslo.vmware import exceptions
from oslo.vmware import image_transfer
from oslo.vmware import rw_handles
from oslo_vmware import image_transfer as new_image_transfer
from tests import base
class BlockingQueueTest(base.TestCase):
"""Tests for BlockingQueue."""
def test_read(self):
max_size = 10
chunk_size = 10
max_transfer_size = 30
queue = image_transfer.BlockingQueue(max_size, max_transfer_size)
def get_side_effect():
return [1] * chunk_size
queue.get = mock.Mock(side_effect=get_side_effect)
while True:
data_item = queue.read(chunk_size)
if not data_item:
break
self.assertEqual(max_transfer_size, queue._transferred)
exp_calls = [mock.call()] * int(math.ceil(float(max_transfer_size) /
chunk_size))
self.assertEqual(exp_calls, queue.get.call_args_list)
def test_write(self):
queue = image_transfer.BlockingQueue(10, 30)
queue.put = mock.Mock()
write_count = 10
for _ in range(0, write_count):
queue.write([1])
exp_calls = [mock.call([1])] * write_count
self.assertEqual(exp_calls, queue.put.call_args_list)
def test_seek(self):
queue = image_transfer.BlockingQueue(10, 30)
self.assertRaises(IOError, queue.seek, 5)
def test_tell(self):
queue = image_transfer.BlockingQueue(10, 30)
self.assertEqual(0, queue.tell())
queue.get = mock.Mock(return_value=[1] * 10)
queue.read(10)
self.assertEqual(10, queue.tell())
class ImageWriterTest(base.TestCase):
"""Tests for ImageWriter class."""
def _create_image_writer(self):
self.image_service = mock.Mock()
self.context = mock.Mock()
self.input_file = mock.Mock()
self.image_id = mock.Mock()
return image_transfer.ImageWriter(self.context, self.input_file,
self.image_service, self.image_id)
@mock.patch.object(greenthread, 'sleep')
def test_start(self, mock_sleep):
writer = self._create_image_writer()
status_list = ['queued', 'saving', 'active']
def image_service_show_side_effect(context, image_id):
status = status_list.pop(0)
return {'status': status}
self.image_service.show.side_effect = image_service_show_side_effect
exp_calls = [mock.call(self.context, self.image_id)] * len(status_list)
writer.start()
self.assertTrue(writer.wait())
self.image_service.update.assert_called_once_with(self.context,
self.image_id, {},
data=self.input_file)
self.assertEqual(exp_calls, self.image_service.show.call_args_list)
def test_start_with_killed_status(self):
writer = self._create_image_writer()
def image_service_show_side_effect(_context, _image_id):
return {'status': 'killed'}
self.image_service.show.side_effect = image_service_show_side_effect
writer.start()
self.assertRaises(exceptions.ImageTransferException,
writer.wait)
self.image_service.update.assert_called_once_with(self.context,
self.image_id, {},
data=self.input_file)
self.image_service.show.assert_called_once_with(self.context,
self.image_id)
def test_start_with_unknown_status(self):
writer = self._create_image_writer()
def image_service_show_side_effect(_context, _image_id):
return {'status': 'unknown'}
self.image_service.show.side_effect = image_service_show_side_effect
writer.start()
self.assertRaises(exceptions.ImageTransferException,
writer.wait)
self.image_service.update.assert_called_once_with(self.context,
self.image_id, {},
data=self.input_file)
self.image_service.show.assert_called_once_with(self.context,
self.image_id)
def test_start_with_image_service_show_exception(self):
writer = self._create_image_writer()
self.image_service.show.side_effect = RuntimeError()
writer.start()
self.assertRaises(exceptions.ImageTransferException, writer.wait)
self.image_service.update.assert_called_once_with(self.context,
self.image_id, {},
data=self.input_file)
self.image_service.show.assert_called_once_with(self.context,
self.image_id)
class FileReadWriteTaskTest(base.TestCase):
"""Tests for FileReadWriteTask class."""
def test_start(self):
data_items = [[1] * 10, [1] * 20, [1] * 5, []]
def input_file_read_side_effect(arg):
self.assertEqual(arg, rw_handles.READ_CHUNKSIZE)
data = data_items[input_file_read_side_effect.i]
input_file_read_side_effect.i += 1
return data
input_file_read_side_effect.i = 0
input_file = mock.Mock()
input_file.read.side_effect = input_file_read_side_effect
output_file = mock.Mock()
rw_task = image_transfer.FileReadWriteTask(input_file, output_file)
rw_task.start()
self.assertTrue(rw_task.wait())
self.assertEqual(len(data_items), input_file.read.call_count)
exp_calls = []
for i in range(0, len(data_items)):
exp_calls.append(mock.call(data_items[i]))
self.assertEqual(exp_calls, output_file.write.call_args_list)
self.assertEqual(len(data_items),
input_file.update_progress.call_count)
self.assertEqual(len(data_items),
output_file.update_progress.call_count)
def test_start_with_read_exception(self):
input_file = mock.Mock()
input_file.read.side_effect = RuntimeError()
output_file = mock.Mock()
rw_task = image_transfer.FileReadWriteTask(input_file, output_file)
rw_task.start()
self.assertRaises(exceptions.ImageTransferException, rw_task.wait)
input_file.read.assert_called_once_with(rw_handles.READ_CHUNKSIZE)
class ImageTransferUtilityTest(base.TestCase):
"""Tests for image_transfer utility methods."""
@mock.patch.object(timeout, 'Timeout')
@mock.patch('oslo_vmware.image_transfer.ImageWriter')
@mock.patch('oslo_vmware.image_transfer.FileReadWriteTask')
@mock.patch('oslo_vmware.image_transfer.BlockingQueue')
def test_start_transfer(self, fake_BlockingQueue, fake_FileReadWriteTask,
fake_ImageWriter, fake_Timeout):
context = mock.Mock()
read_file_handle = mock.Mock()
read_file_handle.close = mock.Mock()
image_service = mock.Mock()
image_id = mock.Mock()
blocking_queue = mock.Mock()
write_file_handle1 = mock.Mock()
write_file_handle1.close = mock.Mock()
write_file_handle2 = None
write_file_handles = [write_file_handle1, write_file_handle2]
timeout_secs = 10
blocking_queue_size = 10
image_meta = {}
max_data_size = 30
fake_BlockingQueue.return_value = blocking_queue
fake_timer = mock.Mock()
fake_timer.cancel = mock.Mock()
fake_Timeout.return_value = fake_timer
for write_file_handle in write_file_handles:
new_image_transfer._start_transfer(
context,
timeout_secs,
read_file_handle,
max_data_size,
write_file_handle=write_file_handle,
image_service=image_service,
image_id=image_id,
image_meta=image_meta)
exp_calls = [mock.call(blocking_queue_size,
max_data_size)] * len(write_file_handles)
self.assertEqual(exp_calls,
fake_BlockingQueue.call_args_list)
exp_calls2 = [mock.call(read_file_handle, blocking_queue),
mock.call(blocking_queue, write_file_handle1),
mock.call(read_file_handle, blocking_queue)]
self.assertEqual(exp_calls2,
fake_FileReadWriteTask.call_args_list)
exp_calls3 = mock.call(context, blocking_queue, image_service,
image_id, image_meta)
self.assertEqual(exp_calls3,
fake_ImageWriter.call_args)
exp_calls4 = [mock.call(timeout_secs)] * len(write_file_handles)
self.assertEqual(exp_calls4,
fake_Timeout.call_args_list)
self.assertEqual(len(write_file_handles),
fake_timer.cancel.call_count)
self.assertEqual(len(write_file_handles),
read_file_handle.close.call_count)
write_file_handle1.close.assert_called_once()
@mock.patch('oslo_vmware.rw_handles.FileWriteHandle')
@mock.patch('oslo_vmware.rw_handles.ImageReadHandle')
@mock.patch('oslo_vmware.image_transfer._start_transfer')
def test_download_flat_image(
self,
fake_transfer,
fake_rw_handles_ImageReadHandle,
fake_rw_handles_FileWriteHandle):
context = mock.Mock()
image_id = mock.Mock()
image_service = mock.Mock()
image_service.download = mock.Mock()
image_service.download.return_value = 'fake_iter'
fake_ImageReadHandle = 'fake_ImageReadHandle'
fake_FileWriteHandle = 'fake_FileWriteHandle'
cookies = []
timeout_secs = 10
image_size = 1000
host = '127.0.0.1'
port = 443
dc_path = 'dc1'
ds_name = 'ds1'
file_path = '/fake_path'
fake_rw_handles_ImageReadHandle.return_value = fake_ImageReadHandle
fake_rw_handles_FileWriteHandle.return_value = fake_FileWriteHandle
image_transfer.download_flat_image(
context,
timeout_secs,
image_service,
image_id,
image_size=image_size,
host=host,
port=port,
data_center_name=dc_path,
datastore_name=ds_name,
cookies=cookies,
file_path=file_path)
image_service.download.assert_called_once_with(context, image_id)
fake_rw_handles_ImageReadHandle.assert_called_once_with('fake_iter')
fake_rw_handles_FileWriteHandle.assert_called_once_with(
host,
port,
dc_path,
ds_name,
cookies,
file_path,
image_size,
cacerts=None)
fake_transfer.assert_called_once_with(
context,
timeout_secs,
fake_ImageReadHandle,
image_size,
write_file_handle=fake_FileWriteHandle)
@mock.patch('oslo_vmware.rw_handles.VmdkWriteHandle')
@mock.patch('oslo_vmware.image_transfer._start_transfer')
def test_download_stream_optimized_data(self, fake_transfer,
fake_rw_handles_VmdkWriteHandle):
context = mock.Mock()
session = mock.Mock()
read_handle = mock.Mock()
timeout_secs = 10
image_size = 1000
host = '127.0.0.1'
port = 443
resource_pool = 'rp-1'
vm_folder = 'folder-1'
vm_import_spec = None
fake_VmdkWriteHandle = mock.Mock()
fake_VmdkWriteHandle.get_imported_vm = mock.Mock()
fake_rw_handles_VmdkWriteHandle.return_value = fake_VmdkWriteHandle
image_transfer.download_stream_optimized_data(
context,
timeout_secs,
read_handle,
session=session,
host=host,
port=port,
resource_pool=resource_pool,
vm_folder=vm_folder,
vm_import_spec=vm_import_spec,
image_size=image_size)
fake_rw_handles_VmdkWriteHandle.assert_called_once_with(
session,
host,
port,
resource_pool,
vm_folder,
vm_import_spec,
image_size)
fake_transfer.assert_called_once_with(
context,
timeout_secs,
read_handle,
image_size,
write_file_handle=fake_VmdkWriteHandle)
fake_VmdkWriteHandle.get_imported_vm.assert_called_once()
@mock.patch('oslo_vmware.rw_handles.ImageReadHandle')
@mock.patch('oslo_vmware.image_transfer.download_stream_optimized_data')
def test_download_stream_optimized_image(
self, fake_download_stream_optimized_data,
fake_rw_handles_ImageReadHandle):
context = mock.Mock()
session = mock.Mock()
image_id = mock.Mock()
timeout_secs = 10
image_size = 1000
host = '127.0.0.1'
port = 443
resource_pool = 'rp-1'
vm_folder = 'folder-1'
vm_import_spec = None
fake_iter = 'fake_iter'
image_service = mock.Mock()
image_service.download = mock.Mock()
image_service.download.return_value = fake_iter
fake_ImageReadHandle = 'fake_ImageReadHandle'
fake_rw_handles_ImageReadHandle.return_value = fake_ImageReadHandle
image_transfer.download_stream_optimized_image(
context,
timeout_secs,
image_service,
image_id,
session=session,
host=host,
port=port,
resource_pool=resource_pool,
vm_folder=vm_folder,
vm_import_spec=vm_import_spec,
image_size=image_size)
image_service.download.assert_called_once_with(context, image_id)
fake_rw_handles_ImageReadHandle.assert_called_once_with(fake_iter)
fake_download_stream_optimized_data.assert_called_once_with(
context,
timeout_secs,
fake_ImageReadHandle,
session=session,
host=host,
port=port,
resource_pool=resource_pool,
vm_folder=vm_folder,
vm_import_spec=vm_import_spec,
image_size=image_size)
@mock.patch('oslo_vmware.image_transfer._start_transfer')
@mock.patch('oslo_vmware.rw_handles.VmdkReadHandle')
def test_copy_stream_optimized_disk(
self, vmdk_read_handle, start_transfer):
read_handle = mock.sentinel.read_handle
vmdk_read_handle.return_value = read_handle
context = mock.sentinel.context
timeout = mock.sentinel.timeout
write_handle = mock.Mock(name='/cinder/images/tmpAbcd.vmdk')
session = mock.sentinel.session
host = mock.sentinel.host
port = mock.sentinel.port
vm = mock.sentinel.vm
vmdk_file_path = mock.sentinel.vmdk_file_path
vmdk_size = mock.sentinel.vmdk_size
image_transfer.copy_stream_optimized_disk(
context, timeout, write_handle, session=session, host=host,
port=port, vm=vm, vmdk_file_path=vmdk_file_path,
vmdk_size=vmdk_size)
vmdk_read_handle.assert_called_once_with(
session, host, port, vm, vmdk_file_path, vmdk_size)
start_transfer.assert_called_once_with(
context, timeout, read_handle, vmdk_size,
write_file_handle=write_handle)
@mock.patch('oslo_vmware.rw_handles.VmdkReadHandle')
@mock.patch('oslo_vmware.image_transfer._start_transfer')
def test_upload_image(self, fake_transfer, fake_rw_handles_VmdkReadHandle):
context = mock.Mock()
image_id = mock.Mock()
owner_id = mock.Mock()
session = mock.Mock()
vm = mock.Mock()
image_service = mock.Mock()
timeout_secs = 10
image_size = 1000
host = '127.0.0.1'
port = 443
file_path = '/fake_path'
is_public = False
image_name = 'fake_image'
image_version = 1
fake_VmdkReadHandle = 'fake_VmdkReadHandle'
fake_rw_handles_VmdkReadHandle.return_value = fake_VmdkReadHandle
image_transfer.upload_image(context,
timeout_secs,
image_service,
image_id,
owner_id,
session=session,
host=host,
port=port,
vm=vm,
vmdk_file_path=file_path,
vmdk_size=image_size,
is_public=is_public,
image_name=image_name,
image_version=image_version)
fake_rw_handles_VmdkReadHandle.assert_called_once_with(session,
host,
port,
vm,
file_path,
image_size)
image_metadata = {'disk_format': 'vmdk',
'is_public': is_public,
'name': image_name,
'status': 'active',
'container_format': 'bare',
'size': 0,
'properties': {'vmware_image_version': image_version,
'vmware_disktype': 'streamOptimized',
'owner_id': owner_id}}
fake_transfer.assert_called_once_with(context,
timeout_secs,
fake_VmdkReadHandle,
0,
image_service=image_service,
image_id=image_id,
image_meta=image_metadata)

View File

@ -1,174 +0,0 @@
# Copyright (c) 2014 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for PBM utility methods.
"""
import os
import mock
import six.moves.urllib.parse as urlparse
import six.moves.urllib.request as urllib
from oslo.vmware import pbm
from oslo_vmware import pbm as new_pbm
from tests import base
class PBMUtilityTest(base.TestCase):
"""Tests for PBM utility methods."""
def test_get_all_profiles(self):
session = mock.Mock()
session.pbm = mock.Mock()
profile_ids = mock.Mock()
def invoke_api_side_effect(module, method, *args, **kwargs):
self.assertEqual(session.pbm, module)
self.assertTrue(method in ['PbmQueryProfile',
'PbmRetrieveContent'])
self.assertEqual(session.pbm.service_content.profileManager,
args[0])
if method == 'PbmQueryProfile':
self.assertEqual('STORAGE',
kwargs['resourceType'].resourceType)
return profile_ids
self.assertEqual(profile_ids, kwargs['profileIds'])
session.invoke_api.side_effect = invoke_api_side_effect
pbm.get_all_profiles(session)
self.assertEqual(2, session.invoke_api.call_count)
def test_get_all_profiles_with_no_profiles(self):
session = mock.Mock()
session.pbm = mock.Mock()
session.invoke_api.return_value = []
profiles = pbm.get_all_profiles(session)
session.invoke_api.assert_called_once_with(
session.pbm,
'PbmQueryProfile',
session.pbm.service_content.profileManager,
resourceType=session.pbm.client.factory.create())
self.assertEqual([], profiles)
def _create_profile(self, profile_id, name):
profile = mock.Mock()
profile.profileId = profile_id
profile.name = name
return profile
@mock.patch('oslo_vmware.pbm.get_all_profiles')
def test_get_profile_id_by_name(self, get_all_profiles):
profiles = [self._create_profile(str(i), 'profile-%d' % i)
for i in range(0, 10)]
get_all_profiles.return_value = profiles
session = mock.Mock()
exp_profile_id = '5'
profile_id = pbm.get_profile_id_by_name(session,
'profile-%s' % exp_profile_id)
self.assertEqual(exp_profile_id, profile_id)
get_all_profiles.assert_called_once_with(session)
@mock.patch('oslo_vmware.pbm.get_all_profiles')
def test_get_profile_id_by_name_with_invalid_profile(self,
get_all_profiles):
profiles = [self._create_profile(str(i), 'profile-%d' % i)
for i in range(0, 10)]
get_all_profiles.return_value = profiles
session = mock.Mock()
profile_id = pbm.get_profile_id_by_name(session,
('profile-%s' % 11))
self.assertFalse(profile_id)
get_all_profiles.assert_called_once_with(session)
def test_filter_hubs_by_profile(self):
pbm_client = mock.Mock()
session = mock.Mock()
session.pbm = pbm_client
hubs = mock.Mock()
profile_id = 'profile-0'
pbm.filter_hubs_by_profile(session, hubs, profile_id)
session.invoke_api.assert_called_once_with(
pbm_client,
'PbmQueryMatchingHub',
pbm_client.service_content.placementSolver,
hubsToSearch=hubs,
profile=profile_id)
def _create_datastore(self, value):
ds = mock.Mock()
ds.value = value
return ds
def test_convert_datastores_to_hubs(self):
ds_values = []
datastores = []
for i in range(0, 10):
value = "ds-%d" % i
ds_values.append(value)
datastores.append(self._create_datastore(value))
pbm_client_factory = mock.Mock()
pbm_client_factory.create.side_effect = lambda *args: mock.Mock()
hubs = pbm.convert_datastores_to_hubs(pbm_client_factory, datastores)
self.assertEqual(len(datastores), len(hubs))
hub_ids = [hub.hubId for hub in hubs]
self.assertEqual(set(ds_values), set(hub_ids))
def test_filter_datastores_by_hubs(self):
ds_values = []
datastores = []
for i in range(0, 10):
value = "ds-%d" % i
ds_values.append(value)
datastores.append(self._create_datastore(value))
hubs = []
hub_ids = ds_values[0:int(len(ds_values) / 2)]
for hub_id in hub_ids:
hub = mock.Mock()
hub.hubId = hub_id
hubs.append(hub)
filtered_ds = pbm.filter_datastores_by_hubs(hubs, datastores)
self.assertEqual(len(hubs), len(filtered_ds))
filtered_ds_values = [ds.value for ds in filtered_ds]
self.assertEqual(set(hub_ids), set(filtered_ds_values))
def test_get_pbm_wsdl_location(self):
wsdl = pbm.get_pbm_wsdl_location(None)
self.assertIsNone(wsdl)
def expected_wsdl(version):
driver_abs_dir = os.path.abspath(os.path.dirname(new_pbm.__file__))
path = os.path.join(driver_abs_dir, 'wsdl', version,
'pbmService.wsdl')
return urlparse.urljoin('file:', urllib.pathname2url(path))
with mock.patch('os.path.exists') as path_exists:
path_exists.return_value = True
wsdl = pbm.get_pbm_wsdl_location('5')
self.assertEqual(expected_wsdl('5'), wsdl)
wsdl = pbm.get_pbm_wsdl_location('5.5')
self.assertEqual(expected_wsdl('5.5'), wsdl)
wsdl = pbm.get_pbm_wsdl_location('5.5.1')
self.assertEqual(expected_wsdl('5.5'), wsdl)
path_exists.return_value = False
wsdl = pbm.get_pbm_wsdl_location('5.5')
self.assertIsNone(wsdl)

View File

@ -1,302 +0,0 @@
# Copyright (c) 2014 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for read and write handles for image transfer.
"""
import mock
import six
from oslo.vmware import exceptions
from oslo.vmware import rw_handles
from oslo_vmware import vim_util as new_vim_util
from tests import base
class FileHandleTest(base.TestCase):
"""Tests for FileHandle."""
def test_close(self):
file_handle = mock.Mock()
vmw_http_file = rw_handles.FileHandle(file_handle)
vmw_http_file.close()
file_handle.close.assert_called_once_with()
def test_find_vmdk_url(self):
device_url_0 = mock.Mock()
device_url_0.disk = False
device_url_1 = mock.Mock()
device_url_1.disk = True
device_url_1.url = 'https://*/ds1/vm1.vmdk'
lease_info = mock.Mock()
lease_info.deviceUrl = [device_url_0, device_url_1]
host = '10.1.2.3'
port = 443
exp_url = 'https://%s:%d/ds1/vm1.vmdk' % (host, port)
vmw_http_file = rw_handles.FileHandle(None)
self.assertEqual(exp_url, vmw_http_file._find_vmdk_url(lease_info,
host,
port))
class FileWriteHandleTest(base.TestCase):
"""Tests for FileWriteHandle."""
def setUp(self):
super(FileWriteHandleTest, self).setUp()
vim_cookie = mock.Mock()
vim_cookie.name = 'name'
vim_cookie.value = 'value'
self._conn = mock.Mock()
patcher = mock.patch(
'urllib3.connection.HTTPConnection')
self.addCleanup(patcher.stop)
HTTPConnectionMock = patcher.start()
HTTPConnectionMock.return_value = self._conn
self.vmw_http_write_file = rw_handles.FileWriteHandle(
'10.1.2.3', 443, 'dc-0', 'ds-0', [vim_cookie], '1.vmdk', 100,
'http')
def test_write(self):
self.vmw_http_write_file.write(None)
self._conn.send.assert_called_once_with(None)
def test_close(self):
self.vmw_http_write_file.close()
self._conn.getresponse.assert_called_once_with()
self._conn.close.assert_called_once_with()
class VmdkWriteHandleTest(base.TestCase):
"""Tests for VmdkWriteHandle."""
def setUp(self):
super(VmdkWriteHandleTest, self).setUp()
self._conn = mock.Mock()
patcher = mock.patch(
'urllib3.connection.HTTPConnection')
self.addCleanup(patcher.stop)
HTTPConnectionMock = patcher.start()
HTTPConnectionMock.return_value = self._conn
def _create_mock_session(self, disk=True, progress=-1):
device_url = mock.Mock()
device_url.disk = disk
device_url.url = 'http://*/ds/disk1.vmdk'
lease_info = mock.Mock()
lease_info.deviceUrl = [device_url]
session = mock.Mock()
def session_invoke_api_side_effect(module, method, *args, **kwargs):
if module == session.vim:
if method == 'ImportVApp':
return mock.Mock()
elif method == 'HttpNfcLeaseProgress':
self.assertEqual(progress, kwargs['percent'])
return
return lease_info
session.invoke_api.side_effect = session_invoke_api_side_effect
vim_cookie = mock.Mock()
vim_cookie.name = 'name'
vim_cookie.value = 'value'
session.vim.client.options.transport.cookiejar = [vim_cookie]
return session
def test_init_failure(self):
session = self._create_mock_session(False)
self.assertRaises(exceptions.VimException,
rw_handles.VmdkWriteHandle,
session,
'10.1.2.3',
443,
'rp-1',
'folder-1',
None,
100)
def test_write(self):
session = self._create_mock_session()
handle = rw_handles.VmdkWriteHandle(session, '10.1.2.3', 443,
'rp-1', 'folder-1', None,
100)
data = [1] * 10
handle.write(data)
self.assertEqual(len(data), handle._bytes_written)
self._conn.send.assert_called_once_with(data)
def test_update_progress(self):
vmdk_size = 100
data_size = 10
session = self._create_mock_session(True, 10)
handle = rw_handles.VmdkWriteHandle(session, '10.1.2.3', 443,
'rp-1', 'folder-1', None,
vmdk_size)
handle.write([1] * data_size)
handle.update_progress()
def test_update_progress_with_error(self):
session = self._create_mock_session(True, 10)
handle = rw_handles.VmdkWriteHandle(session, '10.1.2.3', 443,
'rp-1', 'folder-1', None,
100)
session.invoke_api.side_effect = exceptions.VimException(None)
self.assertRaises(exceptions.VimException, handle.update_progress)
def test_close(self):
session = self._create_mock_session()
handle = rw_handles.VmdkWriteHandle(session, '10.1.2.3', 443,
'rp-1', 'folder-1', None,
100)
def session_invoke_api_side_effect(module, method, *args, **kwargs):
if module == new_vim_util and method == 'get_object_property':
return 'ready'
self.assertEqual(session.vim, module)
self.assertEqual('HttpNfcLeaseComplete', method)
session.invoke_api = mock.Mock(
side_effect=session_invoke_api_side_effect)
handle.close()
self.assertEqual(2, session.invoke_api.call_count)
class VmdkReadHandleTest(base.TestCase):
"""Tests for VmdkReadHandle."""
def setUp(self):
super(VmdkReadHandleTest, self).setUp()
send_patcher = mock.patch('requests.sessions.Session.send')
self.addCleanup(send_patcher.stop)
send_mock = send_patcher.start()
self._response = mock.Mock()
send_mock.return_value = self._response
def _create_mock_session(self, disk=True, progress=-1):
device_url = mock.Mock()
device_url.disk = disk
device_url.url = 'http://*/ds/disk1.vmdk'
lease_info = mock.Mock()
lease_info.deviceUrl = [device_url]
session = mock.Mock()
def session_invoke_api_side_effect(module, method, *args, **kwargs):
if module == session.vim:
if method == 'ExportVm':
return mock.Mock()
elif method == 'HttpNfcLeaseProgress':
self.assertEqual(progress, kwargs['percent'])
return
return lease_info
session.invoke_api.side_effect = session_invoke_api_side_effect
vim_cookie = mock.Mock()
vim_cookie.name = 'name'
vim_cookie.value = 'value'
session.vim.client.options.transport.cookiejar = [vim_cookie]
return session
def test_init_failure(self):
session = self._create_mock_session(False)
self.assertRaises(exceptions.VimException,
rw_handles.VmdkReadHandle,
session,
'10.1.2.3',
443,
'vm-1',
'[ds] disk1.vmdk',
100)
def test_read(self):
chunk_size = rw_handles.READ_CHUNKSIZE
session = self._create_mock_session()
self._response.raw.read.return_value = [1] * chunk_size
handle = rw_handles.VmdkReadHandle(session, '10.1.2.3', 443,
'vm-1', '[ds] disk1.vmdk',
chunk_size * 10)
handle.read(chunk_size)
self.assertEqual(chunk_size, handle._bytes_read)
self._response.raw.read.assert_called_once_with(chunk_size)
def test_update_progress(self):
chunk_size = rw_handles.READ_CHUNKSIZE
vmdk_size = chunk_size * 10
session = self._create_mock_session(True, 10)
self._response.raw.read.return_value = [1] * chunk_size
handle = rw_handles.VmdkReadHandle(session, '10.1.2.3', 443,
'vm-1', '[ds] disk1.vmdk',
vmdk_size)
handle.read(chunk_size)
handle.update_progress()
self._response.raw.read.assert_called_once_with(chunk_size)
def test_update_progress_with_error(self):
session = self._create_mock_session(True, 10)
handle = rw_handles.VmdkReadHandle(session, '10.1.2.3', 443,
'vm-1', '[ds] disk1.vmdk',
100)
session.invoke_api.side_effect = exceptions.VimException(None)
self.assertRaises(exceptions.VimException, handle.update_progress)
def test_close(self):
session = self._create_mock_session()
handle = rw_handles.VmdkReadHandle(session, '10.1.2.3', 443,
'vm-1', '[ds] disk1.vmdk',
100)
def session_invoke_api_side_effect(module, method, *args, **kwargs):
if module == new_vim_util and method == 'get_object_property':
return 'ready'
self.assertEqual(session.vim, module)
self.assertEqual('HttpNfcLeaseComplete', method)
session.invoke_api = mock.Mock(
side_effect=session_invoke_api_side_effect)
handle.close()
self.assertEqual(2, session.invoke_api.call_count)
class ImageReadHandleTest(base.TestCase):
"""Tests for ImageReadHandle."""
def test_read(self):
max_items = 10
item = [1] * 10
class ImageReadIterator(six.Iterator):
def __init__(self):
self.num_items = 0
def __iter__(self):
return self
def __next__(self):
if (self.num_items < max_items):
self.num_items += 1
return item
raise StopIteration
next = __next__
handle = rw_handles.ImageReadHandle(ImageReadIterator())
for _ in range(0, max_items):
self.assertEqual(item, handle.read(10))
self.assertFalse(handle.read(10))

View File

@ -1,445 +0,0 @@
# Copyright (c) 2014 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import requests
import six
import six.moves.http_client as httplib
import suds
from oslo.vmware import exceptions
from oslo.vmware import service
from tests import base
class ServiceMessagePluginTest(base.TestCase):
"""Test class for ServiceMessagePlugin."""
def test_add_attribute_for_value(self):
node = mock.Mock()
node.name = 'value'
plugin = service.ServiceMessagePlugin()
plugin.add_attribute_for_value(node)
node.set.assert_called_once_with('xsi:type', 'xsd:string')
def test_marshalled(self):
plugin = service.ServiceMessagePlugin()
context = mock.Mock()
plugin.marshalled(context)
context.envelope.prune.assert_called_once_with()
context.envelope.walk.assert_called_once_with(
plugin.add_attribute_for_value)
class ServiceTest(base.TestCase):
def setUp(self):
super(ServiceTest, self).setUp()
patcher = mock.patch('suds.client.Client')
self.addCleanup(patcher.stop)
self.SudsClientMock = patcher.start()
def test_retrieve_properties_ex_fault_checker_with_empty_response(self):
try:
service.Service._retrieve_properties_ex_fault_checker(None)
assert False
except exceptions.VimFaultException as ex:
self.assertEqual([exceptions.NOT_AUTHENTICATED],
ex.fault_list)
def test_retrieve_properties_ex_fault_checker(self):
fault_list = ['FileFault', 'VimFault']
missing_set = []
for fault in fault_list:
missing_elem = mock.Mock()
missing_elem.fault.fault.__class__.__name__ = fault
missing_set.append(missing_elem)
obj_cont = mock.Mock()
obj_cont.missingSet = missing_set
response = mock.Mock()
response.objects = [obj_cont]
try:
service.Service._retrieve_properties_ex_fault_checker(response)
assert False
except exceptions.VimFaultException as ex:
self.assertEqual(fault_list, ex.fault_list)
def test_request_handler(self):
managed_object = 'VirtualMachine'
resp = mock.Mock()
def side_effect(mo, **kwargs):
self.assertEqual(managed_object, mo._type)
self.assertEqual(managed_object, mo.value)
return resp
svc_obj = service.Service()
attr_name = 'powerOn'
service_mock = svc_obj.client.service
setattr(service_mock, attr_name, side_effect)
ret = svc_obj.powerOn(managed_object)
self.assertEqual(resp, ret)
def test_request_handler_with_retrieve_properties_ex_fault(self):
managed_object = 'Datacenter'
def side_effect(mo, **kwargs):
self.assertEqual(managed_object, mo._type)
self.assertEqual(managed_object, mo.value)
return None
svc_obj = service.Service()
attr_name = 'retrievePropertiesEx'
service_mock = svc_obj.client.service
setattr(service_mock, attr_name, side_effect)
self.assertRaises(exceptions.VimFaultException,
svc_obj.retrievePropertiesEx,
managed_object)
def test_request_handler_with_web_fault(self):
managed_object = 'VirtualMachine'
fault_list = ['Fault']
doc = mock.Mock()
def side_effect(mo, **kwargs):
self.assertEqual(managed_object, mo._type)
self.assertEqual(managed_object, mo.value)
fault = mock.Mock(faultstring="MyFault")
fault_children = mock.Mock()
fault_children.name = "name"
fault_children.getText.return_value = "value"
child = mock.Mock()
child.get.return_value = fault_list[0]
child.getChildren.return_value = [fault_children]
detail = mock.Mock()
detail.getChildren.return_value = [child]
doc.childAtPath.return_value = detail
raise suds.WebFault(fault, doc)
svc_obj = service.Service()
service_mock = svc_obj.client.service
setattr(service_mock, 'powerOn', side_effect)
ex = self.assertRaises(exceptions.VimFaultException, svc_obj.powerOn,
managed_object)
self.assertEqual(fault_list, ex.fault_list)
self.assertEqual({'name': 'value'}, ex.details)
self.assertEqual("MyFault", ex.msg)
doc.childAtPath.assertCalledOnceWith('/detail')
def test_request_handler_with_empty_web_fault_doc(self):
def side_effect(mo, **kwargs):
fault = mock.Mock(faultstring="MyFault")
raise suds.WebFault(fault, None)
svc_obj = service.Service()
service_mock = svc_obj.client.service
setattr(service_mock, 'powerOn', side_effect)
ex = self.assertRaises(exceptions.VimFaultException,
svc_obj.powerOn,
'VirtualMachine')
self.assertEqual([], ex.fault_list)
self.assertEqual({}, ex.details)
self.assertEqual("MyFault", ex.msg)
def test_request_handler_with_vc51_web_fault(self):
managed_object = 'VirtualMachine'
fault_list = ['Fault']
doc = mock.Mock()
def side_effect(mo, **kwargs):
self.assertEqual(managed_object, mo._type)
self.assertEqual(managed_object, mo.value)
fault = mock.Mock(faultstring="MyFault")
fault_children = mock.Mock()
fault_children.name = "name"
fault_children.getText.return_value = "value"
child = mock.Mock()
child.get.return_value = fault_list[0]
child.getChildren.return_value = [fault_children]
detail = mock.Mock()
detail.getChildren.return_value = [child]
doc.childAtPath.side_effect = [None, detail]
raise suds.WebFault(fault, doc)
svc_obj = service.Service()
service_mock = svc_obj.client.service
setattr(service_mock, 'powerOn', side_effect)
ex = self.assertRaises(exceptions.VimFaultException, svc_obj.powerOn,
managed_object)
self.assertEqual(fault_list, ex.fault_list)
self.assertEqual({'name': 'value'}, ex.details)
self.assertEqual("MyFault", ex.msg)
exp_calls = [mock.call('/detail'),
mock.call('/Envelope/Body/Fault/detail')]
self.assertEqual(exp_calls, doc.childAtPath.call_args_list)
def test_request_handler_with_attribute_error(self):
managed_object = 'VirtualMachine'
svc_obj = service.Service()
# no powerOn method in Service
service_mock = mock.Mock(spec=service.Service)
svc_obj.client.service = service_mock
self.assertRaises(exceptions.VimAttributeException,
svc_obj.powerOn,
managed_object)
def test_request_handler_with_http_cannot_send_error(self):
managed_object = 'VirtualMachine'
def side_effect(mo, **kwargs):
self.assertEqual(managed_object, mo._type)
self.assertEqual(managed_object, mo.value)
raise httplib.CannotSendRequest()
svc_obj = service.Service()
attr_name = 'powerOn'
service_mock = svc_obj.client.service
setattr(service_mock, attr_name, side_effect)
self.assertRaises(exceptions.VimSessionOverLoadException,
svc_obj.powerOn,
managed_object)
def test_request_handler_with_http_response_not_ready_error(self):
managed_object = 'VirtualMachine'
def side_effect(mo, **kwargs):
self.assertEqual(managed_object, mo._type)
self.assertEqual(managed_object, mo.value)
raise httplib.ResponseNotReady()
svc_obj = service.Service()
attr_name = 'powerOn'
service_mock = svc_obj.client.service
setattr(service_mock, attr_name, side_effect)
self.assertRaises(exceptions.VimSessionOverLoadException,
svc_obj.powerOn,
managed_object)
def test_request_handler_with_http_cannot_send_header_error(self):
managed_object = 'VirtualMachine'
def side_effect(mo, **kwargs):
self.assertEqual(managed_object, mo._type)
self.assertEqual(managed_object, mo.value)
raise httplib.CannotSendHeader()
svc_obj = service.Service()
attr_name = 'powerOn'
service_mock = svc_obj.client.service
setattr(service_mock, attr_name, side_effect)
self.assertRaises(exceptions.VimSessionOverLoadException,
svc_obj.powerOn,
managed_object)
def test_request_handler_with_connection_error(self):
managed_object = 'VirtualMachine'
def side_effect(mo, **kwargs):
self.assertEqual(managed_object, mo._type)
self.assertEqual(managed_object, mo.value)
raise requests.ConnectionError()
svc_obj = service.Service()
attr_name = 'powerOn'
service_mock = svc_obj.client.service
setattr(service_mock, attr_name, side_effect)
self.assertRaises(exceptions.VimConnectionException,
svc_obj.powerOn,
managed_object)
def test_request_handler_with_http_error(self):
managed_object = 'VirtualMachine'
def side_effect(mo, **kwargs):
self.assertEqual(managed_object, mo._type)
self.assertEqual(managed_object, mo.value)
raise requests.HTTPError()
svc_obj = service.Service()
attr_name = 'powerOn'
service_mock = svc_obj.client.service
setattr(service_mock, attr_name, side_effect)
self.assertRaises(exceptions.VimConnectionException,
svc_obj.powerOn,
managed_object)
@mock.patch('oslo_vmware.vim_util.get_moref', return_value=None)
def test_request_handler_no_value(self, mock_moref):
managed_object = 'VirtualMachine'
svc_obj = service.Service()
ret = svc_obj.UnregisterVM(managed_object)
self.assertIsNone(ret)
def _test_request_handler_with_exception(self, message, exception):
managed_object = 'VirtualMachine'
def side_effect(mo, **kwargs):
self.assertEqual(managed_object, mo._type)
self.assertEqual(managed_object, mo.value)
raise Exception(message)
svc_obj = service.Service()
attr_name = 'powerOn'
service_mock = svc_obj.client.service
setattr(service_mock, attr_name, side_effect)
self.assertRaises(exception, svc_obj.powerOn, managed_object)
def test_request_handler_with_address_in_use_error(self):
self._test_request_handler_with_exception(
service.ADDRESS_IN_USE_ERROR,
exceptions.VimSessionOverLoadException)
def test_request_handler_with_conn_abort_error(self):
self._test_request_handler_with_exception(
service.CONN_ABORT_ERROR, exceptions.VimSessionOverLoadException)
def test_request_handler_with_resp_not_xml_error(self):
self._test_request_handler_with_exception(
service.RESP_NOT_XML_ERROR, exceptions.VimSessionOverLoadException)
def test_request_handler_with_generic_error(self):
self._test_request_handler_with_exception(
'GENERIC_ERROR', exceptions.VimException)
def test_get_session_cookie(self):
svc_obj = service.Service()
cookie_value = 'xyz'
cookie = mock.Mock()
cookie.name = 'vmware_soap_session'
cookie.value = cookie_value
svc_obj.client.options.transport.cookiejar = [cookie]
self.assertEqual(cookie_value, svc_obj.get_http_cookie())
def test_get_session_cookie_with_no_cookie(self):
svc_obj = service.Service()
cookie = mock.Mock()
cookie.name = 'cookie'
cookie.value = 'xyz'
svc_obj.client.options.transport.cookiejar = [cookie]
self.assertIsNone(svc_obj.get_http_cookie())
class MemoryCacheTest(base.TestCase):
"""Test class for MemoryCache."""
def test_get_set(self):
cache = service.MemoryCache()
cache.put('key1', 'value1')
cache.put('key2', 'value2')
self.assertEqual('value1', cache.get('key1'))
self.assertEqual('value2', cache.get('key2'))
self.assertEqual(None, cache.get('key3'))
@mock.patch('suds.reader.DocumentReader.download')
def test_shared_cache(self, mock_reader):
cache1 = service.Service().client.options.cache
cache2 = service.Service().client.options.cache
self.assertIs(cache1, cache2)
@mock.patch('oslo_utils.timeutils.utcnow_ts')
def test_cache_timeout(self, mock_utcnow_ts):
mock_utcnow_ts.side_effect = [100, 125, 150, 175, 195, 200, 225]
cache = service.MemoryCache()
cache.put('key1', 'value1', 10)
cache.put('key2', 'value2', 75)
cache.put('key3', 'value3', 100)
self.assertIsNone(cache.get('key1'))
self.assertEqual('value2', cache.get('key2'))
self.assertIsNone(cache.get('key2'))
self.assertEqual('value3', cache.get('key3'))
class RequestsTransportTest(base.TestCase):
"""Tests for RequestsTransport."""
def test_open(self):
transport = service.RequestsTransport()
data = "Hello World"
resp = mock.Mock(content=data)
transport.session.get = mock.Mock(return_value=resp)
request = mock.Mock(url=mock.sentinel.url)
self.assertEqual(data,
transport.open(request).getvalue())
transport.session.get.assert_called_once_with(mock.sentinel.url,
verify=transport.verify)
def test_send(self):
transport = service.RequestsTransport()
resp = mock.Mock(status_code=mock.sentinel.status_code,
headers=mock.sentinel.headers,
content=mock.sentinel.content)
transport.session.post = mock.Mock(return_value=resp)
request = mock.Mock(url=mock.sentinel.url,
message=mock.sentinel.message,
headers=mock.sentinel.req_headers)
reply = transport.send(request)
self.assertEqual(mock.sentinel.status_code, reply.code)
self.assertEqual(mock.sentinel.headers, reply.headers)
self.assertEqual(mock.sentinel.content, reply.message)
@mock.patch('os.path.getsize')
def test_send_with_local_file_url(self, get_size_mock):
transport = service.RequestsTransport()
url = 'file:///foo'
request = requests.PreparedRequest()
request.url = url
data = b"Hello World"
get_size_mock.return_value = len(data)
def readinto_mock(buf):
buf[0:] = data
if six.PY3:
builtin_open = 'builtins.open'
open_mock = mock.MagicMock(name='file_handle',
spec=open)
import _io
file_spec = list(set(dir(_io.TextIOWrapper)).union(
set(dir(_io.BytesIO))))
else:
builtin_open = '__builtin__.open'
open_mock = mock.MagicMock(name='file_handle',
spec=file)
file_spec = file
file_handle = mock.MagicMock(spec=file_spec)
file_handle.write.return_value = None
file_handle.__enter__.return_value = file_handle
file_handle.readinto.side_effect = readinto_mock
open_mock.return_value = file_handle
with mock.patch(builtin_open, open_mock, create=True):
resp = transport.session.send(request)
self.assertEqual(data, resp.content)

View File

@ -1,110 +0,0 @@
# Copyright (c) 2014 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for classes to invoke VMware VI SOAP calls.
"""
import mock
from oslo_i18n import fixture as i18n_fixture
from oslo.vmware import exceptions
from oslo.vmware import vim
from oslo_vmware._i18n import _
from tests import base
class VimTest(base.TestCase):
"""Test class for Vim."""
def setUp(self):
super(VimTest, self).setUp()
patcher = mock.patch('suds.client.Client')
self.addCleanup(patcher.stop)
self.SudsClientMock = patcher.start()
self.useFixture(i18n_fixture.ToggleLazy(True))
@mock.patch.object(vim.Vim, '__getattr__', autospec=True)
def test_service_content(self, getattr_mock):
getattr_ret = mock.Mock()
getattr_mock.side_effect = lambda *args: getattr_ret
vim_obj = vim.Vim()
vim_obj.service_content
getattr_mock.assert_called_once_with(vim_obj, 'RetrieveServiceContent')
getattr_ret.assert_called_once_with('ServiceInstance')
self.assertEqual(self.SudsClientMock.return_value, vim_obj.client)
self.assertEqual(getattr_ret.return_value, vim_obj.service_content)
def test_exception_summary_exception_as_list(self):
# assert that if a list is fed to the VimException object
# that it will error.
self.assertRaises(ValueError,
exceptions.VimException,
[], ValueError('foo'))
def test_exception_summary_string(self):
e = exceptions.VimException(_("string"), ValueError("foo"))
string = str(e)
self.assertEqual("string\nCause: foo", string)
def test_vim_fault_exception_string(self):
self.assertRaises(ValueError,
exceptions.VimFaultException,
"bad", ValueError("argument"))
def test_vim_fault_exception(self):
vfe = exceptions.VimFaultException([ValueError("example")], _("cause"))
string = str(vfe)
self.assertEqual("cause\nFaults: [ValueError('example',)]", string)
def test_vim_fault_exception_with_cause_and_details(self):
vfe = exceptions.VimFaultException([ValueError("example")],
"MyMessage",
"FooBar",
{'foo': 'bar'})
string = str(vfe)
self.assertEqual("MyMessage\n"
"Cause: FooBar\n"
"Faults: [ValueError('example',)]\n"
"Details: {'foo': 'bar'}",
string)
def test_configure_non_default_host_port(self):
vim_obj = vim.Vim('https', 'www.test.com', 12345)
self.assertEqual('https://www.test.com:12345/sdk/vimService.wsdl',
vim_obj.wsdl_url)
self.assertEqual('https://www.test.com:12345/sdk',
vim_obj.soap_url)
def test_configure_ipv6(self):
vim_obj = vim.Vim('https', '::1')
self.assertEqual('https://[::1]/sdk/vimService.wsdl',
vim_obj.wsdl_url)
self.assertEqual('https://[::1]/sdk',
vim_obj.soap_url)
def test_configure_ipv6_and_non_default_host_port(self):
vim_obj = vim.Vim('https', '::1', 12345)
self.assertEqual('https://[::1]:12345/sdk/vimService.wsdl',
vim_obj.wsdl_url)
self.assertEqual('https://[::1]:12345/sdk',
vim_obj.soap_url)
def test_configure_with_wsdl_url_override(self):
vim_obj = vim.Vim('https', 'www.example.com',
wsdl_url='https://test.com/sdk/vimService.wsdl')
self.assertEqual('https://test.com/sdk/vimService.wsdl',
vim_obj.wsdl_url)
self.assertEqual('https://www.example.com/sdk', vim_obj.soap_url)

View File

@ -1,359 +0,0 @@
# Copyright (c) 2014 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for VMware API utility module.
"""
import collections
import mock
from oslo.vmware import vim_util
from tests import base
class VimUtilTest(base.TestCase):
"""Test class for utility methods in vim_util."""
def test_get_moref(self):
moref = vim_util.get_moref("vm-0", "VirtualMachine")
self.assertEqual("vm-0", moref.value)
self.assertEqual("VirtualMachine", moref._type)
def test_build_selection_spec(self):
client_factory = mock.Mock()
sel_spec = vim_util.build_selection_spec(client_factory, "test")
self.assertEqual("test", sel_spec.name)
def test_build_traversal_spec(self):
client_factory = mock.Mock()
sel_spec = mock.Mock()
traversal_spec = vim_util.build_traversal_spec(client_factory,
'dc_to_hf',
'Datacenter',
'hostFolder', False,
[sel_spec])
self.assertEqual("dc_to_hf", traversal_spec.name)
self.assertEqual("hostFolder", traversal_spec.path)
self.assertEqual([sel_spec], traversal_spec.selectSet)
self.assertFalse(traversal_spec.skip)
self.assertEqual("Datacenter", traversal_spec.type)
@mock.patch('oslo_vmware.vim_util.build_selection_spec')
def test_build_recursive_traversal_spec(self, build_selection_spec_mock):
sel_spec = mock.Mock()
rp_to_rp_sel_spec = mock.Mock()
rp_to_vm_sel_spec = mock.Mock()
def build_sel_spec_side_effect(client_factory, name):
if name == 'visitFolders':
return sel_spec
elif name == 'rp_to_rp':
return rp_to_rp_sel_spec
elif name == 'rp_to_vm':
return rp_to_vm_sel_spec
else:
return None
build_selection_spec_mock.side_effect = build_sel_spec_side_effect
traversal_spec_dict = {'dc_to_hf': {'type': 'Datacenter',
'path': 'hostFolder',
'skip': False,
'selectSet': [sel_spec]},
'dc_to_vmf': {'type': 'Datacenter',
'path': 'vmFolder',
'skip': False,
'selectSet': [sel_spec]},
'dc_to_netf': {'type': 'Datacenter',
'path': 'networkFolder',
'skip': False,
'selectSet': [sel_spec]},
'h_to_vm': {'type': 'HostSystem',
'path': 'vm',
'skip': False,
'selectSet': [sel_spec]},
'cr_to_h': {'type': 'ComputeResource',
'path': 'host',
'skip': False,
'selectSet': []},
'cr_to_ds': {'type': 'ComputeResource',
'path': 'datastore',
'skip': False,
'selectSet': []},
'cr_to_rp': {'type': 'ComputeResource',
'path': 'resourcePool',
'skip': False,
'selectSet': [rp_to_rp_sel_spec,
rp_to_vm_sel_spec]},
'cr_to_rp': {'type': 'ComputeResource',
'path': 'resourcePool',
'skip': False,
'selectSet': [rp_to_rp_sel_spec,
rp_to_vm_sel_spec]},
'ccr_to_h': {'type': 'ClusterComputeResource',
'path': 'host',
'skip': False,
'selectSet': []},
'ccr_to_ds': {'type': 'ClusterComputeResource',
'path': 'datastore',
'skip': False,
'selectSet': []},
'ccr_to_rp': {'type': 'ClusterComputeResource',
'path': 'resourcePool',
'skip': False,
'selectSet': [rp_to_rp_sel_spec,
rp_to_vm_sel_spec]},
'rp_to_rp': {'type': 'ResourcePool',
'path': 'resourcePool',
'skip': False,
'selectSet': [rp_to_rp_sel_spec,
rp_to_vm_sel_spec]},
'rp_to_vm': {'type': 'ResourcePool',
'path': 'vm',
'skip': False,
'selectSet': [rp_to_rp_sel_spec,
rp_to_vm_sel_spec]},
}
client_factory = mock.Mock()
client_factory.create.side_effect = lambda ns: mock.Mock()
trav_spec = vim_util.build_recursive_traversal_spec(client_factory)
self.assertEqual("visitFolders", trav_spec.name)
self.assertEqual("childEntity", trav_spec.path)
self.assertFalse(trav_spec.skip)
self.assertEqual("Folder", trav_spec.type)
self.assertEqual(len(traversal_spec_dict) + 1,
len(trav_spec.selectSet))
for spec in trav_spec.selectSet:
if spec.name not in traversal_spec_dict:
self.assertEqual(sel_spec, spec)
else:
exp_spec = traversal_spec_dict[spec.name]
self.assertEqual(exp_spec['type'], spec.type)
self.assertEqual(exp_spec['path'], spec.path)
self.assertEqual(exp_spec['skip'], spec.skip)
self.assertEqual(exp_spec['selectSet'], spec.selectSet)
def test_build_property_spec(self):
client_factory = mock.Mock()
prop_spec = vim_util.build_property_spec(client_factory)
self.assertFalse(prop_spec.all)
self.assertEqual(["name"], prop_spec.pathSet)
self.assertEqual("VirtualMachine", prop_spec.type)
def test_build_object_spec(self):
client_factory = mock.Mock()
root_folder = mock.Mock()
specs = [mock.Mock()]
obj_spec = vim_util.build_object_spec(client_factory,
root_folder, specs)
self.assertEqual(root_folder, obj_spec.obj)
self.assertEqual(specs, obj_spec.selectSet)
self.assertFalse(obj_spec.skip)
def test_build_property_filter_spec(self):
client_factory = mock.Mock()
prop_specs = [mock.Mock()]
obj_specs = [mock.Mock()]
filter_spec = vim_util.build_property_filter_spec(client_factory,
prop_specs,
obj_specs)
self.assertEqual(obj_specs, filter_spec.objectSet)
self.assertEqual(prop_specs, filter_spec.propSet)
@mock.patch(
'oslo_vmware.vim_util.build_recursive_traversal_spec')
def test_get_objects(self, build_recursive_traversal_spec):
vim = mock.Mock()
trav_spec = mock.Mock()
build_recursive_traversal_spec.return_value = trav_spec
max_objects = 10
_type = "VirtualMachine"
def vim_RetrievePropertiesEx_side_effect(pc, specSet, options):
self.assertTrue(pc is vim.service_content.propertyCollector)
self.assertEqual(max_objects, options.maxObjects)
self.assertEqual(1, len(specSet))
property_filter_spec = specSet[0]
propSet = property_filter_spec.propSet
self.assertEqual(1, len(propSet))
prop_spec = propSet[0]
self.assertFalse(prop_spec.all)
self.assertEqual(["name"], prop_spec.pathSet)
self.assertEqual(_type, prop_spec.type)
objSet = property_filter_spec.objectSet
self.assertEqual(1, len(objSet))
obj_spec = objSet[0]
self.assertTrue(obj_spec.obj is vim.service_content.rootFolder)
self.assertEqual([trav_spec], obj_spec.selectSet)
self.assertFalse(obj_spec.skip)
vim.RetrievePropertiesEx.side_effect = \
vim_RetrievePropertiesEx_side_effect
vim_util.get_objects(vim, _type, max_objects)
self.assertEqual(1, vim.RetrievePropertiesEx.call_count)
def test_get_object_properties_with_empty_moref(self):
vim = mock.Mock()
ret = vim_util.get_object_properties(vim, None, None)
self.assertIsNone(ret)
@mock.patch('oslo_vmware.vim_util.cancel_retrieval')
def test_get_object_properties(self, cancel_retrieval):
vim = mock.Mock()
moref = mock.Mock()
moref._type = "VirtualMachine"
retrieve_result = mock.Mock()
def vim_RetrievePropertiesEx_side_effect(pc, specSet, options):
self.assertTrue(pc is vim.service_content.propertyCollector)
self.assertEqual(1, options.maxObjects)
self.assertEqual(1, len(specSet))
property_filter_spec = specSet[0]
propSet = property_filter_spec.propSet
self.assertEqual(1, len(propSet))
prop_spec = propSet[0]
self.assertTrue(prop_spec.all)
self.assertEqual(['name'], prop_spec.pathSet)
self.assertEqual(moref._type, prop_spec.type)
objSet = property_filter_spec.objectSet
self.assertEqual(1, len(objSet))
obj_spec = objSet[0]
self.assertEqual(moref, obj_spec.obj)
self.assertEqual([], obj_spec.selectSet)
self.assertFalse(obj_spec.skip)
return retrieve_result
vim.RetrievePropertiesEx.side_effect = \
vim_RetrievePropertiesEx_side_effect
res = vim_util.get_object_properties(vim, moref, None)
self.assertEqual(1, vim.RetrievePropertiesEx.call_count)
self.assertTrue(res is retrieve_result.objects)
cancel_retrieval.assert_called_once_with(vim, retrieve_result)
@mock.patch('oslo_vmware.vim_util._get_token')
def test_cancel_retrieval(self, get_token):
token = mock.Mock()
get_token.return_value = token
vim = mock.Mock()
retrieve_result = mock.Mock()
vim_util.cancel_retrieval(vim, retrieve_result)
get_token.assert_called_once_with(retrieve_result)
vim.CancelRetrievePropertiesEx.assert_called_once_with(
vim.service_content.propertyCollector, token=token)
@mock.patch('oslo_vmware.vim_util._get_token')
def test_continue_retrieval(self, get_token):
token = mock.Mock()
get_token.return_value = token
vim = mock.Mock()
retrieve_result = mock.Mock()
vim_util.continue_retrieval(vim, retrieve_result)
get_token.assert_called_once_with(retrieve_result)
vim.ContinueRetrievePropertiesEx.assert_called_once_with(
vim.service_content.propertyCollector, token=token)
@mock.patch('oslo_vmware.vim_util.get_object_properties')
def test_get_object_property(self, get_object_properties):
prop = mock.Mock()
prop.val = "ubuntu-12.04"
properties = mock.Mock()
properties.propSet = [prop]
properties_list = [properties]
get_object_properties.return_value = properties_list
vim = mock.Mock()
moref = mock.Mock()
property_name = 'name'
val = vim_util.get_object_property(vim, moref, property_name)
self.assertEqual(prop.val, val)
get_object_properties.assert_called_once_with(
vim, moref, [property_name])
def test_find_extension(self):
vim = mock.Mock()
ret = vim_util.find_extension(vim, 'fake-key')
self.assertIsNotNone(ret)
service_content = vim.service_content
vim.client.service.FindExtension.assert_called_once_with(
service_content.extensionManager, 'fake-key')
def test_register_extension(self):
vim = mock.Mock()
ret = vim_util.register_extension(vim, 'fake-key', 'fake-type')
self.assertIsNone(ret)
service_content = vim.service_content
vim.client.service.RegisterExtension.assert_called_once_with(
service_content.extensionManager, mock.ANY)
def test_get_vc_version(self):
session = mock.Mock()
expected_version = '6.0.1'
session.vim.service_content.about.version = expected_version
version = vim_util.get_vc_version(session)
self.assertEqual(expected_version, version)
expected_version = '5.5'
session.vim.service_content.about.version = expected_version
version = vim_util.get_vc_version(session)
self.assertEqual(expected_version, version)
def test_get_inventory_path_folders(self):
ObjectContent = collections.namedtuple('ObjectContent', ['propSet'])
DynamicProperty = collections.namedtuple('Property', ['name', 'val'])
obj1 = ObjectContent(propSet=[
DynamicProperty(name='Datacenter', val='dc-1'),
])
obj2 = ObjectContent(propSet=[
DynamicProperty(name='Datacenter', val='folder-2'),
])
obj3 = ObjectContent(propSet=[
DynamicProperty(name='Datacenter', val='folder-1'),
])
objects = ['foo', 'bar', obj1, obj2, obj3]
result = mock.sentinel.objects
result.objects = objects
session = mock.Mock()
session.vim.RetrievePropertiesEx = mock.Mock()
session.vim.RetrievePropertiesEx.return_value = result
entity = mock.Mock()
inv_path = vim_util.get_inventory_path(session.vim, entity, 100)
self.assertEqual('/folder-2/dc-1', inv_path)
def test_get_inventory_path_no_folder(self):
ObjectContent = collections.namedtuple('ObjectContent', ['propSet'])
DynamicProperty = collections.namedtuple('Property', ['name', 'val'])
obj1 = ObjectContent(propSet=[
DynamicProperty(name='Datacenter', val='dc-1'),
])
objects = ['foo', 'bar', obj1]
result = mock.sentinel.objects
result.objects = objects
session = mock.Mock()
session.vim.RetrievePropertiesEx = mock.Mock()
session.vim.RetrievePropertiesEx.return_value = result
entity = mock.Mock()
inv_path = vim_util.get_inventory_path(session.vim, entity, 100)
self.assertEqual('dc-1', inv_path)

View File

@ -1,26 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
test_vmware
----------------------------------
Tests for `vmware` module.
"""
from tests import base
class TestVmware(base.TestCase):
def test_something(self):
pass