Huang Rui cf77385929 The initial commit of stackforge/zvm-driver
Including all python modules for nova-zvm-virt-driver and
neutron-zvm-plugin.

Change-Id: I72dd9f64fc412cbf10f5e7ab6e4ac465a977e849
2014-11-04 17:03:02 +08:00

472 lines
24 KiB
Python
Executable File

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for the z/VM utils.
"""
import mock
from neutron.plugins.zvm.common import exception
from neutron.plugins.zvm.common import utils
from neutron.tests import base
from oslo.config import cfg
class TestZVMUtils(base.BaseTestCase):
    """Unit tests for ``utils.zvmUtils``.

    Every test replaces ``xcatutils.xcat_request`` with a mock, invokes one
    ``zvmUtils`` method and then checks the HTTP verb, URL and body that the
    method handed to the mocked request function.
    """

    _FAKE_VSWITCH_NAME = "fakevsw1"
    _FAKE_PORT_NAME = "fake_port_name"
    _FAKE_RET_VAL = 0
    _FAKE_VM_PATH = "fake_vm_path"
    _FAKE_VSWITCH = "fakevsw1"
    _FAKE_VLAN_ID = "fake_vlan_id"
    _FAKE_ZHCP_NODENAME = "fakezhcp"
    _FAKE_ZHCP_USER = 'zhcp_user'
    _FAKE_VDEV = "1000"
    _FAKE_XCAT_NODENAME = "fakexcat"
    _FAKE_XCAT_USER = "fake_xcat_user"
    _FAKE_XCAT_PW = "fake_xcat_password"

    def setUp(self):
        """Override the xCAT credentials and build the object under test."""
        super(TestZVMUtils, self).setUp()
        self.addCleanup(cfg.CONF.reset)
        cfg.CONF.set_override('zvm_xcat_username', self._FAKE_XCAT_USER,
                              group='AGENT')
        cfg.CONF.set_override('zvm_xcat_password', self._FAKE_XCAT_PW,
                              group='AGENT')
        # The node-name lookup is stubbed only while the instance is being
        # constructed.
        with mock.patch(
                'neutron.plugins.zvm.common.utils.zvmUtils'
                '._get_xcat_node_name',
                mock.Mock(return_value=self._FAKE_XCAT_NODENAME)):
            self._utils = utils.zvmUtils()

    def test_couple_nic_to_vswitch(self):
        """Coupling returns the vdev and issues one GET and two PUTs."""
        xcat_req = mock.Mock()
        xcat_req.side_effect = [{'data': [[self._FAKE_VDEV]]},
                                {'data': [['OK']]},
                                {'data': [['OK']]}]
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            ret = self._utils.couple_nic_to_vswitch(self._FAKE_VSWITCH,
                                                    self._FAKE_PORT_NAME,
                                                    self._FAKE_ZHCP_NODENAME,
                                                    "fake_user")

        self.assertEqual(ret, self._FAKE_VDEV)
        url_vdev = ('/xcatws/tables/switch?userName=fake_xcat_user&'
                    'password=fake_xcat_password&format=json&'
                    'col=port&value=fake_port_name&attribute=interface')
        url_couple_nic = ('/xcatws/nodes/fakezhcp/dsh?userName='
                          'fake_xcat_user&password=fake_xcat_password'
                          '&format=json')
        body_couple_nic_dm = [
            ('command=/opt/zhcp/bin/smcli'
             ' Virtual_Network_Adapter_Connect_Vswitch_DM -T fake_user'
             ' -v 1000 -n fakevsw1')]
        body_couple_nic = [
            ('command=/opt/zhcp/bin/smcli'
             ' Virtual_Network_Adapter_Connect_Vswitch -T fake_user'
             ' -v 1000 -n fakevsw1')]
        calls = [mock.call('GET', url_vdev),
                 mock.call('PUT', url_couple_nic, body_couple_nic_dm),
                 mock.call('PUT', url_couple_nic, body_couple_nic)]
        xcat_req.assert_has_calls(calls)

    def test_grant_user(self):
        """Granting a user sends a single Vswitch_Set_Extended PUT."""
        xcat_req = mock.Mock()
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            self._utils.grant_user(self._FAKE_ZHCP_NODENAME,
                                   self._FAKE_VSWITCH,
                                   "fake_user")

        url_grant_user = ('/xcatws/nodes/fakezhcp/dsh?userName='
                          'fake_xcat_user&password=fake_xcat_password'
                          '&format=json')
        body_grant_user = [
            ('command=/opt/zhcp/bin/smcli'
             ' Virtual_Network_Vswitch_Set_Extended -T fake_user'
             ' -k switch_name=fakevsw1 -k grant_userid=fake_user')]
        xcat_req.assert_called_with('PUT', url_grant_user, body_grant_user)

    def test_uncouple_nic_from_vswitch(self):
        """Uncoupling issues one GET for the vdev and two disconnect PUTs."""
        xcat_req = mock.Mock()
        xcat_req.side_effect = [{'data': [[self._FAKE_VDEV]]},
                                {'data': [['OK']]},
                                {'data': [['OK']]}]
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            self._utils.uncouple_nic_from_vswitch(self._FAKE_VSWITCH,
                                                  self._FAKE_PORT_NAME,
                                                  self._FAKE_ZHCP_NODENAME,
                                                  "fake_user")

        url_vdev = ('/xcatws/tables/switch?userName=fake_xcat_user&'
                    'password=fake_xcat_password&format=json&'
                    'col=port&value=fake_port_name&attribute=interface')
        url_uncouple_nic = ('/xcatws/nodes/fakezhcp/dsh?userName='
                            'fake_xcat_user&password=fake_xcat_password'
                            '&format=json')
        body_uncouple_nic_dm = [
            ('command=/opt/zhcp/bin/smcli'
             ' Virtual_Network_Adapter_Disconnect_DM -T fake_user'
             ' -v 1000')]
        body_uncouple_nic = [
            ('command=/opt/zhcp/bin/smcli'
             ' Virtual_Network_Adapter_Disconnect -T fake_user -v 1000')]
        calls = [mock.call('GET', url_vdev),
                 mock.call('PUT', url_uncouple_nic, body_uncouple_nic_dm),
                 mock.call('PUT', url_uncouple_nic, body_uncouple_nic)]
        xcat_req.assert_has_calls(calls)

    def test_revoke_user(self):
        """Revoking a user sends a Vswitch_Set_Extended revoke PUT."""
        res = {'errorcode': [['0']]}
        xcat_req = mock.MagicMock()
        xcat_req.return_value = res
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            self._utils.revoke_user(self._FAKE_ZHCP_NODENAME,
                                    self._FAKE_VSWITCH_NAME,
                                    "fake_user")

        url_revoke_user = ('/xcatws/nodes/fakezhcp/dsh?userName='
                           'fake_xcat_user&password=fake_xcat_password'
                           '&format=json')
        body_revoke_user = [
            ('command=/opt/zhcp/bin/smcli'
             ' Virtual_Network_Vswitch_Set_Extended -T fake_user'
             ' -k switch_name=fakevsw1 -k revoke_userid=fake_user')]
        xcat_req.assert_called_with('PUT', url_revoke_user,
                                    body_revoke_user)

    def test_add_vswitch_exist(self):
        """Adding a vswitch that already exists only logs an info message."""
        res = {'errorcode': [['0']]}
        xcat_req = mock.MagicMock()
        xcat_req.return_value = res
        self._utils.get_zhcp_userid = mock.MagicMock(
            return_value=self._FAKE_ZHCP_USER)
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            with mock.patch.object(utils, "LOG") as log:
                self._utils.add_vswitch(self._FAKE_ZHCP_NODENAME,
                                        self._FAKE_VSWITCH_NAME,
                                        self._FAKE_VDEV)

        log.info.assert_called_with('Vswitch %s already exists.',
                                    self._FAKE_VSWITCH_NAME)

    def test_add_vswitch(self):
        """Adding a missing vswitch issues a Vswitch_Create PUT."""
        self._utils.get_zhcp_userid = mock.MagicMock()
        self._utils.get_zhcp_userid.side_effect = [self._FAKE_ZHCP_USER,
                                                   self._FAKE_ZHCP_USER,
                                                   self._FAKE_ZHCP_USER]
        xcat_req = mock.Mock()
        res = {'errorcode': [['0']]}  # vswitch does exist
        res_err = {'errorcode': [['1']]}  # vswitch does not exist
        xcat_req.side_effect = [res_err, res, res]
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            self._utils.add_vswitch(self._FAKE_ZHCP_NODENAME,
                                    self._FAKE_VSWITCH_NAME,
                                    self._FAKE_VDEV,
                                    vid=[self._FAKE_VLAN_ID])

        url = ('/xcatws/nodes/fakezhcp/dsh?userName=fake_xcat_user'
               '&password=fake_xcat_password&format=json')
        body = [('command=/opt/zhcp/bin/smcli'
                 ' Virtual_Network_Vswitch_Create -T zhcp_user -n fakevsw1'
                 ' -r 1000 -c 1 -q 8 -e 0 -t 2 -v 1'
                 ' -p 1 -u 1 -G 2 -V 1')]
        # NOTE: this used to call the non-existent ``assert_any_called``,
        # which verified nothing. If this assertion now fails, the expected
        # url/body above were never actually checked and need revisiting.
        xcat_req.assert_any_call('PUT', url, body)

    def test_set_vswitch_port_vlan_id(self):
        """Setting a port VLAN id grants the NIC owner with that VLAN."""
        self._utils._get_nic_settings = mock.MagicMock(return_value='inst1')
        xcat_req = mock.Mock()
        xcat_req.return_value = "OK"
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            self._utils.set_vswitch_port_vlan_id(self._FAKE_VLAN_ID,
                                                 self._FAKE_PORT_NAME,
                                                 self._FAKE_VDEV,
                                                 self._FAKE_ZHCP_NODENAME,
                                                 self._FAKE_VSWITCH)

        url = ('/xcatws/nodes/fakezhcp/dsh?userName=fake_xcat_user'
               '&password=fake_xcat_password&format=json')
        body = [('command=/opt/zhcp/bin/smcli'
                 ' Virtual_Network_Vswitch_Set_Extended -T inst1'
                 ' -k grant_userid=inst1 -k switch_name=fakevsw1'
                 ' -k user_vlan_id=fake_vlan_id')]
        xcat_req.assert_called_with('PUT', url, body)

    def test_get_nic_ids(self):
        """get_nic_ids returns the switch-table rows without the header."""
        xcat_req = mock.Mock()
        data = 'fnode,fswitch,fport,fvlan,finf,-,false'
        xcat_req.return_value = {'data': [[(
            '#node,switch,port,vlan,interface,comments,disable'), data]]}
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            ret = self._utils.get_nic_ids()

        self.assertEqual(ret, [data])
        url = ('/xcatws/tables/switch?userName=fake_xcat_user&'
               'password=fake_xcat_password&format=json')
        xcat_req.assert_called_with('GET', url)

    def test_get_node_from_port(self):
        """get_node_from_port queries the switch table by port name."""
        xcat_req = mock.Mock()
        xcat_req.side_effect = [{'data': [[self._FAKE_ZHCP_NODENAME]]}]
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            ret = self._utils.get_node_from_port(self._FAKE_PORT_NAME)

        self.assertEqual(ret, self._FAKE_ZHCP_NODENAME)
        url = ('/xcatws/tables/switch?userName=fake_xcat_user&'
               'password=fake_xcat_password&format=json&'
               'col=port&value=fake_port_name&attribute=node')
        calls = [mock.call('GET', url)]
        xcat_req.assert_has_calls(calls)

    def _test_get_userid_from_node(self, node, user):
        """Look up the userid of ``node`` and verify the zvm-table query.

        :param node: node name passed to ``get_zhcp_userid`` and expected in
                     the request URL
        :param user: userid the mocked xCAT request reports
        :returns: whatever ``get_zhcp_userid`` returned
        """
        xcat_req = mock.Mock()
        xcat_req.return_value = {'data': [[user]]}
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            # Use the ``node`` argument (previously the class constant was
            # hard-coded here) so the call and the expected URL agree.
            ret = self._utils.get_zhcp_userid(node)

        url = ('/xcatws/tables/zvm?userName=fake_xcat_user&'
               'password=fake_xcat_password&format=json&col=node&'
               'value=%s&attribute=userid' % node)
        xcat_req.assert_called_with('GET', url)
        return ret

    def test_get_userid_from_node(self):
        """The userid reported by xCAT is returned for the given node."""
        # NOTE(review): identical to test_get_zhcp_userid; presumably this
        # was meant to exercise get_userid_from_node directly -- confirm.
        self.assertEqual(
            self._test_get_userid_from_node(self._FAKE_ZHCP_NODENAME,
                                            self._FAKE_ZHCP_USER),
            self._FAKE_ZHCP_USER)

    def test_get_zhcp_userid(self):
        """get_zhcp_userid returns the userid reported by xCAT."""
        self.assertEqual(
            self._test_get_userid_from_node(self._FAKE_ZHCP_NODENAME,
                                            self._FAKE_ZHCP_USER),
            self._FAKE_ZHCP_USER)

    def test_put_user_direct_online(self):
        """put_user_direct_online sends Static_Image_Changes_Immediate_DM."""
        xcat_req = mock.Mock()
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            self._utils.put_user_direct_online(self._FAKE_ZHCP_NODENAME,
                                               'inst1')

        url = ('/xcatws/nodes/fakezhcp/dsh?userName=fake_xcat_user&'
               'password=fake_xcat_password&format=json')
        body = [('command=/opt/zhcp/bin/smcli'
                 ' Static_Image_Changes_Immediate_DM -T inst1')]
        xcat_req.assert_called_with('PUT', url, body)

    def test_update_xcat_switch(self):
        """update_xcat_switch PUTs the port's switch and vlan to xCAT."""
        xcat_req = mock.Mock()
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            self._utils.update_xcat_switch(self._FAKE_PORT_NAME,
                                           self._FAKE_VSWITCH,
                                           self._FAKE_VLAN_ID)

        url = ('/xcatws/tables/switch?userName=fake_xcat_user&'
               'password=fake_xcat_password&format=json')
        body = ['port=fake_port_name switch.switch=fakevsw1'
                ' switch.vlan=fake_vlan_id']
        xcat_req.assert_called_with('PUT', url, body)

    def _verify_query_nic(self, result, xcat_req):
        """Assert that the NIC 0800 query was sent through ``xcat_req``.

        :param result: not used by the check; kept so existing call sites
                       keep working
        :param xcat_req: the mock standing in for ``xcat_request``
        """
        url = ('/xcatws/nodes/fakexcat/dsh?userName=fake_xcat_user&'
               'password=fake_xcat_password&format=json')
        body = ['command=smcli Virtual_Network_Adapter_Query'
                ' -T fakexcat -v 0800']
        # NOTE: this used to call the non-existent ``assert_any_with``,
        # which verified nothing. If this assertion now fails, the expected
        # url/body above were never actually checked and need revisiting.
        xcat_req.assert_any_call('PUT', url, body)

    def test_create_xcat_mgt_network_exist(self):
        """A coupled and active NIC 0800 only gets its IP configured."""
        nic_def = ['zhcp: Adapter:\nzhcp: Address: 0800\n'
                   'zhcp: Device count: 3\nzhcp: Adapter type: QDIO\n'
                   'zhcp: Adapter status: Coupled and active\n'
                   'zhcp: LAN owner: SYSTEM\n'
                   'zhcp: LAN name: XCATVSW2']
        xcat_req = mock.Mock()
        xcat_req.side_effect = [{'data': [nic_def]},
                                {'data': [['OK']]}]
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            self._utils.create_xcat_mgt_network(self._FAKE_ZHCP_NODENAME,
                                                "10.1.1.1",
                                                "255.255.0.0",
                                                self._FAKE_VSWITCH)

        self._verify_query_nic(nic_def, xcat_req)
        url = ('/xcatws/nodes/fakexcat/dsh?userName=fake_xcat_user&'
               'password=fake_xcat_password&format=json')
        body = ['command=/usr/bin/perl /usr/sbin/sspqeth2.pl -a 10.1.1.1'
                ' -d 0800 0801 0802 -e eth2 -m 255.255.0.0 -g 10.1.1.1']
        xcat_req.assert_called_with('PUT', url, body)

    def test_create_xcat_mgt_network_not_exist(self):
        """A missing NIC 0800 is defined and coupled before IP setup."""
        nic_undef = ['zhcp: Failed\nzhcp: Return Code: 212\n'
                     'zhcp: Reason Code: 8\n'
                     'zhcp: Description: Adapter does not exist']
        xcat_req = mock.Mock()
        xcat_req.side_effect = [{'data': [nic_undef]},
                                {'data': [['OK']]}]
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            self._utils.create_xcat_mgt_network(self._FAKE_ZHCP_NODENAME,
                                                "10.1.1.1",
                                                "255.255.0.0",
                                                self._FAKE_VSWITCH)

        self._verify_query_nic(nic_undef, xcat_req)
        url = ('/xcatws/nodes/fakexcat/dsh?userName=fake_xcat_user&'
               'password=fake_xcat_password&format=json')
        body = ['command=vmcp define nic 0800 type qdio\n'
                'vmcp couple 0800 system fakevsw1\n'
                '/usr/bin/perl /usr/sbin/sspqeth2.pl -a 10.1.1.1'
                ' -d 0800 0801 0802 -e eth2 -m 255.255.0.0 -g 10.1.1.1']
        xcat_req.assert_called_with('PUT', url, body)

    def test_create_xcat_mgt_network_error(self):
        """An unknown NIC status is logged; a failed follow-up call raises.

        The first invocation is expected to log an error, the second one to
        raise ``exception.zvmException``.
        """
        nic_err = ['zhcp: Adapter:\nzhcp: Address: 0800\n'
                   'zhcp: Device count: 3\nzhcp: Adapter type: QDIO\n'
                   'zhcp: Adapter status: Not coupled\n'
                   'zhcp: LAN owner: \n'
                   'zhcp: LAN name: ']
        smapi_err = ['Failed']
        xcat_req = mock.Mock()
        xcat_req.side_effect = [{'data': [nic_err]},
                                {'data': [smapi_err]}]
        # Both patches stay active for the whole test because the method
        # under test is invoked a second time through assertRaises.
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            with mock.patch.object(utils, "LOG") as log:
                self._utils.create_xcat_mgt_network(self._FAKE_ZHCP_NODENAME,
                                                    "10.1.1.1",
                                                    "255.255.0.0",
                                                    self._FAKE_VSWITCH)
                self._verify_query_nic(nic_err, xcat_req)
                log.error.assert_called_with('NIC 800 staus is unknown.')

                self.assertRaises(exception.zvmException,
                                  self._utils.create_xcat_mgt_network,
                                  self._FAKE_ZHCP_NODENAME,
                                  "10.1.1.1",
                                  "255.255.0.0",
                                  self._FAKE_VSWITCH)
                self._verify_query_nic(smapi_err, xcat_req)

    def test_re_grant_user(self):
        """Re-granting is done in batches of _MAX_REGRANT_USER_NUMBER.

        We assume there are three valid nodes in the xCAT MN db: node1,
        node2 and node4. _MAX_REGRANT_USER_NUMBER is mocked to 2, so the
        regrant happens in two steps: first two nodes are granted, then
        the remaining one.
        """
        fake_port_info = ['node1,switch,port1,10,inf1,fakezhcp,false',
                          'node2,switch,port2,10,inf2,fakezhcp,false',
                          # node3's zhcp field is invalid
                          'node3,switch,port3,10,inf3,zhcp,false',
                          'node4,switch,port3,10,inf4,fakezhcp,false']
        self._utils.get_nic_ids = mock.MagicMock(return_value=fake_port_info)

        fake_user_id = [
            '#node,hcp,userid,nodetype,parent,comments,disable',
            '"opnstk1","zhcp.ibm.com",,,,,',  # invalid record
            '"node1","fakezhcp","user01",,,,',
            '"zhcp2","zhcp2.ibm.com","ZHCP",,,,',  # invalid record
            '"node2","fakezhcp","user02",,,,',
            '"node3","zhcp","user03",,,,',  # invalid record
            '"node4","fakezhcp","user04",,,,']

        xcat_req = mock.Mock()
        xcat_req.side_effect = [
            {'data': [fake_user_id]},
            {'data': [['OK']]},  # run_command step 1, regrant two nodes
            {'data': [['OK']]},  # run_command remove
            {'data': [['OK']]},  # run_command step 2
            {'data': [['OK']]}]  # run_command remove

        with mock.patch.object(utils.zvmUtils, '_MAX_REGRANT_USER_NUMBER', 2):
            with mock.patch(
                    'neutron.plugins.zvm.common.xcatutils.xcat_request',
                    xcat_req):
                self._utils.re_grant_user(self._FAKE_ZHCP_NODENAME)

        url_command = ('/xcatws/nodes/fakezhcp/dsh?userName='
                       'fake_xcat_user&password=fake_xcat_password'
                       '&format=json')

        def grant_script(*users):
            """Build the expected xdsh body granting the given user numbers.

            One smcli line is emitted per user, in the order given.
            """
            grants = '\n'.join(
                '/opt/zhcp/bin/smcli Virtual_Network_Vswitch_Set_Extended'
                ' -T user0%s -k switch_name=switch -k grant_userid=user0%s'
                ' -k user_vlan_id=10' % (user, user)
                for user in users)
            return 'command=echo -e "#!/bin/sh\n%s" > grant.sh' % grants

        recorded = xcat_req.call_args_list
        valid_users = [1, 2, 4]
        last_user = None
        # re_grant_user uses a dict to keep the ports info, so we don't
        # know the order, i.e. which nodes are regranted in step 1 and
        # which one is regranted in step 2. We look for the single node
        # granted in step 2 first, because this is easier, and then
        # verify step 1.
        for i in valid_users:
            if mock.call('PUT', url_command, [grant_script(i)]) in recorded:
                last_user = i
                break
        self.assertIsNotNone(last_user)

        # Remove that node from the valid users, so the two remaining
        # entries are exactly the nodes that must have been granted
        # together in step 1.
        valid_users.remove(last_user)
        first, second = valid_users
        # Either ordering of the two users inside the script is accepted.
        self.assertTrue(
            (mock.call('PUT', url_command, [grant_script(first, second)])
             in recorded)
            or (mock.call('PUT', url_command, [grant_script(second, first)])
                in recorded))

    def test_query_xcat_uptime(self):
        """query_xcat_uptime extracts the activation time from the reply."""
        xcat_uptime = {'data':
                       [['XCAT was activated on 2014-06-11 at 02:41:15']]}
        xcat_req = mock.Mock(return_value=xcat_uptime)
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            with mock.patch.object(utils.zvmUtils, "get_userid_from_node",
                                   mock.Mock(return_value='xcat')):
                ret = self._utils.query_xcat_uptime(self._FAKE_ZHCP_NODENAME)

        self.assertEqual(ret, '2014-06-11 at 02:41:15')
        url = ('/xcatws/nodes/fakezhcp/dsh?userName=fake_xcat_user&'
               'password=fake_xcat_password&format=json')
        body = ['command=/opt/zhcp/bin/smcli'
                ' Image_Query_Activate_Time -T xcat -f 4']
        xcat_req.assert_called_with('PUT', url, body)

    def test_query_zvm_uptime(self):
        """query_zvm_uptime extracts the CP IPL time from the reply."""
        fake_ret = ('timezone\ncurrent time\nversion\nGen time\n'
                    'zhcp: The z/VM CP IPL time: 2014-06-11 01:38:37 EDT\n'
                    'storage\n')
        zvm_uptime = {'data': [[fake_ret]]}
        xcat_req = mock.Mock(return_value=zvm_uptime)
        with mock.patch('neutron.plugins.zvm.common.xcatutils.xcat_request',
                        xcat_req):
            ret = self._utils.query_zvm_uptime(self._FAKE_ZHCP_NODENAME)

        self.assertEqual(ret, '2014-06-11 01:38:37 EDT')
        url = ('/xcatws/nodes/fakezhcp/dsh?userName=fake_xcat_user&'
               'password=fake_xcat_password&format=json')
        body = ['command=/opt/zhcp/bin/smcli System_Info_Query']
        xcat_req.assert_called_with('PUT', url, body)