
Add basic bvt tests for influxdb_grafana. Add helpers modules. Change-Id: I698661a478674b7810f688433000ca39cdf35f3c
89 lines
3.1 KiB
Python
89 lines
3.1 KiB
Python
# Copyright 2016 Mirantis, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import os
|
|
import urllib2
|
|
|
|
from fuelweb_test import logger
|
|
from proboscis import asserts
|
|
|
|
from stacklight_tests import settings
|
|
|
|
|
|
def create_cluster(
|
|
env, name, cluster_settings=None, mode=settings.DEPLOYMENT_MODE):
|
|
return env.fuel_web.create_cluster(
|
|
name=name, settings=cluster_settings, mode=mode)
|
|
|
|
|
|
class PluginHelper(object):
|
|
"""Class for common help functions."""
|
|
|
|
def __init__(self, env):
|
|
self.env = env
|
|
self.fuel_web = self.env.fuel_web
|
|
self._cluster_id = None
|
|
|
|
@property
|
|
def cluster_id(self):
|
|
if self._cluster_id is None:
|
|
try:
|
|
self._cluster_id = self.fuel_web.get_last_created_cluster()
|
|
except urllib2.URLError:
|
|
raise EnvironmentError("No cluster was created.")
|
|
return self._cluster_id
|
|
|
|
@cluster_id.setter
|
|
def cluster_id(self, value):
|
|
self._cluster_id = value
|
|
|
|
def prepare_plugin(self, plugin_path):
|
|
"""Upload and install plugin by path."""
|
|
self.env.admin_actions.upload_plugin(plugin=plugin_path)
|
|
self.env.admin_actions.install_plugin(
|
|
plugin_file_name=os.path.basename(plugin_path))
|
|
|
|
def activate_plugin(self, name, version, options):
|
|
"""Activate and check exist plugin."""
|
|
msg = "Plugin couldn't be enabled. Check plugin version. Test aborted"
|
|
asserts.assert_true(
|
|
self.fuel_web.check_plugin_exists(self.cluster_id, name),
|
|
msg)
|
|
self.fuel_web.update_plugin_settings(
|
|
self.cluster_id, name, version, options)
|
|
|
|
def get_plugin_vip(self, vip_name):
|
|
"""Get plugin IP."""
|
|
networks = self.fuel_web.client.get_networks(self.cluster_id)
|
|
vip = networks.get('vips').get(vip_name, {}).get('ipaddr', None)
|
|
asserts.assert_is_not_none(
|
|
vip, "Failed to get the IP of {} server".format(vip_name))
|
|
|
|
logger.debug("Check that {} is ready".format(vip_name))
|
|
return vip
|
|
|
|
def get_all_ready_nodes(self):
|
|
return [node for node in
|
|
self.fuel_web.client.list_cluster_nodes(self.cluster_id)
|
|
if node["status"] == "ready"]
|
|
|
|
def deploy_cluster(self, nodes_roles):
|
|
"""Method to deploy cluster with provided node roles."""
|
|
self.fuel_web.update_nodes(self.cluster_id, nodes_roles)
|
|
self.fuel_web.deploy_cluster_wait(self.cluster_id)
|
|
|
|
def run_ostf(self, *args, **kwargs):
|
|
kwargs.update({"cluster_id": self.cluster_id})
|
|
self.fuel_web.run_ostf(*args, **kwargs)
|