# Copyright 2014 Huawei Technologies Co. Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime import logging import mock import os import sys import unittest2 os.environ['COMPASS_IGNORE_SETTING'] = 'true' from compass.utils import setting_wrapper as setting reload(setting) from base import BaseTest from compass.db.api import adapter as adapter_api from compass.db.api import adapter_holder as adapter from compass.db.api import cluster from compass.db.api import database from compass.db.api import host from compass.db.api import machine from compass.db.api import metadata as metadata_api from compass.db.api import metadata_holder as metadata from compass.db.api import network from compass.db.api import switch from compass.db.api import user as user_api from compass.db import exception from compass.utils import flags from compass.utils import logsetting from compass.utils import util class ClusterTestCase(unittest2.TestCase): """Cluster base test case.""" def setUp(self): super(ClusterTestCase, self).setUp() os.environ['COMPASS_IGNORE_SETTING'] = 'true' os.environ['COMPASS_CONFIG_DIR'] = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'data' ) reload(setting) database.init('sqlite://') database.create_db() adapter.load_adapters(force_reload=True) metadata.load_metadatas(force_reload=True) adapter.load_flavors(force_reload=True) self.user_object = ( user_api.get_user_object( setting.COMPASS_ADMIN_EMAIL ) ) self.adapter_id = None self.os_id = None self.flavor_id = None self.cluster_id = None # get adapter information list_adapters = adapter.list_adapters(user=self.user_object) for list_adapter in list_adapters: for supported_os in list_adapter['supported_oses']: self.os_id = supported_os['os_id'] break if list_adapter['flavors']: details = list_adapter['flavors'] for detail in details: if detail['display_name'] == 'allinone': roles = detail['roles'] for role in roles: self.adapter_id = role['adapter_id'] self.flavor_id = role['flavor_id'] break # add cluster cluster_names = ['test_cluster1', 'test_cluster2'] for cluster_name in cluster_names: cluster.add_cluster( user=self.user_object, adapter_id=self.adapter_id, os_id=self.os_id, flavor_id=self.flavor_id, name=cluster_name ) clusters = cluster.list_clusters(user=self.user_object) self.roles = None for list_cluster in clusters: for item in list_cluster['flavor']['roles']: self.roles = item if list_cluster['name'] == 'test_cluster1': self.cluster_id = list_cluster['id'] break self.package_configs = { 'security': { 'service_credentials': { '$service': { 'username': 'root', 'password': 'root' } }, 'console_credentials': { '$console': { 'username': 'root', 'password': 'root' } } }, 'network_mapping': { '$interface_type': { 'interface': 'eth0', 'subnet': '10.145.88.0/23' } } } self.os_configs = { 'general': { 'language': 'EN', 'timezone': 'UTC', 'http_proxy': 'http://127.0.0.1:3128', 'https_proxy': 'http://127.0.0.1:3128', 'no_proxy': [ '127.0.0.1', 'compass' ], 'ntp_server': '127.0.0.1', 'dns_servers': [ '127.0.0.1' ], 'domain': 'ods.com', 'search_path': [ 'ods.com' ], 'default_gateway': '127.0.0.1', }, 'server_credentials': { 'username': 'root', 'password': 'root', }, 'partition': { '/var': { 'max_size': '100G', 'percentage': 10, 'size': '1G' } } } # add cluster config cluster.update_cluster_config( self.cluster_id, user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) # add switch switch.add_switch( user=self.user_object, ip='172.29.8.40' ) switches = switch.list_switches(user=self.user_object) self.switch_id = None for item in switches: self.switch_id = item['id'] macs = ['28:6e:d4:46:c4:25', '00:0c:29:bf:eb:1d'] for mac in macs: switch.add_switch_machine( self.switch_id, user=self.user_object, mac=mac, port='1' ) # get machine information machines = machine.list_machines(user=self.user_object) self.machine_ids = [] for item in machines: self.machine_ids.append(item['id']) # add cluster host name = ['newname1', 'newname2'] for i in range(0, 2): cluster.add_cluster_host( self.cluster_id, user=self.user_object, machine_id=self.machine_ids[i], name=name[i] ) self.host_id = [] self.clusterhost_id = [] clusterhosts = cluster.list_clusterhosts(user=self.user_object) for clusterhost in clusterhosts: self.host_id.append(clusterhost['host_id']) self.clusterhost_id.append(clusterhost['clusterhost_id']) # add log file file_names = ['log_file1', 'log_file2'] for file_name in file_names: cluster.add_cluster_host_log_history( self.cluster_id, self.host_id[0], user=self.user_object, filename=file_name ) # add subnet subnets = ['10.145.88.0/23', '192.168.100.0/23'] for subnet in subnets: network.add_subnet( user=self.user_object, subnet=subnet ) list_subnet = network.list_subnets( user=self.user_object ) self.subnet_ids = [] for item in list_subnet: self.subnet_ids.append(item['id']) # add host network host.add_host_network( self.host_id[0], user=self.user_object, interface='eth0', ip='10.145.88.0', subnet_id=self.subnet_ids[0], is_mgmt=True ) host.add_host_network( self.host_id[0], user=self.user_object, interface='eth1', ip='10.145.88.10', subnet_id=self.subnet_ids[0], is_promiscuous=True ) host.list_host_networks( self.host_id[0], user=self.user_object, ) def tearDown(self): super(ClusterTestCase, self).tearDown() class TestListClusters(ClusterTestCase): """Test list clusters.""" def setUp(self): super(TestListClusters, self).setUp() def tearDown(self): super(TestListClusters, self).tearDown() def test_list_clusters(self): clusters = cluster.list_clusters(user=self.user_object) result = [] for list_cluster in clusters: result.append(list_cluster['name']) expects = ['test_cluster1', 'test_cluster2'] self.assertIsNotNone(clusters) for expect in expects: self.assertIn(expect, result) class TestGetCluster(ClusterTestCase): """Test get cluster.""" def setUp(self): super(TestGetCluster, self).setUp() def tearDown(self): super(TestGetCluster, self).tearDown() def test_get_cluster(self): get_cluster = cluster.get_cluster( self.cluster_id, user=self.user_object, ) self.assertIsNotNone(get_cluster) self.assertEqual(get_cluster['name'], 'test_cluster1') def test_non_exsit_cluster_id(self): self.assertRaises( exception.RecordNotExists, cluster.get_cluster, 99, user=self.user_object, ) class TestAddCluster(ClusterTestCase): """Test add cluster.""" def setUp(self): super(TestAddCluster, self).setUp() def tearDown(self): super(TestAddCluster, self).tearDown() def test_add_cluster(self): cluster.add_cluster( user=self.user_object, adapter_id=self.adapter_id, os_id=self.os_id, flavor_id=self.flavor_id, name='test_add_cluster' ) add_clusters = cluster.list_clusters(user=self.user_object) result = [] for add_cluster in add_clusters: result.append(add_cluster['name']) self.assertIn('test_add_cluster', result) def test_add_cluster_position_args(self): cluster.add_cluster( True, 'test_add_cluster_position', user=self.user_object, adapter_id=self.adapter_id, os_id=self.os_id, flavor_id=self.flavor_id, ) add_clusters = cluster.list_clusters(user=self.user_object) result = [] for add_cluster in add_clusters: result.append(add_cluster['name']) self.assertIn('test_add_cluster_position', result) def test_add_cluster_session(self): with database.session() as session: cluster.add_cluster( user=self.user_object, adapter_id=self.adapter_id, os_id=self.os_id, flavor_id=self.flavor_id, name='test_add_cluster_session', session=session ) add_clusters = cluster.list_clusters(user=self.user_object) result = [] for add_cluster in add_clusters: result.append(add_cluster['name']) self.assertIn('test_add_cluster_session', result) class TestUpdateCluster(ClusterTestCase): """Test update cluster.""" def setUp(self): super(TestUpdateCluster, self).setUp() def tearDown(self): super(TestUpdateCluster, self).tearDown() def test_update_cluster(self): cluster.update_cluster( self.cluster_id, user=self.user_object, name='test_update_cluster' ) update_cluster = cluster.get_cluster( self.cluster_id, user=self.user_object, ) self.assertEqual(update_cluster['name'], 'test_update_cluster') def test_duplicate_name(self): cluster.update_cluster( self.cluster_id, user=self.user_object, name='test_update_cluster' ) self.assertRaises( exception.DuplicatedRecord, cluster.update_cluster, 2, user=self.user_object, name='test_update_cluster' ) def test_is_cluster_editable(self): # state is INSTALLING cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.update_cluster, self.cluster_id, user=self.user_object, name='cluster_editable' ) # reinstall self.assertRaises( exception.Forbidden, cluster.update_cluster, self.cluster_id, user=self.user_object, reinstall_distributed_system=True ) class TestDelCluster(ClusterTestCase): """Test delete cluster.""" def setUp(self): super(TestDelCluster, self).setUp() def tearDown(self): super(TestDelCluster, self).setUp() def test_del_cluster(self): from compass.tasks import client as celery_client celery_client.celery.send_task = mock.Mock() cluster.del_cluster( self.cluster_id, user=self.user_object, ) del_clusters = cluster.list_clusters( user=self.user_object, ) for del_cluster in del_clusters: self.assertNotEqual(1, del_cluster['id']) def test_is_cluster_editable(self): # state is INSTALLING cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.del_cluster, self.cluster_id, user=self.user_object, ) class TestGetClusterConfig(ClusterTestCase): """Test get cluster config.""" def setUp(self): super(TestGetClusterConfig, self).setUp() cluster.update_cluster_config( self.cluster_id, user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) def tearDown(self): super(TestGetClusterConfig, self).tearDown() def test_get_cluster_config(self): cluster_config = cluster.get_cluster_config( self.cluster_id, user=self.user_object, ) package_config = cluster_config['package_config'] os_config = cluster_config['os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) class TestGetClusterDeployedConfig(ClusterTestCase): def setUp(self): super(TestGetClusterDeployedConfig, self).setUp() cluster.update_cluster_config( self.cluster_id, user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) cluster.update_cluster_host( self.cluster_id, self.host_id[0], user=self.user_object, roles=['allinone-compute'] ) cluster.review_cluster( self.cluster_id, user=self.user_object, review={ 'hosts': [self.host_id[0]] } ) cluster.update_cluster_deployed_config( self.cluster_id, user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) def tearDown(self): super(TestGetClusterDeployedConfig, self).tearDown() def test_get_cluster_deployed_config(self): configs = cluster.get_cluster_deployed_config( self.cluster_id, user=self.user_object, ) os_config = configs['deployed_os_config'] package_config = configs['deployed_package_config'] self.assertItemsEqual(os_config, self.os_configs) self.assertItemsEqual(package_config, self.package_configs) class TestGetClusterMetadata(ClusterTestCase): """Test get cluster metadata.""" def setUp(self): super(TestGetClusterMetadata, self).setUp() def tearDown(self): super(TestGetClusterMetadata, self).tearDown() def test_get_cluster_metadata(self): cluster_metadata = cluster.get_cluster_metadata( self.cluster_id, user=self.user_object, ) results = [] for k, v in cluster_metadata.items(): results.append(k) expected = ['os_config', 'package_config'] self.assertIsNotNone(cluster_metadata) for result in results: self.assertIn(result, expected) class TestUpdateClusterConfig(ClusterTestCase): """Test update cluster config.""" def setUp(self): super(TestUpdateClusterConfig, self).setUp() def tearDown(self): super(TestUpdateClusterConfig, self).tearDown() def test_update_cluster_config(self): cluster.update_cluster_config( self.cluster_id, user=self.user_object, put_os_config=self.os_configs, put_package_config=self.package_configs ) update_cluster_config = cluster.get_cluster_config( self.cluster_id, user=self.user_object, ) package_config = update_cluster_config['package_config'] os_config = update_cluster_config['os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) class TestPatchClusterConfig(ClusterTestCase): """Test patch cluster config.""" def setUp(self): super(TestPatchClusterConfig, self).setUp() def tearDown(self): super(TestPatchClusterConfig, self).tearDown() def test_patch_cluster_config(self): patch_cluster_config = cluster.patch_cluster_config( self.cluster_id, user=self.user_object, package_config=self.package_configs, os_config=self.os_configs ) package_config = patch_cluster_config['package_config'] os_config = patch_cluster_config['os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) class TestDelClusterConfig(ClusterTestCase): """Test delete a cluster config.""" def setUp(self): super(TestDelClusterConfig, self).setUp() cluster.update_cluster_config( self.cluster_id, user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) def tearDown(self): super(TestDelClusterConfig, self).tearDown() def test_del_cluster_config(self): cluster.del_cluster_config( self.cluster_id, user=self.user_object, ) del_cluster_config = cluster.get_cluster_config( self.cluster_id, user=self.user_object, ) configs = [] for k, v in del_cluster_config.items(): if k == 'package_config' or k == 'os_config': configs.append(v) for config in configs: self.assertEqual(config, {}) def test_cluster_editable(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.del_cluster_config, self.cluster_id, user=self.user_object, ) class TestListClusterHosts(ClusterTestCase): """Test list cluster hosts.""" def setUp(self): super(TestListClusterHosts, self).setUp() def tearDown(self): super(TestListClusterHosts, self).tearDown() def test_list_cluster_hosts(self): list_cluster_hosts = cluster.list_cluster_hosts( self.cluster_id, user=self.user_object, ) results = [] expected = ['28:6e:d4:46:c4:25', '00:0c:29:bf:eb:1d'] for item in list_cluster_hosts: results.append(item['mac']) for result in results: self.assertIn(result, expected) class TestListClusterhosts(ClusterTestCase): """Test list clusterhosts.""" def setUp(self): super(TestListClusterhosts, self).setUp() def tearDown(self): super(TestListClusterhosts, self).tearDown() def test_list_clusterhosts(self): list_clusterhosts = cluster.list_clusterhosts(user=self.user_object) results = [] expected = ['28:6e:d4:46:c4:25', '00:0c:29:bf:eb:1d'] for item in list_clusterhosts: results.append(item['mac']) for result in results: self.assertIn(result, expected) class TestGetClusterHost(ClusterTestCase): """Test get cluster host.""" def setUp(self): super(TestGetClusterHost, self).setUp() def tearDown(self): super(TestGetClusterHost, self).tearDown() def test_get_cluster_host(self): get_cluster_host = cluster.get_cluster_host( self.cluster_id, self.host_id[1], user=self.user_object, ) self.assertEqual(get_cluster_host['mac'], '00:0c:29:bf:eb:1d') class TestGetClusterhost(ClusterTestCase): """Test get clusterhost.""" def setUp(self): super(TestGetClusterhost, self).setUp() def tearDown(self): super(TestGetClusterhost, self).tearDown() def test_get_clusterhost(self): get_clusterhost = cluster.get_clusterhost( self.clusterhost_id[1], user=self.user_object, ) self.assertEqual(get_clusterhost['mac'], '00:0c:29:bf:eb:1d') class TestAddClusterHost(ClusterTestCase): """Test add cluster host.""" def setUp(self): super(TestAddClusterHost, self).setUp() switch.add_switch_machine( self.switch_id, user=self.user_object, mac='00:0c:29:5b:ee:eb', port='1' ) machines = machine.list_machines(user=self.user_object) self.add_machine_id = None for item in machines: if item['mac'] == '00:0c:29:5b:ee:eb': self.add_machine_id = item['id'] def tearDown(self): super(TestAddClusterHost, self).tearDown() def test_add_cluster_host(self): # add a cluster_host cluster.add_cluster_host( self.cluster_id, user=self.user_object, machine_id=self.add_machine_id, name='test_add_cluster_host' ) add_cluster_hosts = cluster.list_clusterhosts(user=self.user_object) expected = { 'hostname': 'test_add_cluster_host', 'owner': 'admin@huawei.com', 'name': 'test_add_cluster_host.test_cluster1', } self.assertTrue( all(item in add_cluster_hosts[2].items() for item in expected.items())) def test_duplicate_name(self): cluster.add_cluster_host( self.cluster_id, user=self.user_object, machine_id=self.add_machine_id, name='test_add_cluster_host' ) self.assertRaises( exception.DuplicatedRecord, cluster.add_cluster_host, 2, user=self.user_object, machine_id=2, name='test_add_cluster_host' ) def test_is_cluster_editable(self): # installing cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.add_cluster_host, self.cluster_id, user=self.user_object, machine_id=self.add_machine_id ) class TestUpdateClusterHost(ClusterTestCase): """Test update cluster host.""" def setUp(self): super(TestUpdateClusterHost, self).setUp() def tearDown(self): super(TestUpdateClusterHost, self).tearDown() def test_update_cluster_host(self): cluster.update_cluster_host( self.cluster_id, self.host_id[0], user=self.user_object, roles=['allinone-compute'], ) update_cluster_hosts = cluster.list_cluster_hosts( self.cluster_id, user=self.user_object, ) result = None for item in update_cluster_hosts: if item['roles']: result = item['roles'][0]['display_name'] self.assertEqual(result, 'all in one compute') def test_duplicate_name(self): self.assertRaises( exception.DuplicatedRecord, cluster.update_cluster_host, self.cluster_id, self.host_id[0], user=self.user_object, name='newname2' ) def test_invalid_role(self): self.assertRaises( exception.InvalidParameter, cluster.update_cluster_host, self.cluster_id, self.host_id[0], user=self.user_object, roles=['invalid_role'] ) def test_is_cluster_editable(self): # state is INSTALLING cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.update_cluster_host, self.cluster_id, self.host_id[0], user=self.user_object, ) class TestUpdateClusterhost(ClusterTestCase): """Test update clusterhost.""" def setUp(self): super(TestUpdateClusterhost, self).setUp() def tearDown(self): super(TestUpdateClusterhost, self).tearDown() def test_update_clusterhost(self): cluster.update_clusterhost( self.clusterhost_id[0], user=self.user_object, reinstall_os=False, roles=['allinone-compute'] ) update_clusterhosts = cluster.list_clusterhosts( user=self.user_object, ) result = None for item in update_clusterhosts: if item['roles']: result = item['roles'][0]['display_name'] self.assertEqual(result, 'all in one compute') def test_duplicate_name(self): self.assertRaises( exception.DuplicatedRecord, cluster.update_clusterhost, self.clusterhost_id[0], user=self.user_object, name='newname2' ) def test_invalid_role(self): self.assertRaises( exception.InvalidParameter, cluster.update_clusterhost, self.clusterhost_id[0], user=self.user_object, roles=['invalid_role'] ) def test_is_cluster_editable(self): # state is INSTALLING cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.update_clusterhost, self.clusterhost_id[0], user=self.user_object, ) class TestPatchClusterHost(ClusterTestCase): def setUp(self): super(TestPatchClusterHost, self).setUp() def tearDown(self): super(TestPatchClusterHost, self).tearDown() def test_patch_cluster_host(self): cluster.patch_cluster_host( self.cluster_id, self.host_id[0], user=self.user_object, roles=['allinone-compute'] ) patch = cluster.list_cluster_hosts( self.cluster_id, user=self.user_object, ) result = None for item in patch: for role in item['roles']: result = role['display_name'] self.assertEqual(result, 'all in one compute') def test_is_cluster_editable(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.patch_cluster_host, self.cluster_id, self.host_id[0], user=self.user_object, ) class TestPatchClusterhost(ClusterTestCase): def setUp(self): super(TestPatchClusterhost, self).setUp() def tearDown(self): super(TestPatchClusterhost, self).tearDown() def test_patch_clusterhost(self): cluster.patch_clusterhost( self.clusterhost_id[0], user=self.user_object, roles=['allinone-compute'] ) patch = cluster.list_cluster_hosts( self.cluster_id, user=self.user_object, ) result = None for item in patch: for role in item['roles']: result = role['display_name'] self.assertEqual(result, 'all in one compute') def testi_is_cluster_editable(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.patch_clusterhost, self.clusterhost_id[0], user=self.user_object, ) class TestDelClusterHost(ClusterTestCase): """test delete cluster host.""" def setUp(self): super(TestDelClusterHost, self).setUp() def tearDown(self): super(TestDelClusterHost, self).tearDown() def test_del_cluster_host(self): from compass.tasks import client as celery_client celery_client.celery.send_task = mock.Mock() cluster.del_cluster_host( self.cluster_id, self.host_id[0], user=self.user_object, ) del_cluster_hosts = cluster.list_cluster_hosts( self.cluster_id, user=self.user_object, ) for del_cluster_host in del_cluster_hosts: self.assertNotEqual(del_cluster_host['id'], 1) def test_is_cluster_editable(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.del_cluster_host, self.cluster_id, self.host_id[0], user=self.user_object, ) class TestDelClusterhost(ClusterTestCase): """test delete clusterhost.""" def setUp(self): super(TestDelClusterhost, self).setUp() def tearDown(self): super(TestDelClusterhost, self).tearDown() def test_del_clusterhost(self): from compass.tasks import client as celery_client celery_client.celery.send_task = mock.Mock() cluster.del_clusterhost( self.clusterhost_id[0], user=self.user_object, ) del_clusterhosts = cluster.list_clusterhosts( user=self.user_object, ) for del_clusterhost in del_clusterhosts: self.assertNotEqual(del_clusterhost['id'], 1) def test_is_cluster_editable(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.del_clusterhost, self.clusterhost_id[0], user=self.user_object, ) class TestGetClusterHostConfig(ClusterTestCase): """Test get cluster host config.""" def setUp(self): super(TestGetClusterHostConfig, self).setUp() cluster.update_cluster_host_config( self.cluster_id, self.host_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) def tearDown(self): super(TestGetClusterHostConfig, self).tearDown() def test_get_cluster_host_config(self): configs = cluster.get_cluster_host_config( self.cluster_id, self.host_id[0], user=self.user_object, ) package_config = configs['package_config'] os_config = configs['os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) class TestGetClusterhostConfig(ClusterTestCase): """Test get clusterhost config.""" def setUp(self): super(TestGetClusterhostConfig, self).setUp() cluster.update_clusterhost_config( self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) def tesrDown(self): super(TestGetClusterhostConfig, self).tearDown() def test_get_clusterhost_config(self): configs = cluster.get_clusterhost_config( self.clusterhost_id[0], user=self.user_object, ) package_config = configs['package_config'] os_config = configs['os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) class TestGetClusterHostDeployedConfig(ClusterTestCase): def setUp(self): super(TestGetClusterHostDeployedConfig, self).setUp() cluster.update_cluster_host_config( self.cluster_id, self.host_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) cluster.update_cluster_host( self.cluster_id, self.host_id[0], user=self.user_object, roles=['allinone-compute'] ) cluster.review_cluster( self.cluster_id, user=self.user_object, review={ 'hosts': [self.host_id[0]] } ) cluster.update_cluster_host_deployed_config( self.cluster_id, self.host_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) def tearDown(self): super(TestGetClusterHostDeployedConfig, self).tearDown() def test_get_cluster_host_deployed_config(self): configs = cluster.get_cluster_host_deployed_config( self.cluster_id, self.host_id[0], user=self.user_object, ) package_config = configs['deployed_package_config'] os_config = configs['deployed_os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) class TestGetClusterhostDeployedConfig(ClusterTestCase): """Test get clusterhost deployed config.""" def setUp(self): super(TestGetClusterhostDeployedConfig, self).setUp() cluster.update_clusterhost_config( self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) cluster.update_clusterhost( self.clusterhost_id[0], user=self.user_object, roles=['allinone-compute'] ) cluster.review_cluster( self.cluster_id, user=self.user_object, review={ 'hosts': [self.host_id[0]] } ) cluster.update_clusterhost_deployed_config( self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) def tearDown(self): super(TestGetClusterhostDeployedConfig, self).tearDown() def test_get_clusterhost_deployed_config(self): configs = cluster.get_clusterhost_deployed_config( self.clusterhost_id[0], user=self.user_object, ) package_config = configs['deployed_package_config'] os_config = configs['deployed_os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) class TestUpdateClusterHostConfig(ClusterTestCase): """Test update cluster host config.""" def setUp(self): super(TestUpdateClusterHostConfig, self).setUp() def tearDown(self): super(TestUpdateClusterHostConfig, self).tearDown() def test_update_cluster_host_config(self): cluster.update_cluster_host_config( self.cluster_id, self.host_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) config = cluster.get_cluster_host_config( self.cluster_id, self.host_id[0], user=self.user_object, ) package_configs = config['package_config'] os_configs = config['os_config'] self.assertItemsEqual(package_configs, self.package_configs) self.assertItemsEqual(os_configs, self.os_configs) def test_is_cluster_editable(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.update_cluster_host_config, self.cluster_id, self.host_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) class TestUpdateClusterHostDeployedConfig(ClusterTestCase): """Test update cluster host deployed config.""" def setUp(self): super(TestUpdateClusterHostDeployedConfig, self).setUp() cluster.update_cluster_host_config( self.cluster_id, self.host_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) cluster.update_clusterhost( self.clusterhost_id[0], user=self.user_object, roles=['allinone-compute'] ) cluster.review_cluster( self.cluster_id, user=self.user_object, review={ 'clusterhosts': [self.clusterhost_id[0]] } ) def tearDown(self): super(TestUpdateClusterHostDeployedConfig, self).tearDown() def test_udpate_cluster_host_deployed_config(self): cluster.update_cluster_host_deployed_config( self.cluster_id, self.host_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) configs = cluster.get_cluster_host_deployed_config( self.cluster_id, self.host_id[0], user=self.user_object, ) package_config = configs['deployed_package_config'] os_config = configs['deployed_os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) class TestUpdateClusterhostConfig(ClusterTestCase): """Test update clusterhost config.""" def setUp(self): super(TestUpdateClusterhostConfig, self).setUp() def tearDown(self): super(TestUpdateClusterhostConfig, self).tearDown() def test_update_clusterhost_config(self): cluster.update_clusterhost_config( self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) configs = cluster.get_clusterhost_config( self.clusterhost_id[0], user=self.user_object, ) package_config = configs['package_config'] os_config = configs['os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) def test_id_cluster_editable(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.update_clusterhost_config, self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) class TestUpdateClusterhostDeployedConfig(ClusterTestCase): """Test update clusterhost config.""" def setUp(self): super(TestUpdateClusterhostDeployedConfig, self).setUp() cluster.update_clusterhost_config( self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) cluster.update_cluster_host( self.cluster_id, self.host_id[0], user=self.user_object, roles=['allinone-compute'] ) cluster.review_cluster( self.cluster_id, user=self.user_object, review={ 'clusterhosts': [self.clusterhost_id[0]] } ) def tearDown(self): super(TestUpdateClusterhostDeployedConfig, self).tearDown() def test_update_clusterhost_config(self): cluster.update_clusterhost_deployed_config( self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) configs = cluster.get_clusterhost_deployed_config( self.clusterhost_id[0], user=self.user_object, ) package_config = configs['deployed_package_config'] os_config = configs['deployed_os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) class TestPatchClusterHostConfig(ClusterTestCase): """Test patch cluster host config.""" def setUp(self): super(TestPatchClusterHostConfig, self).setUp() def tearDown(self): super(TestPatchClusterHostConfig, self).tearDown() def test_patch_cluster_host_config(self): cluster.patch_cluster_host_config( self.cluster_id, self.host_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) configs = cluster.get_cluster_host_config( self.cluster_id, self.host_id[0], user=self.user_object, ) package_config = configs['package_config'] os_config = configs['os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) def test_is_cluster_editable(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.patch_cluster_host_config, self.cluster_id, self.host_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) class TestPatchClusterhostConfig(ClusterTestCase): """Test patch clusterhost config.""" def setUp(self): super(TestPatchClusterhostConfig, self).setUp() def tearDown(self): super(TestPatchClusterhostConfig, self).tearDown() def test_patch_clusterhost_config(self): cluster.patch_clusterhost_config( self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) config = cluster.get_clusterhost_config( self.clusterhost_id[0], user=self.user_object, ) package_config = config['package_config'] os_config = config['os_config'] self.assertItemsEqual(package_config, self.package_configs) self.assertItemsEqual(os_config, self.os_configs) def test_is_cluster_editable(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.patch_clusterhost_config, self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) class TestDeleteClusterHostConfig(ClusterTestCase): """Test delete cluster host config.""" def setUp(self): super(TestDeleteClusterHostConfig, self).setUp() cluster.update_cluster_host_config( self.cluster_id, self.host_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) def tearDown(self): super(TestDeleteClusterHostConfig, self).tearDown() def test_delete_cluster_host_config(self): cluster.delete_cluster_host_config( self.cluster_id, self.host_id[0], user=self.user_object, ) del_cluster_host_config = cluster.get_cluster_host_config( self.cluster_id, self.host_id[0], user=self.user_object, ) configs = [] for k, v in del_cluster_host_config.items(): if k == 'package_config': configs.append(v) for config in configs: self.assertEqual(config, {}) def test_is_cluster_editable(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.delete_cluster_host_config, self.cluster_id, self.host_id[0], user=self.user_object, ) class TestDeleteClusterhostConfig(ClusterTestCase): """Test delete clusterhost config.""" def setUp(self): super(TestDeleteClusterhostConfig, self).setUp() cluster.update_clusterhost_config( self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) def tearDown(self): super(TestDeleteClusterhostConfig, self).setUp() def test_delete_clusterhost_config(self): cluster.delete_clusterhost_config( self.clusterhost_id[0], user=self.user_object, ) del_clusterhost_config = cluster.get_clusterhost_config( self.clusterhost_id[0], user=self.user_object, ) configs = [] for k, v in del_clusterhost_config.items(): if k == 'package_config': configs.append(v) for config in configs: self.assertEqual(config, {}) def test_is_cluster_editable(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) self.assertRaises( exception.Forbidden, cluster.delete_clusterhost_config, self.clusterhost_id[0], user=self.user_object, ) class TestUpdateClusterHosts(ClusterTestCase): """Test update cluster hosts.""" def setUp(self): super(TestUpdateClusterHosts, self).setUp() switch.add_switch_machine( self.switch_id, user=self.user_object, mac='00:0c:29:5b:ee:eb', port='1' ) machines = machine.list_machines(self.user_object) self.add_machine_id = None for item in machines: if item['mac'] == '00:0c:29:5b:ee:eb': self.add_machine_id = item['id'] def tearDown(self): super(TestUpdateClusterHosts, self).tearDown() def test_update_cluster_hosts(self): # remove host cluster.update_cluster_hosts( self.cluster_id, user=self.user_object, remove_hosts={'hosts': self.host_id[0]} ) remove_hosts = cluster.list_cluster_hosts( self.cluster_id, user=self.user_object, ) result = None for item in remove_hosts: result = item self.assertNotIn(self.host_id[0], result) # add host cluster.update_cluster_hosts( self.cluster_id, user=self.user_object, add_hosts={'machines': [{'machine_id': self.add_machine_id}]} ) add_hosts = cluster.list_cluster_hosts( self.cluster_id, user=self.user_object, ) result = None for item in add_hosts: if item['machine_id'] == self.add_machine_id: result = item['mac'] self.assertEqual(result, '00:0c:29:5b:ee:eb') class TestReviewCluster(ClusterTestCase): """Test review cluster.""" def setUp(self): super(TestReviewCluster, self).setUp() cluster.update_clusterhost_config( self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) cluster.update_cluster_host( self.cluster_id, self.host_id[0], user=self.user_object, roles=['allinone-compute'] ) def tearDown(self): super(TestReviewCluster, self).tearDown() def test_review_cluster(self): review_cluster = cluster.review_cluster( self.cluster_id, user=self.user_object, review={ 'hosts': [self.host_id[0]] } ) cluster_package_config = None cluster_os_config = None for k, v in review_cluster['cluster'].items(): if k == 'package_config': cluster_package_config = v if k == 'os_config': cluster_os_config = v host_package_config = None host_package_config = None for item in review_cluster['hosts']: for k, v in item.items(): if k == 'package_config': host_package_config = v if k == 'os_config': host_os_config = v self.assertItemsEqual(cluster_package_config, self.package_configs) self.assertItemsEqual(cluster_os_config, self.os_configs) self.assertItemsEqual(host_package_config, self.package_configs) self.assertItemsEqual(host_os_config, self.os_configs) class TestDeployedCluster(ClusterTestCase): """Test deployed cluster.""" def setUp(self): super(TestDeployedCluster, self).setUp() cluster.update_clusterhost_config( self.clusterhost_id[0], user=self.user_object, os_config=self.os_configs, package_config=self.package_configs ) cluster.update_cluster_host( self.cluster_id, self.host_id[0], user=self.user_object, roles=['allinone-compute'] ) cluster.review_cluster( self.cluster_id, user=self.user_object, review={ 'hosts': [self.host_id[0]], 'clusterhosts': [self.clusterhost_id[0]] } ) def tearDown(self): super(TestDeployedCluster, self).tearDown() def test_deploy_cluster(self): from compass.tasks import client as celery_client celery_client.celery.send_task = mock.Mock() deploy_cluster = cluster.deploy_cluster( self.cluster_id, user=self.user_object, deploy={ 'hosts': [self.host_id[0]], } ) cluster_package_config = None cluster_os_config = None for k, v in deploy_cluster['cluster'].items(): if k == 'package_config': cluster_package_config = v if k == 'os_config': cluster_os_config = v self.assertItemsEqual(cluster_package_config, self.package_configs) self.assertItemsEqual(cluster_os_config, self.os_configs) expected = { 'ip': '10.145.88.0', 'clusterhost_id': self.clusterhost_id[0], 'cluster_id': self.cluster_id, 'hostname': 'newname1', 'mac': '28:6e:d4:46:c4:25', 'clustername': 'test_cluster1' } self.assertTrue( all(item in deploy_cluster['hosts'][0].items() for item in expected.items())) class TestGetClusterState(ClusterTestCase): """Test get cluster state.""" def setUp(self): super(TestGetClusterState, self).setUp() def tearDown(self): super(TestGetClusterState, self).tearDown() def test_get_cluster_state(self): cluster_state = cluster.get_cluster_state( self.cluster_id, user=self.user_object, ) self.assertEqual(cluster_state['state'], 'UNINITIALIZED') class TestGetClusterHostState(ClusterTestCase): """Test get cluster host state.""" def setUp(self): super(TestGetClusterHostState, self).setUp() def tearDown(self): super(TestGetClusterHostState, self).tearDown() def test_get_cluster_host_state(self): cluster_host_state = cluster.get_cluster_host_state( self.cluster_id, self.host_id[0], user=self.user_object, ) self.assertEqual(cluster_host_state['state'], 'UNINITIALIZED') class TestGetClusterHostSelfState(ClusterTestCase): """Test get cluster hosts self state.""" def setUp(self): super(TestGetClusterHostSelfState, self).setUp() def tearDown(self): super(TestGetClusterHostSelfState, self).tearDown() def test_get_cluster_host_self_state(self): cluster_host_self_state = cluster.get_cluster_host_self_state( self.cluster_id, self.host_id[0], user=self.user_object, ) self.assertEqual(cluster_host_self_state['state'], 'UNINITIALIZED') class TestGetClusterhostState(ClusterTestCase): """Test get clusterhost state.""" def setUp(self): super(TestGetClusterhostState, self).setUp() def tearDown(self): super(TestGetClusterhostState, self).tearDown() def test_get_clusterhost_state(self): clusterhost_state = cluster.get_clusterhost_state( self.clusterhost_id[0], user=self.user_object, ) self.assertEqual(clusterhost_state['state'], 'UNINITIALIZED') class TestGetClusterhostSelfState(ClusterTestCase): """Test get clusterhost state.""" def setUp(self): super(TestGetClusterhostSelfState, self).setUp() def tearDown(self): super(TestGetClusterhostSelfState, self).tearDown() def test_get_clusterhost_state(self): clusterhost_state = cluster.get_clusterhost_self_state( self.clusterhost_id[0], user=self.user_object, ) self.assertEqual(clusterhost_state['state'], 'UNINITIALIZED') class TestUpdateClusterHostState(ClusterTestCase): """Test update cluster host state.""" def setUp(self): super(TestUpdateClusterHostState, self).setUp() def tearDown(self): super(TestUpdateClusterHostState, self).tearDown() def test_update_cluster_host_state(self): cluster.update_cluster_host_state( self.cluster_id, self.host_id[0], user=self.user_object, state='INSTALLING' ) update_state = cluster.get_cluster_host_state( self.cluster_id, self.host_id[0], user=self.user_object, ) self.assertEqual(update_state['state'], 'INSTALLING') class TestUpdateClusterhostState(ClusterTestCase): """Test update clusterhost state.""" def setUp(self): super(TestUpdateClusterhostState, self).setUp() def tearDown(self): super(TestUpdateClusterhostState, self).tearDown() def test_update_clusterhost_state(self): cluster.update_clusterhost_state( self.clusterhost_id[0], user=self.user_object, state='INSTALLING' ) clusterhost_state = cluster.get_clusterhost_state( self.clusterhost_id[0], user=self.user_object, ) self.assertEqual(clusterhost_state['state'], 'INSTALLING') class TestUpdateClusterState(ClusterTestCase): """Test update cluster state.""" def setUp(self): super(TestUpdateClusterState, self).setUp() def tearDown(self): super(TestUpdateClusterState, self).tearDown() def test_update_cluster_state(self): cluster.update_cluster_state( self.cluster_id, user=self.user_object, state='INSTALLING' ) cluster_state = cluster.get_cluster_state( self.cluster_id, user=self.user_object, ) self.assertEqual(cluster_state['state'], 'INSTALLING') class TestGetClusterHostLogHistories(ClusterTestCase): """Test get cluster host log histories.""" def setUp(self): super(TestGetClusterHostLogHistories, self).setUp() def tearDown(self): super(TestGetClusterHostLogHistories, self).tearDown() def test_get_cluster_host_log_histories(self): logs = cluster.get_cluster_host_log_histories( self.cluster_id, self.host_id[0], user=self.user_object, ) result = [] for log in logs: result.append(log['filename']) for item in result: self.assertEqual(result, ['log_file1', 'log_file2']) class TestGetClusterhostLogHistories(ClusterTestCase): """Test get clusterhost log histories.""" def setUp(self): super(TestGetClusterhostLogHistories, self).setUp() def tearDown(self): super(TestGetClusterhostLogHistories, self).tearDown() def test_get_clusterhost_log_histories(self): logs = cluster.get_clusterhost_log_histories( self.clusterhost_id[0], user=self.user_object, ) result = [] for log in logs: result.append(log['filename']) for item in result: self.assertEqual(result, ['log_file1', 'log_file2']) class TestGetClusterHostLogHistory(ClusterTestCase): """Test get cluster host log history.""" def setUp(self): super(TestGetClusterHostLogHistory, self).setUp() def tearDown(self): super(TestGetClusterHostLogHistory, self).tearDown() def test_get_cluster_host_log_history(self): log = cluster.get_cluster_host_log_history( self.cluster_id, self.host_id[0], 'log_file1', user=self.user_object, ) self.assertEqual(log['filename'], 'log_file1') class TestGetClusterhostLogHistory(ClusterTestCase): """Test get clusterhost log history.""" def setUp(self): super(TestGetClusterhostLogHistory, self).setUp() def tearDown(self): super(TestGetClusterhostLogHistory, self).tearDown() def test_get_clusterhost_log_history(self): log = cluster.get_clusterhost_log_history( self.clusterhost_id[0], 'log_file1', user=self.user_object, ) self.assertEqual(log['filename'], 'log_file1') class TestUpdateClusterHostLogHistory(ClusterTestCase): """Test update cluster host log history.""" def setUp(self): super(TestUpdateClusterHostLogHistory, self).setUp() def tearDown(self): super(TestUpdateClusterHostLogHistory, self).tearDown() def test_update_cluster_host_log_history(self): cluster.update_cluster_host_log_history( self.cluster_id, self.host_id[0], 'log_file1', user=self.user_object, severity='WARNING', message='test update cluster host log history.' ) update_log = cluster.get_cluster_host_log_history( self.cluster_id, self.host_id[0], 'log_file1', user=self.user_object, ) expected = { 'severity': 'WARNING', 'clusterhost_id': 1, 'filename': 'log_file1', 'cluster_id': 1, 'host_id': 1, 'message': 'test update cluster host log history.' } self.assertTrue( all(item in update_log.items() for item in expected.items())) class TestUpdateClusterhostLogHistory(ClusterTestCase): """Test update clusterhost log history.""" def setUp(self): super(TestUpdateClusterhostLogHistory, self).setUp() def tearDown(self): super(TestUpdateClusterhostLogHistory, self).tearDown() def test_update_clusterhost_log_history(self): cluster.update_clusterhost_log_history( self.clusterhost_id[0], 'log_file1', user=self.user_object, severity='WARNING', message='test update clusterhost log history.' ) update_log = cluster.get_clusterhost_log_history( self.clusterhost_id[0], 'log_file1', user=self.user_object, ) expected = { 'severity': 'WARNING', 'clusterhost_id': 1, 'filename': 'log_file1', 'cluster_id': 1, 'host_id': 1, 'message': 'test update clusterhost log history.' } self.assertTrue( all(item in update_log.items() for item in expected.items()) ) class TestAddClusterhostLogHistory(ClusterTestCase): """Test add clusterhost log history.""" def setUp(self): super(TestAddClusterhostLogHistory, self).setUp() def tearDown(self): super(TestAddClusterhostLogHistory, self).tearDown() def test_add_clusterhost_log_history(self): cluster.add_clusterhost_log_history( self.clusterhost_id[0], user=self.user_object, filename='add_log_file' ) logs = cluster.get_clusterhost_log_histories( self.clusterhost_id[0], user=self.user_object, ) result = [] for log in logs: result.append(log['filename']) self.assertIn('add_log_file', result) def test_add_clusterhost_log_history_position_args(self): cluster.add_clusterhost_log_history( self.clusterhost_id[0], False, 'add_log_file_position', user=self.user_object, ) logs = cluster.get_clusterhost_log_histories( self.clusterhost_id[0], user=self.user_object, ) result = [] for log in logs: result.append(log['filename']) self.assertIn('add_log_file_position', result) def test_add_clusterhost_log_history_session(self): with database.session() as session: cluster.add_clusterhost_log_history( self.clusterhost_id[0], user=self.user_object, filename='add_log_file_session', session=session ) logs = cluster.get_clusterhost_log_histories( self.clusterhost_id[0], user=self.user_object, ) result = [] for log in logs: result.append(log['filename']) self.assertIn('add_log_file_session', result) class TestAddClusterHostLogHistory(ClusterTestCase): """Test add cluster host log history.""" def setUp(self): super(TestAddClusterHostLogHistory, self).setUp() def tearDown(self): super(TestAddClusterHostLogHistory, self).tearDown() def test_add_cluster_host_log_history(self): cluster.add_cluster_host_log_history( self.cluster_id, self.host_id[0], user=self.user_object, filename='add_log_file' ) logs = cluster.get_cluster_host_log_histories( self.cluster_id, self.host_id[0], user=self.user_object, ) result = [] for log in logs: result.append(log['filename']) self.assertIn('add_log_file', result) def test_add_cluster_host_log_history_position(self): cluster.add_cluster_host_log_history( self.cluster_id, self.host_id[0], False, 'add_log_file_position', user=self.user_object, ) logs = cluster.get_cluster_host_log_histories( self.cluster_id, self.host_id[0], user=self.user_object, ) result = [] for log in logs: result.append(log['filename']) self.assertIn('add_log_file_position', result) def test_add_cluster_host_log_history_session(self): with database.session() as session: cluster.add_cluster_host_log_history( self.cluster_id, self.host_id[0], user=self.user_object, filename='add_log_file_session', session=session ) logs = cluster.get_cluster_host_log_histories( self.cluster_id, self.host_id[0], user=self.user_object, ) result = [] for log in logs: result.append(log['filename']) self.assertIn('add_log_file_session', result) if __name__ == '__main__': flags.init() logsetting.init() unittest2.main()