Merge "Patch for python3 change in tempest"

This commit is contained in:
Zuul 2020-03-17 11:30:39 +00:00 committed by Gerrit Code Review
commit e56d90b36e
11 changed files with 147 additions and 80 deletions

View File

@ -799,6 +799,7 @@ class FeatureManager(traffic_manager.IperfManager,
break
def count_response(self, response):
response = response.decode('utf-8')
if response in self.http_cnt:
self.http_cnt[response] += 1
else:

View File

@ -183,7 +183,7 @@ class TrafficManager(appliance_manager.ApplianceManager):
'file1': src_file,
'dst_folder':
dst_folder}
args = shlex.split(cmd.encode('utf-8'))
args = shlex.split(cmd)
subprocess_args = {'stdout': subprocess.PIPE,
'stderr': subprocess.STDOUT}
proc = subprocess.Popen(args, **subprocess_args)
@ -257,10 +257,11 @@ class TrafficManager(appliance_manager.ApplianceManager):
'Connection: close\r\nContent-Type: text/html; '
'charset=UTF-8\r\n\r\n%s"; cat >/dev/null')
with tempfile.NamedTemporaryFile() as script:
script.write(resp % (len(server_name), server_name))
script.write((resp % (len(server_name), server_name)).
encode('utf-8'))
script.flush()
with tempfile.NamedTemporaryFile() as key:
key.write(private_key)
key.write(private_key.encode('utf-8'))
key.flush()
self.scp_file_to_instance_using_fip(script.name,
"/tmp/script",

View File

@ -91,7 +91,7 @@ class NSXPClient(object):
content_type = self.content_type if content is None else content
accept_type = self.accept_type if accept is None else accept
auth_cred = self.username + ":" + self.password
auth = base64.b64encode(auth_cred)
auth = base64.b64encode(auth_cred.encode()).decode()
headers = {}
headers['Authorization'] = "Basic %s" % auth
headers['Content-Type'] = content_type

View File

@ -90,7 +90,7 @@ class NSXV3Client(object):
content_type = self.content_type if content is None else content
accept_type = self.accept_type if accept is None else accept
auth_cred = self.username + ":" + self.password
auth = base64.b64encode(auth_cred)
auth = base64.b64encode(auth_cred.encode()).decode()
headers = {}
headers['Authorization'] = "Basic %s" % auth
headers['Content-Type'] = content_type

View File

@ -79,7 +79,8 @@ class TestQos(feature_manager.FeatureManager):
'start': str(address_cidr).split('/')[0] + '2',
'end':str(address_cidr).split('/')[0] + '70'}]}
if slaac:
subnet = self.create_topology_subnet(subnet_name, network,
subnet = self.create_topology_subnet(
subnet_name, network,
subnets_client=subnet_client,
routers_client=self.cmgr_adm.routers_client,
router_id=router['id'],
@ -87,7 +88,8 @@ class TestQos(feature_manager.FeatureManager):
ipv6_address_mode='slaac',
**allocation_pools)
else:
subnet = self.create_topology_subnet(subnet_name, network,
subnet = self.create_topology_subnet(
subnet_name, network,
subnets_client=subnet_client,
ip_version=6, enable_dhcp=False,
**allocation_pools)
@ -129,8 +131,8 @@ class TestQos(feature_manager.FeatureManager):
"""Create network with IPv6 static assignment based subnet
Associate qos policy to the network
"""
network, _ = self._create_single_ipv6_rtr_topology()
policy = self. _create_qos_policy()
network, _ = self._create_single_ipv6_rtr_topology()
self.cmgr_adm.networks_client.update_network(
network['id'], qos_policy_id=policy['id'])
updated_network = self.cmgr_adm.networks_client.show_network(
@ -164,7 +166,8 @@ class TestQos(feature_manager.FeatureManager):
policy = self. _create_qos_policy()
port_client = self.cmgr_adm.ports_client
body = self.create_topology_port(network=network,
ports_client=port_client, qos_policy_id=policy['id'])
ports_client=port_client,
qos_policy_id=policy['id'])
port = body['port']
body = self.show_topology_port(port['id'],
ports_client=port_client)

View File

@ -13,12 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest.api.network import base
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.services import nsxv3_client
CONF = config.CONF
@ -57,6 +60,8 @@ class NSXv3NativeDHCPNegative(base.BaseNetworkTest):
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.networks_client.delete_network, net_id)
self.assertTrue('ACTIVE', network['status'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_switch = self.nsx.get_logical_switch(network['name'],
network['id'])
dhcp_server = self.nsx.get_logical_dhcp_server(network['name'],
@ -75,6 +80,8 @@ class NSXv3NativeDHCPNegative(base.BaseNetworkTest):
self.networks_client.delete_network, net_id)
self.create_subnet(network, enable_dhcp=False)
self.assertTrue('ACTIVE', network['status'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_switch = self.nsx.get_logical_switch(network['name'],
network['id'])
dhcp_server = self.nsx.get_logical_dhcp_server(network['name'],
@ -93,6 +100,8 @@ class NSXv3NativeDHCPNegative(base.BaseNetworkTest):
self.networks_client.delete_network, net_id)
subnet = self.create_subnet(network)
self.assertTrue('ACTIVE', network['status'])
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
nsx_switch = self.nsx.get_logical_switch(network['name'],
network['id'])
dhcp_server = self.nsx.get_logical_dhcp_server(network['name'],
@ -101,6 +110,8 @@ class NSXv3NativeDHCPNegative(base.BaseNetworkTest):
self.assertIsNotNone(dhcp_server)
# Update subnet to disable DHCP
self.subnets_client.update_subnet(subnet['id'], enable_dhcp=False)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
dhcp_server = self.nsx.get_logical_dhcp_server(network['name'],
network['id'])
self.assertIsNone(dhcp_server)

View File

@ -258,6 +258,8 @@ class NSXv3MacLearningTest(base.BaseNetworkTest):
self.assertEqual(nsx_port['display_name'], test_port['name'],
"OS port and NSX port name do not match")
self._delete_port(test_port)
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
self.assertIsNone(self.nsx.get_logical_port(test_port['name']),
"Port %s is not None" % test_port['name'])

View File

@ -117,8 +117,9 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
*args, **kwargs)
return ports_list['ports']
def get_port_id(self, network_id, subnet_id, instance):
_, instance_addr = instance["addresses"].items()[0]
def get_port_id(self, network_id, subnet_id, instance,
network_name=None):
instance_addr = instance["addresses"][network_name]
instance_fixed_ip = instance_addr[0]["addr"]
for port in self._list_ports(device_id=instance['id']):
port_fixed_ip = port["fixed_ips"][0]["ip_address"]
@ -129,8 +130,9 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
self.assertIsNotNone(port_id, "Failed to find Instance's port id!!!")
return port_id
def get_port_ipv4v6_id(self, network_id, subnet_id, instance):
_, instance_addr = instance["addresses"].items()[0]
def get_port_ipv4v6_id(self, network_id, subnet_id, instance,
network_name=None):
instance_addr = instance["addresses"][network_name]
for addr in instance_addr:
if addr['version'] == 4:
instance_fixed_ip = addr["addr"]
@ -284,14 +286,16 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ip_address_vm2 = '87.0.0.4'
port_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default)
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1}]
port_client.update_port(
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default1)
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm2}]
port_client.update_port(
@ -312,8 +316,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
(ssh_source, ip_address_vm2, 'True'),
'Destination is reachable')
def _test_v6_connectivity_between_allowed_adddress_pair_ports(self,
network_topo):
def _test_v6_connectivity_between_allowed_adddress_pair_ports(
self, network_topo):
server_name_default = data_utils.rand_name('server-default')
network = network_topo['network']
server_default = self._create_server(server_name_default, network)
@ -332,14 +336,16 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ip_address_vm2 = '3000:10:10::3'
port_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default)
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1}]
port_client.update_port(
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default1)
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm2}]
port_client.update_port(
@ -360,8 +366,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
(ssh_source, ip_address_vm2, 'True'),
'Destination is reachable')
def _test_v4_v6_connectivity_between_allowed_adddress_pair_ports(self,
network_topo):
def _test_v4_v6_connectivity_between_allowed_adddress_pair_ports(
self, network_topo):
server_name_default = data_utils.rand_name('server-default')
network = network_topo['network']
server_default = self._create_server(server_name_default, network)
@ -382,7 +388,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ipv6_address_vm2 = '3000:10:10::3'
port_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default)
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ipv4_address_vm1},
{'ip_address': ipv6_address_vm1}]
@ -390,7 +397,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default1)
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ipv4_address_vm2},
{'ip_address': ipv6_address_vm2}]
@ -544,8 +552,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.interface_client.delete_interface,
server_default1['id'], port1_id['port']['id'])
ssh_source1 = self.get_remote_client(ip_address_default_vm,
private_key=private_key_default_vm)
ssh_source1 = self.get_remote_client(
ip_address_default_vm, private_key=private_key_default_vm)
ssh_source2 = self.get_remote_client(
ip_address_default1_vm,
private_key=private_key_default1_vm)
@ -583,7 +591,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ip_address_vm2_2 = '78.0.0.4'
port_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default)
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1_1},
{'ip_address': ip_address_vm1_2}]
@ -591,7 +600,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default1)
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm2_1},
{'ip_address': ip_address_vm2_2}]
@ -642,7 +652,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ip_address_vm2_2 = '4000:10:10::3'
port_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default)
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1_1},
{'ip_address': ip_address_vm1_2}]
@ -650,7 +661,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default1)
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm2_1},
{'ip_address': ip_address_vm2_2}]
@ -705,7 +717,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
ipv6_address_vm2_2 = '4000:10:10::3'
port_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default)
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ipv4_address_vm1_1},
{'ip_address': ipv4_address_vm1_2},
@ -715,7 +728,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
port_id, allowed_address_pairs=allowed_address_pairs)
port1_id = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_default1)
server_default1,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ipv4_address_vm2_1},
{'ip_address': ipv4_address_vm2_2},
@ -789,7 +803,8 @@ class TestAllowedAddressPair(manager.NetworkScenarioTest):
# Allowed Address pair
port_id = self.get_port_id(network['id'],
network_topo['subnet']['id'],
server_default)
server_default,
network_name=network['name'])
# Update allowed address pair attribute of port
allowed_address_pairs = [{'ip_address': ip_address_vm1}]
port_client.update_port(port_id,

View File

@ -65,10 +65,12 @@ class ProviderNetworks(feature_manager.FeatureManager):
for tz in out:
if "transport_type" in tz.keys() and (vlan_flag == 0 or
vxlan_flag == 0):
if vxlan_flag == 0 and tz['transport_type'] == "OVERLAY":
if vxlan_flag == 0 and tz['transport_type'] == "OVERLAY" \
and tz['_create_user'] == 'admin':
cls.overlay_id = tz['id']
vxlan_flag = 1
if vlan_flag == 0 and tz['transport_type'] == "VLAN":
if vlan_flag == 0 and tz['transport_type'] == "VLAN" \
and tz['_create_user'] == 'admin':
cls.vlan_id = tz['id']
vlan_flag = 1

View File

@ -136,8 +136,9 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
*args, **kwargs)
return ports_list['ports']
def get_port_id(self, network_id, subnet_id, instance):
_, instance_addr = instance["addresses"].items()[0]
def get_port_id(self, network_id, subnet_id, instance,
network_name=None):
instance_addr = instance["addresses"][network_name]
instance_fixed_ip = instance_addr[0]["addr"]
port_id = None
for port in self._list_ports():
@ -150,6 +151,21 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self.assertIsNotNone(port_id, "Failed to find Instance's port id!!!")
return port_id
def get_port_ipv4v6_id(self, network_id, subnet_id, instance,
network_name=None):
instance_addr = instance["addresses"][network_name]
for addr in instance_addr:
if addr['version'] == 4:
instance_fixed_ip = addr["addr"]
for port in self._list_ports():
port_fixed_ip = port["fixed_ips"][0]["ip_address"]
if port["network_id"] == network_id and port["fixed_ips"][0][
"subnet_id"] == subnet_id and instance["id"] == port[
"device_id"] and port_fixed_ip == instance_fixed_ip:
port_id = port["id"]
self.assertIsNotNone(port_id, "Failed to find Instance's port id!!!")
return port_id
def _create_server(self, name, network, port_id=None, image_id=None):
keypair = self.create_keypair()
self.keypairs[keypair['name']] = keypair
@ -451,9 +467,11 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self._check_server_connectivity(public_ip_address_server_1,
private_ip_address_server_2,
private_key_server_1)
port_id_server_1 = self.get_port_id(network_topo['network']['id'],
port_id_server_1 = self.get_port_id(
network_topo['network']['id'],
network_topo['subnet']['id'],
server_default_1)
server_default_1,
network_name=network_topo['network']['name'])
port_id_server_2 = port_id['port']['id']
sec_grp_port = port_client.show_port(port_id_server_1)
sec_group = sec_grp_port['port']['security_groups'][0]
@ -521,9 +539,11 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self._check_server_connectivity(public_ip_address_server_1,
private_ip_address_server_2,
private_key_server_1)
port_id_server_1 = self.get_port_id(network_topo['network']['id'],
port_id_server_1 = self.get_port_id(
network_topo['network']['id'],
network_topo['subnet']['id'],
server_default_1)
server_default_1,
network_name=network_topo['network']['name'])
port_id_server_2 = port_id['port']['id']
sec_grp_port = port_client.show_port(port_id_server_1)
sec_group = sec_grp_port['port']['security_groups'][0]
@ -573,7 +593,8 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
network = network_topo['network']
server_1 = self._create_server(server_name_1, network)
port_id = self.get_port_id(network['id'],
network_topo['subnet']['id'], server_1)
network_topo['subnet']['id'], server_1,
network_name=network['name'])
kwargs = {"port_security_enabled": "false", "security_groups": []}
port_client = self.cmgr_adm.ports_client
sec_grp_port = port_client.show_port(port_id)
@ -641,7 +662,8 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
private_ip_address_server_1,
private_key_server_2)
port_id1 = self.get_port_id(network['id'],
network_topo['subnet']['id'], server_2)
network_topo['subnet']['id'], server_2,
network_name=network['name'])
kwargs = {"port_security_enabled": "false", "security_groups": []}
port_client = self.cmgr_adm.ports_client
sec_grp_port = port_client.show_port(port_id1)
@ -678,8 +700,10 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
self._check_server_connectivity(public_ip_address_server_2,
private_ip_address_server_1,
private_key_server_2)
port_id1 = self.get_port_id(network['id'],
network_topo['subnet']['id'], server_2)
port_id1 = self.get_port_ipv4v6_id(network['id'],
network_topo['subnet']['id'],
server_2,
network_name=network['name'])
kwargs = {"port_security_enabled": "false", "security_groups": []}
port_client = self.cmgr_adm.ports_client
sec_grp_port = port_client.show_port(port_id1)
@ -727,7 +751,8 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
kwargs = {"port_security_enabled": "false",
"security_groups": []}
port_id = self.get_port_id(network2['id'],
subnet2['id'], server_2)
subnet2['id'], server_2,
network_name=network2['name'])
sec_grp_port = port_client.show_port(port_id)
sec_group = sec_grp_port['port']['security_groups'][0]
port_client.update_port(port_id, **kwargs)
@ -753,8 +778,8 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
private_address_server_1,
private_key_server_2)
def _test_ipv6_connectivity_between_servers_with_router(self,
network_topo):
def _test_ipv6_connectivity_between_servers_with_router(
self, network_topo):
server_name_default_1 =\
data_utils.rand_name('server-port-sec-1')
server_name_default_2 =\
@ -780,7 +805,8 @@ class TestNSXv3PortSecurityScenario(manager.NetworkScenarioTest):
kwargs = {"port_security_enabled": "false",
"security_groups": []}
port_id = self.get_port_id(network['id'],
subnet['id'], server_1)
subnet['id'], server_1,
network_name=network['name'])
sec_grp_port = port_client.show_port(port_id)
sec_group = sec_grp_port['port']['security_groups'][0]
port_client.update_port(port_id, **kwargs)

View File

@ -143,8 +143,9 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
*args, **kwargs)
return ports_list['ports']
def get_port_id(self, network_id, subnet_id, instance):
_, instance_addr = instance["addresses"].items()[0]
def get_port_id(self, network_id, subnet_id, instance,
network_name=None):
instance_addr = instance["addresses"][network_name]
instance_fixed_ip = instance_addr[0]["addr"]
port_id = None
for port in self._list_ports():
@ -405,12 +406,16 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
p_client = self.ports_client
kwargs = {"provider_security_groups": ["%s" % sg_id]}
port_id_psg = self.get_port_id(network_topo['network']['id'],
port_id_psg = self.get_port_id(
network_topo['network']['id'],
network_topo['subnet']['id'],
servers['server_psg'])
port_id_default = self.get_port_id(network_topo['network']['id'],
servers['server_psg'],
network_name=network_topo['network']['name'])
port_id_default = self.get_port_id(
network_topo['network']['id'],
network_topo['subnet']['id'],
servers['server_default'])
servers['server_default'],
network_name=network_topo['network']['name'])
p_client.update_port(port_id_psg, **kwargs)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
p_client.update_port(port_id_default, **kwargs)
@ -511,7 +516,8 @@ class TestProviderSecurityGroup(manager.NetworkScenarioTest):
should_connect=False)
kwargs = {"provider_security_groups": []}
port_id = self.get_port_id(network['id'],
network_topo['subnet']['id'], server_psg)
network_topo['subnet']['id'], server_psg,
network_name=network['name'])
self.cmgr_adm.ports_client.update_port(port_id, **kwargs)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
self._check_server_connectivity(ip_address_default_vm,