Add all missing attributes of StorageService in RSD 2.1

Change-Id: I7984e902690b110a0445baab8af5e64ef5191ca7
This commit is contained in:
Lin Yang 2019-03-25 11:51:07 -07:00
parent 8de4eb879a
commit 0a37bbd349
5 changed files with 338 additions and 318 deletions

View File

@ -16,66 +16,73 @@
import logging
from sushy.resources import base
from sushy import utils
from rsd_lib import common as rsd_lib_common
from rsd_lib import base as rsd_lib_base
from rsd_lib import utils as rsd_lib_utils
LOG = logging.getLogger(__name__)
class LogicalDrive(base.ResourceBase):
class LinksField(base.CompositeField):
identity = base.Field('Id', required=True)
"""The logical drive identity string"""
logical_drives = base.Field(
"LogicalDrives", adapter=utils.get_members_identities
)
drive_type = base.Field('Type')
"""Type of logical drive"""
physical_drives = base.Field(
"PhysicalDrives", adapter=utils.get_members_identities
)
mode = base.Field('Mode')
"""Drive mode - for Type=='LVM' only supported mode is 'LV'"""
master_drive = base.Field(
"MasterDrive", adapter=rsd_lib_utils.get_resource_identity
)
protected = base.Field('Protected')
"""If logical drive is protected"""
used_by = base.Field("UsedBy", adapter=utils.get_members_identities)
capacity_gib = base.Field('CapacityGiB')
"""Logical drive capacity in GiB"""
image = base.Field('Image')
"""Any name that identifies the content of image copied to this LV"""
bootable = base.Field('Bootable')
"""If the LV contains a bootable operating system"""
snapshot = base.Field('Snapshot')
"""Type of drive replication. Yes - copy on write, No - disc clone"""
status = rsd_lib_common.StatusField('Status')
"""The logical drive status"""
def __init__(self, connector, identity, redfish_version=None):
"""A class representing a LogicalDrive
:param connector: A Connector instance
:param identity: The identity of the LogicalDrive resource
:param redfish_version: The version of RedFish. Used to construct
the object according to schema of the given version.
"""
super(LogicalDrive, self).__init__(connector, identity,
redfish_version)
targets = base.Field("Targets", adapter=utils.get_members_identities)
class LogicalDriveCollection(base.ResourceCollectionBase):
class LogicalDrive(rsd_lib_base.ResourceBase):
type = base.Field("Type")
"""Type of volume"""
mode = base.Field("Mode")
"""Mode defines how the logical drive is built on top of underlying
physical/logical drives. The value shall correspond to the logical
drive type.
"""
protected = base.Field("Protected", adapter=bool)
"""Write (modify) protection state."""
capacity_gi_b = base.Field(
"CapacityGiB", adapter=rsd_lib_utils.num_or_none
)
"""Drive capacity in GibiBytes."""
image = base.Field("Image")
"""Image name."""
bootable = base.Field("Bootable", adapter=bool)
"""Specify if target is bootable."""
snapshot = base.Field("Snapshot", adapter=bool)
"""Indicates if the logical drive should be created as a snapshot of the
source master drive, or should be created as a full copy of an image
from the source master drive.
"""
status = rsd_lib_base.StatusField("Status")
"""This indicates the known state of the resource, such as if it is
enabled.
"""
links = LinksField("Links")
class LogicalDriveCollection(rsd_lib_base.ResourceCollectionBase):
@property
def _resource_type(self):
return LogicalDrive
def __init__(self, connector, path, redfish_version=None):
"""A class representing a LogicalDriveCollection
:param connector: A Connector instance
:param path: The canonical path to the LogicalDrive collection resource
:param redfish_version: The version of RedFish. Used to construct
the object according to schema of the given version.
"""
super(LogicalDriveCollection, self).__init__(connector, path,
redfish_version)

View File

@ -16,67 +16,53 @@
import logging
from sushy.resources import base
from sushy import utils
from rsd_lib import common as rsd_lib_common
from rsd_lib import base as rsd_lib_base
from rsd_lib import utils as rsd_lib_utils
LOG = logging.getLogger(__name__)
class PhysicalDrive(base.ResourceBase):
class LinksField(base.CompositeField):
identity = base.Field('Id', required=True)
"""The physical drive identity string"""
interface = base.Field('Interface')
"""The interface for the physical drive"""
capacity_gib = base.Field('CapacityGiB')
"""Physical drive capacity in GiB"""
drive_type = base.Field('Type')
"""Type of physical drive"""
rpm = base.Field('RPM')
"""The RPM of this physical drive"""
manufacturer = base.Field('Manufacture')
"""The manufacturer of the physical drive"""
model = base.Field('Model')
"""The model of this physical drive"""
serial_number = base.Field('SerialNumber')
"""The serial number for the physical drive"""
status = rsd_lib_common.StatusField('Status')
"""The physical drive status"""
def __init__(self, connector, identity, redfish_version=None):
"""A class representing a PhysicalDrive
:param connector: A Connector instance
:param identity: The identity of the PhysicalDrive resource
:param redfish_version: The version of RedFish. Used to construct
the object according to schema of the given version.
"""
super(PhysicalDrive, self).__init__(connector, identity,
redfish_version)
used_by = base.Field("UsedBy", adapter=utils.get_members_identities)
class PhysicalDriveCollection(base.ResourceCollectionBase):
class PhysicalDrive(rsd_lib_base.ResourceBase):
interface = base.Field("Interface")
"""Controller interface"""
capacity_gi_b = base.Field(
"CapacityGiB", adapter=rsd_lib_utils.num_or_none
)
"""Drive capacity in GibiBytes."""
type = base.Field("Type")
"""Type of drive"""
rpm = base.Field("RPM", adapter=rsd_lib_utils.num_or_none)
"""For traditional drive, rotation per minute."""
manufacturer = base.Field("Manufacturer")
"""Drive manufacturer name."""
model = base.Field("Model")
"""Drive model"""
serial_number = base.Field("SerialNumber")
"""Drive serial number"""
status = rsd_lib_base.StatusField("Status")
"""This indicates the known state of the resource, such as if it is
enabled.
"""
links = LinksField("Links")
class PhysicalDriveCollection(rsd_lib_base.ResourceCollectionBase):
@property
def _resource_type(self):
return PhysicalDrive
def __init__(self, connector, path, redfish_version=None):
"""A class representing a PhysicalDriveCollection
:param connector: A Connector instance
:param path: The canonical path to the PhysicalDrive collection
resource
:param redfish_version: The version of RedFish. Used to construct
the object according to schema of the given version.
"""
super(PhysicalDriveCollection, self).__init__(connector, path,
redfish_version)

View File

@ -17,74 +17,93 @@ import logging
from sushy.resources import base
from rsd_lib import common as rsd_lib_common
from rsd_lib import base as rsd_lib_base
from rsd_lib import utils as rsd_lib_utils
LOG = logging.getLogger(__name__)
class TargetLunField(base.ListField):
lun = base.Field('LUN')
class iSCSIInitiatorField(base.CompositeField):
initiator_iqn = base.Field("InitiatorIQN")
"""IQN of iSCSI target initiator"""
class ChapField(base.CompositeField):
type = base.Field("Type")
"""CHAP parameters of iSCSI target."""
username = base.Field("Username")
"""CHAP one way user name."""
secret = base.Field("Secret")
"""CHAP one way secret."""
mutual_username = base.Field("MutualUsername")
"""CHAP mutual user name."""
mutual_secret = base.Field("MutualSecret")
"""CHAP mutual secret."""
class TargetLUNCollectionField(rsd_lib_base.ReferenceableMemberField):
lun = base.Field("LUN", adapter=rsd_lib_utils.num_or_none)
"""Logical unit number"""
logical_drive = base.Field(
"LogicalDrive", adapter=rsd_lib_utils.get_resource_identity
)
"""Logical drive URI"""
class ISCSIAddressField(base.CompositeField):
target_lun = TargetLunField('TargetLUN')
target_iqn = base.Field('TargetIQN')
target_portal_ip = base.Field('TargetPortalIP')
target_portal_port = base.Field('TargetPortalPort')
target_lun = TargetLUNCollectionField("TargetLUN")
"""Target Logical Unit"""
target_iqn = base.Field("TargetIQN")
"""Target IQN"""
target_portal_ip = base.Field("TargetPortalIP")
"""iSCSI target portal IP address"""
target_portal_port = base.Field(
"TargetPortalPort", adapter=rsd_lib_utils.num_or_none
)
"""iSCSI target port"""
chap = ChapField("CHAP")
"""CHAP parameters of iSCSI target."""
class AddressesField(base.ListField):
iscsi = ISCSIAddressField('iSCSI')
class InitiatorCollectionField(rsd_lib_base.ReferenceableMemberField):
iscsi = iSCSIInitiatorField("iSCSI")
class ISCSIInitiatorField(base.CompositeField):
iqn = base.Field('InitiatorIQN')
class AddressCollectionField(rsd_lib_base.ReferenceableMemberField):
iscsi = ISCSIAddressField("iSCSI")
class InitiatorsField(base.ListField):
iscsi = ISCSIInitiatorField('iSCSI')
class RemoteTarget(rsd_lib_base.ResourceBase):
status = rsd_lib_base.StatusField("Status")
"""This indicates the known state of the resource, such as if it is
enabled.
"""
class RemoteTarget(base.ResourceBase):
identity = base.Field('Id', required=True)
"""The target identity string"""
target_type = base.Field('Type')
type = base.Field("Type")
"""Type of target"""
addresses = AddressesField('Addresses')
addresses = AddressCollectionField("Addresses")
initiators = InitiatorsField('Initiator')
status = rsd_lib_common.StatusField('Status')
"""The remote target status"""
def __init__(self, connector, identity, redfish_version=None):
"""A class representing a RemoteTarget
:param connector: A Connector instance
:param identity: The identity of the RemoteTarget resource
:param redfish_version: The version of RedFish. Used to construct
the object according to schema of the given version.
"""
super(RemoteTarget, self).__init__(connector, identity,
redfish_version)
initiator = InitiatorCollectionField("Initiator")
class RemoteTargetCollection(base.ResourceCollectionBase):
class RemoteTargetCollection(rsd_lib_base.ResourceCollectionBase):
@property
def _resource_type(self):
return RemoteTarget
def __init__(self, connector, path, redfish_version=None):
"""A class representing a RemoteTargetCollection
:param connector: A Connector instance
:param path: The canonical path to the RemoteTarget collection resource
:param redfish_version: The version of RedFish. Used to construct
the object according to schema of the given version.
"""
super(RemoteTargetCollection, self).__init__(connector, path,
redfish_version)

View File

@ -18,7 +18,7 @@ import logging
from sushy.resources import base
from sushy import utils
from rsd_lib import common as rsd_lib_common
from rsd_lib import base as rsd_lib_base
from rsd_lib.resources.v2_1.storage_service import logical_drive
from rsd_lib.resources.v2_1.storage_service import physical_drive
from rsd_lib.resources.v2_1.storage_service import remote_target
@ -26,94 +26,64 @@ from rsd_lib.resources.v2_1.storage_service import remote_target
LOG = logging.getLogger(__name__)
class StorageService(base.ResourceBase):
class LinksField(base.CompositeField):
description = base.Field('Description')
"""The storage service description"""
managed_by = base.Field("ManagedBy", adapter=utils.get_members_identities)
identity = base.Field('Id', required=True)
"""The storage service identity string"""
name = base.Field('Name')
"""The storage service name"""
class StorageService(rsd_lib_base.ResourceBase):
status = rsd_lib_common.StatusField('Status')
"""The storage service status"""
status = rsd_lib_base.StatusField("Status")
"""This indicates the known state of the resource, such as if it is
enabled.
"""
def __init__(self, connector, identity, redfish_version=None):
"""A class representing a StorageService
:param connector: A Connector instance
:param identity: The identity of the StorageService resource
:param redfish_version: The version of RedFish. Used to construct
the object according to schema of the given version.
"""
super(StorageService, self).__init__(connector, identity,
redfish_version)
def _get_logical_drive_collection_path(self):
"""Helper function to find the LogicalDriveCollection path"""
return utils.get_sub_resource_path_by(self, 'LogicalDrives')
@property
@utils.cache_it
def logical_drives(self):
"""Property to provide reference to `LogicalDriveCollection` instance
It is calculated once when it is queried for the first time. On
refresh, this property is reset.
"""
return logical_drive.LogicalDriveCollection(
self._conn, self._get_logical_drive_collection_path(),
redfish_version=self.redfish_version)
def _get_physical_drive_collection_path(self):
"""Helper function to find the PhysicalDriveCollection path"""
return utils.get_sub_resource_path_by(self, 'Drives')
@property
@utils.cache_it
def physical_drives(self):
"""Property to provide reference to `PhysicalDriveCollection` instance
It is calculated once when it is queried for the first time. On
refresh, this property is reset.
"""
return physical_drive.PhysicalDriveCollection(
self._conn, self._get_physical_drive_collection_path(),
redfish_version=self.redfish_version)
def _get_remote_target_collection_path(self):
"""Helper function to find the RemoteTargetCollection path"""
return utils.get_sub_resource_path_by(self, 'RemoteTargets')
links = LinksField("Links")
@property
@utils.cache_it
def remote_targets(self):
"""Property to provide reference to `RemoteTargetCollection` instance
It is calculated once when it is queried for the first time. On
refresh, this property is reset.
It is calculated once when it is queried for the first time. On
refresh, this property is reset.
"""
return remote_target.RemoteTargetCollection(
self._conn, self._get_remote_target_collection_path(),
redfish_version=self.redfish_version)
self._conn,
utils.get_sub_resource_path_by(self, "RemoteTargets"),
redfish_version=self.redfish_version,
)
@property
@utils.cache_it
def logical_drives(self):
"""Property to provide reference to `LogicalDriveCollection` instance
It is calculated once when it is queried for the first time. On
refresh, this property is reset.
"""
return logical_drive.LogicalDriveCollection(
self._conn,
utils.get_sub_resource_path_by(self, "LogicalDrives"),
redfish_version=self.redfish_version,
)
@property
@utils.cache_it
def drives(self):
"""Property to provide reference to `PhysicalDriveCollection` instance
It is calculated once when it is queried for the first time. On
refresh, this property is reset.
"""
return physical_drive.PhysicalDriveCollection(
self._conn,
utils.get_sub_resource_path_by(self, "Drives"),
redfish_version=self.redfish_version,
)
class StorageServiceCollection(base.ResourceCollectionBase):
class StorageServiceCollection(rsd_lib_base.ResourceCollectionBase):
@property
def _resource_type(self):
return StorageService
def __init__(self, connector, path, redfish_version=None):
"""A class representing a StorageServiceCollection
:param connector: A Connector instance
:param path: The canonical path to the StorageService collection
resource
:param redfish_version: The version of RedFish. Used to construct
the object according to schema of the given version.
"""
super(StorageServiceCollection, self).__init__(connector, path,
redfish_version)

View File

@ -16,7 +16,6 @@
import json
import mock
from sushy import exceptions
import testtools
from rsd_lib.resources.v2_1.storage_service import logical_drive
@ -26,217 +25,256 @@ from rsd_lib.resources.v2_1.storage_service import storage_service
class StorageServiceTestCase(testtools.TestCase):
def setUp(self):
super(StorageServiceTestCase, self).setUp()
self.conn = mock.Mock()
with open('rsd_lib/tests/unit/json_samples/v2_1/storage_service.json',
'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/storage_service.json", "r"
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
self.storage_service_inst = storage_service.StorageService(
self.conn, '/redfish/v1/Nodes/RSS1',
redfish_version='1.0.2')
self.conn, "/redfish/v1/Nodes/RSS1", redfish_version="1.0.2"
)
def test__parse_attributes(self):
self.storage_service_inst._parse_attributes()
self.assertEqual('1.0.2', self.storage_service_inst.redfish_version)
self.assertEqual('Storage Service',
self.storage_service_inst.description)
self.assertEqual('RSS1', self.storage_service_inst.identity)
self.assertEqual('Storage Service', self.storage_service_inst.name)
self.assertEqual('Enabled', self.storage_service_inst.status.state)
self.assertEqual('OK', self.storage_service_inst.status.health)
self.assertEqual('OK', self.storage_service_inst.status.health_rollup)
def test__get_logical_drive_collection_path_missing_processors_attr(self):
self.storage_service_inst._json.pop('LogicalDrives')
self.assertRaisesRegex(
exceptions.MissingAttributeError, 'attribute LogicalDrives',
self.storage_service_inst._get_logical_drive_collection_path)
self.assertEqual("1.0.2", self.storage_service_inst.redfish_version)
self.assertEqual(
"Storage Service", self.storage_service_inst.description
)
self.assertEqual("RSS1", self.storage_service_inst.identity)
self.assertEqual("Storage Service", self.storage_service_inst.name)
self.assertEqual("Enabled", self.storage_service_inst.status.state)
self.assertEqual("OK", self.storage_service_inst.status.health)
self.assertEqual("OK", self.storage_service_inst.status.health_rollup)
def test_logical_drives(self):
# | GIVEN |
self.conn.get.return_value.json.reset_mock()
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'logical_drive_collection.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/"
"logical_drive_collection.json",
"r",
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
# | WHEN |
actual_logical_drives = self.storage_service_inst.logical_drives
# | THEN |
self.assertIsInstance(actual_logical_drives,
logical_drive.LogicalDriveCollection)
self.assertIsInstance(
actual_logical_drives, logical_drive.LogicalDriveCollection
)
self.conn.get.return_value.json.assert_called_once_with()
# reset mock
self.conn.get.return_value.json.reset_mock()
# | WHEN & THEN |
# tests for same object on invoking subsequently
self.assertIs(actual_logical_drives,
self.storage_service_inst.logical_drives)
self.assertIs(
actual_logical_drives, self.storage_service_inst.logical_drives
)
self.conn.get.return_value.json.assert_not_called()
def test_logical_drives_on_refresh(self):
# | GIVEN |
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'logical_drive_collection.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/"
"logical_drive_collection.json",
"r",
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
# | WHEN & THEN |
self.assertIsInstance(self.storage_service_inst.logical_drives,
logical_drive.LogicalDriveCollection)
self.assertIsInstance(
self.storage_service_inst.logical_drives,
logical_drive.LogicalDriveCollection,
)
# On refreshing the storage service instance...
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'storage_service.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/" "storage_service.json", "r"
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
self.storage_service_inst.invalidate()
self.storage_service_inst.refresh(force=False)
# | GIVEN |
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'logical_drive_collection.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/"
"logical_drive_collection.json",
"r",
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
# | WHEN & THEN |
self.assertIsInstance(self.storage_service_inst.logical_drives,
logical_drive.LogicalDriveCollection)
def test__get_physical_drive_collection_path_missing_processors_attr(self):
self.storage_service_inst._json.pop('Drives')
self.assertRaisesRegex(
exceptions.MissingAttributeError, 'attribute Drives',
self.storage_service_inst._get_physical_drive_collection_path)
self.assertIsInstance(
self.storage_service_inst.logical_drives,
logical_drive.LogicalDriveCollection,
)
def test_physical_drives(self):
# | GIVEN |
self.conn.get.return_value.json.reset_mock()
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'physical_drive_collection.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/"
"physical_drive_collection.json",
"r",
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
# | WHEN |
actual_physical_drives = self.storage_service_inst.physical_drives
actual_drives = self.storage_service_inst.drives
# | THEN |
self.assertIsInstance(actual_physical_drives,
physical_drive.PhysicalDriveCollection)
self.assertIsInstance(
actual_drives, physical_drive.PhysicalDriveCollection
)
self.conn.get.return_value.json.assert_called_once_with()
# reset mock
self.conn.get.return_value.json.reset_mock()
# | WHEN & THEN |
# tests for same object on invoking subsequently
self.assertIs(actual_physical_drives,
self.storage_service_inst.physical_drives)
self.assertIs(actual_drives, self.storage_service_inst.drives)
self.conn.get.return_value.json.assert_not_called()
def test_physical_drives_on_refresh(self):
# | GIVEN |
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'physical_drive_collection.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/"
"physical_drive_collection.json",
"r",
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
# | WHEN & THEN |
self.assertIsInstance(self.storage_service_inst.physical_drives,
physical_drive.PhysicalDriveCollection)
self.assertIsInstance(
self.storage_service_inst.drives,
physical_drive.PhysicalDriveCollection,
)
# On refreshing the storage service instance...
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'storage_service.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/" "storage_service.json", "r"
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
self.storage_service_inst.invalidate()
self.storage_service_inst.refresh(force=False)
# | GIVEN |
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'physical_drive_collection.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/"
"physical_drive_collection.json",
"r",
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
# | WHEN & THEN |
self.assertIsInstance(self.storage_service_inst.physical_drives,
physical_drive.PhysicalDriveCollection)
def test__get_remote_target_collection_path_missing_processors_attr(self):
self.storage_service_inst._json.pop('RemoteTargets')
self.assertRaisesRegex(
exceptions.MissingAttributeError, 'attribute RemoteTargets',
self.storage_service_inst._get_remote_target_collection_path)
self.assertIsInstance(
self.storage_service_inst.drives,
physical_drive.PhysicalDriveCollection,
)
def test_remote_targets(self):
# | GIVEN |
self.conn.get.return_value.json.reset_mock()
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'remote_target_collection.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/"
"remote_target_collection.json",
"r",
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
# | WHEN |
actual_remote_targets = self.storage_service_inst.remote_targets
# | THEN |
self.assertIsInstance(actual_remote_targets,
remote_target.RemoteTargetCollection)
self.assertIsInstance(
actual_remote_targets, remote_target.RemoteTargetCollection
)
self.conn.get.return_value.json.assert_called_once_with()
# reset mock
self.conn.get.return_value.json.reset_mock()
# | WHEN & THEN |
# tests for same object on invoking subsequently
self.assertIs(actual_remote_targets,
self.storage_service_inst.remote_targets)
self.assertIs(
actual_remote_targets, self.storage_service_inst.remote_targets
)
self.conn.get.return_value.json.assert_not_called()
def test_remote_targets_on_refresh(self):
# | GIVEN |
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'remote_target_collection.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/"
"remote_target_collection.json",
"r",
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
# | WHEN & THEN |
self.assertIsInstance(self.storage_service_inst.remote_targets,
remote_target.RemoteTargetCollection)
self.assertIsInstance(
self.storage_service_inst.remote_targets,
remote_target.RemoteTargetCollection,
)
# On refreshing the storage service instance...
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'storage_service.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/" "storage_service.json", "r"
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
self.storage_service_inst.invalidate()
self.storage_service_inst.refresh(force=False)
# | GIVEN |
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'remote_target_collection.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/"
"remote_target_collection.json",
"r",
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
# | WHEN & THEN |
self.assertIsInstance(self.storage_service_inst.remote_targets,
remote_target.RemoteTargetCollection)
self.assertIsInstance(
self.storage_service_inst.remote_targets,
remote_target.RemoteTargetCollection,
)
class StorageServiceCollectionTestCase(testtools.TestCase):
def setUp(self):
super(StorageServiceCollectionTestCase, self).setUp()
self.conn = mock.Mock()
with open('rsd_lib/tests/unit/json_samples/v2_1/'
'storage_service_collection.json', 'r') as f:
with open(
"rsd_lib/tests/unit/json_samples/v2_1/"
"storage_service_collection.json",
"r",
) as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
self.storage_service_col = storage_service.StorageServiceCollection(
self.conn, '/redfish/v1/Services', redfish_version='1.0.2')
self.conn, "/redfish/v1/Services", redfish_version="1.0.2"
)
def test__parse_attributes(self):
self.storage_service_col._parse_attributes()
self.assertEqual('1.0.2', self.storage_service_col.redfish_version)
self.assertEqual('Storage Services Collection',
self.storage_service_col.name)
self.assertEqual(('/redfish/v1/Services/RSS1',),
self.storage_service_col.members_identities)
self.assertEqual("1.0.2", self.storage_service_col.redfish_version)
self.assertEqual(
"Storage Services Collection", self.storage_service_col.name
)
self.assertEqual(
("/redfish/v1/Services/RSS1",),
self.storage_service_col.members_identities,
)
@mock.patch.object(storage_service, 'StorageService', autospec=True)
@mock.patch.object(storage_service, "StorageService", autospec=True)
def test_get_member(self, mock_storage_service):
self.storage_service_col.get_member('/redfish/v1/Services/RSS1')
self.storage_service_col.get_member("/redfish/v1/Services/RSS1")
mock_storage_service.assert_called_once_with(
self.storage_service_col._conn, '/redfish/v1/Services/RSS1',
redfish_version=self.storage_service_col.redfish_version)
self.storage_service_col._conn,
"/redfish/v1/Services/RSS1",
redfish_version=self.storage_service_col.redfish_version,
)
@mock.patch.object(storage_service, 'StorageService', autospec=True)
@mock.patch.object(storage_service, "StorageService", autospec=True)
def test_get_members(self, mock_storage_service):
members = self.storage_service_col.get_members()
mock_storage_service.assert_called_once_with(
self.storage_service_col._conn, '/redfish/v1/Services/RSS1',
redfish_version=self.storage_service_col.redfish_version)
self.storage_service_col._conn,
"/redfish/v1/Services/RSS1",
redfish_version=self.storage_service_col.redfish_version,
)
self.assertIsInstance(members, list)
self.assertEqual(1, len(members))