diff --git a/compass/tests/api/test_api.py b/compass/tests/api/test_api.py new file mode 100644 index 00000000..999a83f8 --- /dev/null +++ b/compass/tests/api/test_api.py @@ -0,0 +1,1017 @@ +#!/usr/bin/python +# +# Copyright 2014 Huawei Technologies Co. Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""test api module.""" +import celery +import copy +import mock +import os +import simplejson as json +import unittest2 + + +os.environ['COMPASS_IGNORE_SETTING'] = 'true' + + +from compass.utils import setting_wrapper as setting +reload(setting) + + +from compass.api import login_manager +from compass.db.api import adapter_holder as adapter_api +from compass.db.api import cluster as cluster_api +from compass.db.api import database +from compass.db.api import host as host_api +from compass.db.api import metadata_holder as metadata_api +from compass.db.api import user as user_api +from compass.db.models import User +from compass.utils import flags +from compass.utils import logsetting +from compass.utils import util + + +class ApiTestCase(unittest2.TestCase): + """base api test class.""" + + CLUSTER_NAME = "Test_CLUSTER" + SWITCH_IP_ADDRESS = '10.10.10.1' + SWITCH_CREDENTIAL = {'version': 'xxx', + 'community': 'xxx'} + USER_CREDENTIALS = { + 'email': setting.COMPASS_ADMIN_EMAIL, + 'password': setting.COMPASS_ADMIN_PASSWORD + } + + def setUp(self): + super(ApiTestCase, self).setUp() + reload(setting) + setting.CONFIG_DIR = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + 'data' + ) + database.init('sqlite://') + database.create_db() + adapter_api.load_adapters() + metadata_api.load_metadatas() + + from compass.api import api as compass_api + application = compass_api.app + self.test_client = application.test_client() + + celery.current_app.send_task = mock.Mock() + url = '/users/token' + data = self.USER_CREDENTIALS + request_data = json.dumps(data) + return_value = self.test_client.post( + url, + data=request_data, + ) + resp = return_value.get_data() + resp = json.loads(resp) + self.token = resp['token'] + + #create a cluster + adapter_name, adapter_id, os_id, flavor_id = ( + self._get_adapter_info() + ) + url = '/clusters' + data = {} + data['name'] = 'test_cluster1' + data['adapter_id'] = adapter_id + data['os_id'] = os_id + data['flavor_id'] = flavor_id + self.post(url, data) + data = {} + data['name'] = 'test_cluster2' + data['adapter_id'] = adapter_id + data['os_id'] = os_id + data['flavor_id'] = flavor_id + self.post(url, data) + + #create a switch + url = '/switches' + datas = [ + { + 'ip': self.SWITCH_IP_ADDRESS, + 'credentials': { + "version": "2c", + "community": "public" + }, + 'vendor': 'huawei', + 'state': 'under_monitoring' + }, + { + 'ip': '172.29.8.40', + 'credentials': { + "version": "2c", + "community": "public" + }, + 'vendor': 'huawei', + 'state': 'under_monitoring' + } + ] + for data in datas: + self.post(url, data) + + def get(self, url): + return self.test_client.get( + url, headers={ + setting.USER_AUTH_HEADER_NAME: self.token + } + ) + + def post(self, url, data): + return self.test_client.post(url, data=json.dumps(data)) + + def put(self, url, data): + return self.test_client.put(url, data=json.dumps(data)) + + def delete(self, url): + return self.test_client.delete(url) + + def tearDown(self): + database.drop_db() + reload(setting) + super(ApiTestCase, self).tearDown() + + def _get_adapter_info(self): + adapter_name = None + adapter_id = None + os_id = None + flavor_id = None + adapters = self.get( + '/adapters' + ).get_data() + adapters = json.loads(adapters) + for adapter in adapters: + if adapter['flavors']: + adapter_name = adapter['name'] + adapter_id = adapter['id'] + os_id = adapter['supported_oses'][0]['os_id'] + for flavor in adapter['flavors']: + flavor_id = flavor['id'] + break + return (adapter_name, adapter_id, os_id, flavor_id) + + +class TestAuth(ApiTestCase): + """Test user authentication.""" + + def setUp(self): + super(TestAuth, self).setUp() + + def tearDown(self): + super(TestAuth, self).tearDown() + + def test_login_logout(self): + # Test login + url = '/users/token' + data = self.USER_CREDENTIALS + request_data = json.dumps(data) + return_value = self.test_client.post( + url, + data=request_data, + follow_redirects=True + ) + self.assertEqual(return_value.status_code, 200) + # Test logout + url = '/users/logout' + return_value = self.test_client.post( + url, + follow_redirects=True + ) + self.assertEqual(return_value.status_code, 200) + + def test_login_failed(self): + url = '/users/token' + # Wrong credentials + data_list = [ + {"email": "xxx", "password": "admin"}, + {"email": "admin@huawei.com", "password": "xxx"} + ] + for data in data_list: + return_value = self.test_client.post( + url, + data=data, + follow_redirects=True + ) + self.assertEqual(return_value.status_code, 400) + self.assertIn('missing email or password', return_value.get_data()) + + # disable user + User.query.filter_by( + email="admin@huawei.com" + ).update({"active": False}) + data = { + "email": "admin@huawei.com", + "password": "admin" + } + request_data = json.dumps(data) + return_value = self.test_client.post( + url, + data=request_data, + follow_redirects=True + ) + self.assertEqual(return_value.status_code, 403) + self.assertIn("failed to login", return_value.get_data()) + + +class TestUserAPI(ApiTestCase): + """Test user api.""" + + def setUp(self): + super(TestUserAPI, self).setUp() + + def tearDown(self): + super(TestUserAPI, self).tearDown() + + def test_list_users(self): + url = '/users' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + count = len(resp) + self.assertEqual(count, 1) + self.assertEqual(return_value.status_code, 200) + + +class TestClusterAPI(ApiTestCase): + """Test cluster api.""" + + def setUp(self): + super(TestClusterAPI, self).setUp() + # add a machine + url = '/switches/2/machines' + data = { + 'mac': '28:6e:d4:46:c4:25', + 'port': '1', + 'vlans': [88] + } + self.post(url, data) + url = '/clusters/1/hosts' + data = { + 'name': 'test_cluster_host', + 'reinstall_os': True, + 'machine_id': 1 + } + self.post(url, data) + + def tearDown(self): + super(TestClusterAPI, self).tearDown() + + def test_list_clusters(self): + # list clusters successfully + + url = '/clusters' + return_value = self.test_client.get(url) + self.assertEqual(return_value.status_code, 200) + resp = json.loads(return_value.get_data()) + expected = [ + {'name': 'test_cluster1'}, + {'name': 'test_cluster2'} + ] + for i, v in enumerate(resp): + self.assertTrue( + all(item in resp[i].items() for item in expected[i].items()) + ) + + # give a non-existed + url = '/clusters?name=xx' + return_value = json.loads( + self.test_client.get(url).get_data() + ) + self.assertEqual([], return_value) + + def test_show_cluster(self): + # get a cluster successfully + url = '/clusters/1' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 200) + resp = json.loads(return_value.get_data()) + self.assertEqual(resp['name'], 'test_cluster1') + + # get a non-exist cluster + url = 'clusters/999' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 404) + + def test_add_cluster(self): + # add a cluster successfully + url = '/clusters' + adapter_name, adapter_id, os_id, flavor_id = ( + self._get_adapter_info() + ) + data = {} + data['name'] = 'test_add_cluster' + data['adapter_id'] = adapter_id + data['os_id'] = os_id + data['flavor_id'] = flavor_id + return_value = self.post(url, data) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['name'], 'test_add_cluster') + + # add a duplicated cluster + return_value = self.post(url, data) + self.assertEqual(return_value.status_code, 409) + + # add a cluster with a non-existed adapter-id + data = {} + data['name'] = 'cluster_invalid' + data['adapter_id'] = 9 + data['os_id'] = 1 + data['flavor_id'] = 1 + return_value = self.post(url, data) + self.assertEqual(return_value.status_code, 400) + + def test_update_cluster(self): + # update a cluster sucessfully + url = 'clusters/1' + data = { + 'name': 'cluster_update' + } + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 200) + self.assertEqual( + json.loads(return_value.get_data())['name'], + 'cluster_update' + ) + + # update a non-existed cluster + url = 'clusters/99' + data = { + 'name': 'cluster_update_non_existed' + } + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 404) + + # update a cluster with wrong keyword + url = 'clusters/1' + data = { + 'xxx': 'cluster_update_wrong_keyword' + } + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 400) + + def test_delete_cluster_not_editable(self): + # delete a cluster which state is installing + self.user_object = ( + user_api.get_user_object( + setting.COMPASS_ADMIN_EMAIL + ) + ) + cluster_api.update_cluster_state( + self.user_object, + 1, + state='INSTALLING' + ) + url = '/clusters/1' + return_value = self.delete(url) + self.assertEqual(return_value.status_code, 403) + + def test_delete_cluster(self): + # delete a cluster sucessfully + url = '/clusters/1' + return_value = self.delete(url) + self.assertEqual(return_value.status_code, 200) + + def test_list_cluster_hosts(self): + # list cluster_hosts successfully + url = '/clusters/1/hosts' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + count = len(resp) + self.assertEqual(count, 1) + self.assertEqual(return_value.status_code, 200) + + # give a non-existed cluster_id + url = '/clusters/99/hosts' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + self.assertEqual(resp, []) + + def test_show_cluster_host(self): + # show a cluster_host successfully + url = '/clusters/1/hosts/1' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + self.assertEqual(resp['hostname'], 'test_cluster_host') + + # give a non-existed host_id + url = '/clusters/1/hosts/99' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 404) + + def test_add_cluster_host(self): + # add a cluster_host successfully + url = '/clusters/2/hosts' + data = { + 'name': 'add_cluster_host', + 'reinstall_os': True, + 'machine_id': 1 + } + return_value = self.post(url, data) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['hostname'], 'add_cluster_host') + + # add a duplicate cluster_host + data = { + 'name': 'duplicate_cluster_host', + 'reinstall_os': True, + 'machine_id': 1 + } + return_value = self.post(url, data) + self.assertEqual(return_value.status_code, 409) + + def test_update_cluster_host(self): + # update cluster_host successfully + url = '/clusters/1/hosts/1' + data = { + 'roles': [ + 'allinone-compute' + ] + } + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 200) + + def test_delete_cluster_host(self): + # delete a cluster_host successfully + url = '/clusters/1/hosts/1' + return_value = self.delete(url) + self.assertEqual(return_value.status_code, 200) + + # give a non-existed cluster_id + url = '/clusters/99/hosts/1' + return_value = self.delete(url) + self.assertEqual(return_value.status_code, 404) + + +class TestSubnetAPI(ApiTestCase): + """Test subnet api.""" + + def setUp(self): + super(TestSubnetAPI, self).setUp() + url = '/subnets' + data = { + 'subnet': '10.145.89.0/24', + 'name': 'test_subnet' + } + self.post(url, data) + + def tearDown(self): + super(TestSubnetAPI, self).tearDown() + + def test_list_subnets(self): + # list subnets successfully + url = '/subnets' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + count = len(resp) + self.assertEqual(count, 1) + self.assertEqual(return_value.status_code, 200) + + # list subnets with non-exists name + url = '/subnets?name=test' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + self.assertEqual(resp, []) + + def test_show_subnet(self): + # get a subnet successfully + url = '/subnets/1' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['name'], 'test_subnet') + + # give a non-existed id + url = '/subnets/99' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 404) + + def test_add_subnet(self): + # subnet already added in setUp() + # duplicate subnet + url = '/subnets' + data = { + 'subnet': '10.145.89.0/24', + 'name': 'test_subnet' + } + return_value = self.post(url, data) + self.assertEqual(return_value.status_code, 409) + + # add subnet with invalid subnet + data = { + 'subnet': 'xxx', + 'name': 'subnet_invalid' + } + return_value = self.post(url, data) + self.assertEqual(return_value.status_code, 400) + + def test_update_subnet(self): + # update subnet successfully + url = '/subnets/1' + data = { + 'subnet': '192.168.100.0/24', + 'name': 'update_subnet' + } + return_value = self.put(url, data) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertTrue(item in data.items() for item in resp.items()) + + # give a non-existed id + url = '/subnets/99' + data = { + 'subnet': '192.168.100.0/24', + 'name': 'subnet_invalid' + } + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 404) + + # update with wrong filter + url = '/subnets/1' + data = { + 'xxx': 'wrong_filter' + } + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 400) + + def test_delete_subnet(self): + # delete a subnet successfully + url = '/subnets/1' + return_value = self.delete(url) + self.assertEqual(return_value.status_code, 200) + + # delete a non-existed subnet + url = '/subnets/99' + return_value = self.delete(url) + self.assertEqual(return_value.status_code, 404) + + +class TestSwitchAPI(ApiTestCase): + """Test switch api.""" + + def setUp(self): + super(TestSwitchAPI, self).setUp() + + def tearDown(self): + super(TestSwitchAPI, self).tearDown() + + def test_list_switches(self): + url = '/switches' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + count = len(resp) + self.assertEqual(count, 2) + self.assertEqual(return_value.status_code, 200) + + # give a ip_int + url = '/switches?ip_int=172.29.8.40' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 200) + + # give a invalid ip_int + url = '/switches?ip_int=xxx' + return_value = json.loads(self.get(url).get_data()) + self.assertEqual([], return_value) + + def test_show_switch(self): + # show switch successfully + url = '/switches/2' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['ip'], self.SWITCH_IP_ADDRESS) + + # give a non-existed id + url = '/switches/99' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 404) + + def test_add_switch(self): + # add a new switch successfully + url = '/switches' + data = { + 'ip': '172.29.8.10', + 'credentials': { + "version": "2c", + "community": "public" + }, + 'vendor': 'huawei', + 'state': 'under_monitoring' + } + return_value = self.post(url, data) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['ip'], '172.29.8.10') + + # add a duplicated switch + data = { + 'ip': self.SWITCH_IP_ADDRESS, + 'credentials': { + "version": "2c", + "community": "public" + }, + 'vendor': 'huawei', + 'state': 'under_monitoring' + } + return_value = self.post(url, data) + self.assertEqual(return_value.status_code, 409) + + # add a invalid swtich + data = { + 'ip': 'xxx', + 'vendor': 'huawei' + } + return_value = self.post(url, data) + self.assertEqual(return_value.status_code, 400) + + def test_update_switch(self): + # update a swithc successfully + url = '/switches/1' + data = { + 'vendor': 'update_vendor' + } + return_value = self.put(url, data) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['vendor'], 'update_vendor') + + # update a non-existed switch + url = '/switches/99' + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 404) + + # update with wrong filter + url = '/switches/2' + data = { + 'xxx': 'invlid' + } + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 400) + + def test_delete_switch(self): + # delete a switch successfully + url = '/switches/1' + return_value = self.delete(url) + self.assertEqual(return_value.status_code, 200) + + # delete a non-existed switch + url = '/switches/99' + return_value = self.delete(url) + self.assertEqual(return_value.status_code, 404) + + +class TestAdapterAPI(ApiTestCase): + """Test adapter api.""" + + def setUp(self): + super(TestAdapterAPI, self).setUp() + self.adapter_name = None + self.adapter_id = None + url = '/adapters' + adapters = json.loads(self.get(url).get_data()) + for adapter in adapters: + if adapter['flavors']: + self.adapter_name = adapter['name'] + self.adapter_id = adapter['id'] + + def tearDown(self): + super(TestAdapterAPI, self).tearDown() + + def test_list_adapters(self): + # list adapters successfully + url = '/adapters' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + count = len(resp) + self.assertEqual(count, 3) + self.assertEqual(return_value.status_code, 200) + + # give a non-existed filter + url = '/adapters?name=xx' + return_value = json.loads(self.get(url).get_data()) + self.assertEqual([], return_value) + + def test_show_adapter(self): + # show an adapter successfully + url = '/adapters/%s' % self.adapter_id + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['name'], self.adapter_name) + + # give a non-existed id + url = '/adapters/99' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 404) + + def test_show_adapter_roles(self): + # get adapter role successfully + url = '/adapters/%s/roles' % self.adapter_id + return_value = self.get(url) + self.assertEqual(return_value.status_code, 200) + + # give a non-existed id + url = '/adapters/99/roles' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 404) + + +class TestHostAPI(ApiTestCase): + """Test host api.""" + + def setUp(self): + super(TestHostAPI, self).setUp() + # add a machine to get the machine_id + url = 'switches/1/machines' + datas = [ + { + 'mac': '28:6e:d4:46:c4:25', + 'port': '1', + 'location': 'test_location1' + }, + { + 'mac': '00:0c:29:bf:eb:1d', + 'port': '1', + 'location': 'test_location2' + } + ] + machine_id = {} + for i, data in enumerate(datas): + return_value = self.post(url, data) + resp = json.loads(return_value.get_data()) + machine_id[i] = resp['machine_id'] + + # add a host + url = '/clusters/1/hosts' + datas = [ + { + 'machine_id': machine_id[0], + 'name': 'test_host1', + 'reinstall_os': True + }, + { + 'machine_id': machine_id[1], + 'name': 'test_hosts2', + 'reinstall_os': True + } + ] + for data in datas: + self.post(url, data) + + # add host_network + url = '/subnets' + data = { + 'subnet': '10.172.20.0/24', + 'name': 'test_subnet' + } + self.post(url, data) + url = '/hosts/1/networks' + datas = [ + { + 'ip': '179049563', + 'subnet_id': 1, + 'is_mgmt': False, + 'is_promiscuous': False + }, + { + 'ip': '179049582', + 'subnet_id': 1, + 'is_mgmt': False, + 'is_promiscuous': False + } + ] + for data in datas: + self.post(url, data) + + def tearDown(self): + super(TestHostAPI, self).tearDown() + + def test_list_hosts(self): + # list hosts successfully + url = '/hosts' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + count = len(resp) + self.assertEqual(count, 2) + self.assertEqual(return_value.status_code, 200) + + # give a non-existed name + url = '/hosts?name=xx' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + self.assertEqual([], resp) + + def test_show_host(self): + #show a host successfully + url = '/hosts/1' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['name'], 'test_host1') + + # give a non-existed id + url = '/hosts/99' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 404) + + def test_list_machines_or_hosts(self): + url = '/machines-hosts' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 200) + + def test_show_machine_or_host(self): + url = '/machines-hosts/1' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 200) + + def test_update_host(self): + # update a host successfully + url = '/hosts/1' + data = { + 'name': 'update_host' + } + return_value = self.put(url, data) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['name'], 'update_host') + + # give a wrong filter + data = { + 'location': 'update_location' + } + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 400) + self.assertIn('are not supported', return_value.get_data()) + + # give a non-existed id + url = '/hosts/99' + data = { + 'name': 'host' + } + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 404) + + def test_delete_host(self): + # delete a host successfully + url = '/hosts/2' + return_value = self.delete(url) + self.assertEqual(return_value.status_code, 200) + + # give a non-existed id + url = '/hosts/99' + return_value = self.delete(url) + self.assertEqual(return_value.status_code, 404) + + def test_list_host_networks(self): + url = '/hosts/1/networks' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + count = len(resp) + self.assertEqual(count, 1) + self.assertEqual(return_value.status_code, 200) + + def test_show_host_network(self): + url = '/hosts/1/networks/1' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['ip'], '10.172.20.91') + + +class TestSwitchMachines(ApiTestCase): + + def setUp(self): + super(TestSwitchMachines, self).setUp() + url = '/switches/2/machines' + datas = [ + { + 'mac': '28:6e:d4:46:c4:25', + 'port': '1', + 'vlans': [88] + }, + { + 'mac': '00:0c:29:bf:eb:1d', + 'port': '1', + 'vlans': [1] + } + ] + for data in datas: + self.post(url, data) + + def tearDown(self): + super(TestSwitchMachines, self).tearDown() + + def test_list_switch_machines(self): + # list switch machines successfully + url = '/switches/2/machines' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + count = len(resp) + self.assertEqual(count, 2) + self.assertEqual(return_value.status_code, 200) + + # give a non-existed switch_id + url = '/switches/99/machines' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + self.assertEqual(resp, []) + + def test_add_switch_machine(self): + # add a switch machine successfully + url = '/switches/2/machines' + data = { + 'mac': '00:0c:29:5b:ee:eb', + 'port': '1', + 'vlans': [10] + } + return_value = self.post(url, data) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['mac'], '00:0c:29:5b:ee:eb') + + # add a dulicated switch machine + url = '/switches/2/machines' + data = { + 'mac': '28:6e:d4:46:c4:25', + 'port': '1', + 'vlans': [88] + } + return_value = self.post(url, data) + self.assertEqual(return_value.status_code, 409) + + # add a invalid switch machine + url = '/switches/2/machines' + data = { + 'mac': 'xxx' + } + return_value = self.post(url, data) + self.assertEqual(return_value.status_code, 400) + + def test_show_switch_machine(self): + # show a switch_machine successfully + url = '/switches/2/machines/1' + return_value = self.get(url) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['mac'], '28:6e:d4:46:c4:25') + + # give a non-existed switch_id + url = '/switches/99/machines/1' + return_value = self.get(url) + self.assertEqual(return_value.status_code, 404) + + def test_update_switch_machine(self): + # update a switch_machine successfully + url = '/switches/2/machines/1' + data = { + 'port': '100' + } + return_value = self.put(url, data) + resp = json.loads(return_value.get_data()) + self.assertEqual(return_value.status_code, 200) + self.assertEqual(resp['port'], '100') + + # give a non-existed switch_id + url = '/switches/99/machines/1' + data = { + 'port': '99' + } + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 404) + + # give a wrong filter + url = '/switches/2/machines/1' + data = { + 'mac': '00:0c:29:a5:f2:05' + } + return_value = self.put(url, data) + self.assertEqual(return_value.status_code, 400) + self.assertIn('are not supported', return_value.get_data()) + + def test_delete_switch_machine(self): + # delete a switch_machine successfully + url = '/switches/2/machines/1' + return_value = self.delete(url) + self.assertEqual(return_value.status_code, 200) + + +if __name__ == '__main__': + flags.init() + logsetting.init() + unittest2.main()