Add support for progression runs

1. Add support for progression runs;
2. Move http_tool_config from metadata to redis message;
3. v4 image is created (backward-compatible with v3 if progression run is disabled);
4. Documentation refinements in cfg.scale.yaml;
5. Simplify the logging format string;

Change-Id: Iec92f0c6c5664c9e45bb377d3273204f8f27e0d8
This commit is contained in:
Yichen Wang 2015-09-21 14:47:29 -07:00
parent 36f1ae9e47
commit b50786dbbe
8 changed files with 212 additions and 99 deletions

View File

@ -70,7 +70,7 @@ wget http://www.cmake.org/files/v3.3/cmake-3.3.0-Linux-x86_64.tar.gz
tar xzf cmake-3.3.0-Linux-x86_64.tar.gz
# Install HdrHistogram_c
git clone git://github.com/HdrHistogram/HdrHistogram_c.git
git clone -b HdrHistogram_c-0.9.0 git://github.com/HdrHistogram/HdrHistogram_c.git
cd HdrHistogram_c
/tmp/cmake-3.3.0-Linux-x86_64/bin/cmake .
make install
@ -78,6 +78,7 @@ make install
# Remove cmake and HdrHistogram_c builds
rm /tmp/cmake-3.3.0-Linux-x86_64.tar.gz
rm -rf /tmp/cmake-3.3.0-Linux-x86_64
cd ..
rm -rf HdrHistogram_c
# Install the http traffic generator

View File

@ -190,16 +190,21 @@ class KB_VM_Agent(object):
self.stop_hello.set()
elif message['cmd'] == 'EXEC':
self.last_cmd = ""
try:
cmd_res_tuple = eval('self.exec_' + message['data']['cmd'] + '()')
cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple))
except Exception as exc:
cmd_res_dict = {
"status": 1,
"stdout": self.last_cmd,
"stderr": str(exc)
}
self.report('DONE', message['client-type'], cmd_res_dict)
arange = message['data']['active_range']
my_id = int(self.vm_name[self.vm_name.rindex('I') + 1:])
if (not arange) or (my_id >= arange[0] and my_id <= arange[1]):
try:
par = message['data'].get('parameter', '')
str_par = 'par' if par else ''
cmd_res_tuple = eval('self.exec_%s(%s)' % (message['data']['cmd'], str_par))
cmd_res_dict = dict(zip(("status", "stdout", "stderr"), cmd_res_tuple))
except Exception as exc:
cmd_res_dict = {
"status": 1,
"stdout": self.last_cmd,
"stderr": str(exc)
}
self.report('DONE', message['client-type'], cmd_res_dict)
elif message['cmd'] == 'ABORT':
# TODO(Add support to abort a session)
pass
@ -207,7 +212,6 @@ class KB_VM_Agent(object):
# Unexpected
# TODO(Logging on Agent)
print 'ERROR: Unexpected command received!'
pass
def work(self):
for item in self.pubsub.listen():
@ -232,11 +236,11 @@ class KB_VM_Agent(object):
self.last_cmd = KB_Instance.check_http_service(self.user_data['target_url'])
return self.exec_command(self.last_cmd)
def exec_run_http_test(self):
def exec_run_http_test(self, http_tool_configs):
self.last_cmd = KB_Instance.run_http_test(
dest_path=self.user_data['http_tool']['dest_path'],
dest_path='/usr/local/bin/wrk2',
target_url=self.user_data['target_url'],
**self.user_data['http_tool_configs'])
**http_tool_configs)
return self.exec_command_report(self.last_cmd)
def exec_command(cmd):

View File

@ -37,9 +37,8 @@ class ConfigController(object):
# Save the public key into a temporary file
if 'public_key' in user_config['kb_cfg']:
pubkey_filename = '/tmp/kb_public_key.pub'
f = open(pubkey_filename, 'w')
f.write(user_config['kb_cfg']['public_key_file'])
f.close()
with open(pubkey_filename, 'w') as f:
f.write(user_config['kb_cfg']['public_key_file'])
kb_config.config_scale['public_key_file'] = pubkey_filename
if 'prompt_before_run' in user_config['kb_cfg']:

View File

@ -1,15 +1,17 @@
# KloudBuster Default configuration file
#
# This file can be copied and used as a template to specify different settings,
# then passed to KloudBuster using the '--config-file <path>' option.
#
# NOTE: In the copy, properties that are unchanged (same as default) can be simply
# removed from the file.
# This file can be copied, used as a template to specify different settings, then
# passed to KloudBuster using the '--config-file <path>' option.
# In the copy, properties that are unchanged (use same as default) can be simply removed
# from the file.
# Config options common to client and server side
# =====================================================
# COMMON CONFIG OPTIONS FOR BOTH SERVER AND CLIENT SIDE
# =====================================================
# Name of the image to use for all test VMs (client, server and proxy)
# The image name must exist in OpenStack and must be built with the appropriate
# packages.
# The image name must exist in OpenStack and be built with appropriate packages.
# The default test VM image is named "kloudbuster_v<version>" where
# <version> is the KloudBuster test VM image version (e.g. "kloudbuster_v3")
# Leave empty to use the default test VM image (recommended).
@ -28,23 +30,22 @@ cleanup_resources: True
# Specifies how many VMs will be created at a time. Larger numbers can be used
# but will not necessarily shorten the overall staging time (this will largely
# depend on the scalability of the OpenStack control plane).
# Well tuned control planes with multiple instances of Nova have shown to support
# a concurrency level of up to around 50
# Well tuned control planes with multiple instances of NOVA have shown to
# support a concurrency level of up to around 50
vm_creation_concurrency: 5
# Public key to use to access all test VMs
# ssh access to the test VMs launched by kloudbuster is not required
# but can be handy if the user wants to ssh manually to any of them (for example
# to debug)
#
# If empty will default to the user's public key (~/.ssh/id_rsa.pub) if it
# exists, otherwise will not provision any public key.
# If configured or available, a key pair will be added for each
# configured user.
#
# NOTE: SSH access to the test VMs launched by kloudbuster is not required,
# but can be handy if the user wants to ssh manually to any of them (for
# example to debug)
public_key_file:
# ==========================
# SERVER SIDE CONFIG OPTIONS
# ==========================
server:
# Flavor to use for the test images
flavor:
@ -86,9 +87,35 @@ server:
# Note that this is ignored/overridden if you specify a topology file (-t)
availability_zone:
# ==========================
# CLIENT SIDE CONFIG OPTIONS
# ==========================
client:
# Progression testing configuration
# If enabled, KloudBuster will perform multiple runs (a progression) on the
# cloud until it reaches the scale defined in the sections below, or the stop limit.
progression:
# Enable/Disable the progression
enabled: False
# The starting count of VMs
vm_start: 1
# The stepping of the VM count for each stage
vm_step: 1
# The stop condition; it is used by KloudBuster to determine when to
# stop the progression and do the cleanup if needed. It is defined as:
# [err_count(%%), percentile_of_packet_not_timeout(%%)]
#
# e.g. [0, 99.99] means, KloudBuster will continue the progression run
# only if *ALL* below conditions are satisfied:
# (1) The error count of packets are less or equal than 0%%; (no errors)
# (2) 99.99%% of the packets are within the timeout range;
#
# Note:
# (1) The timeout value is defined in the client:http_tool_config section;
# (2) The percentile of packets must be in the below list:
# [50, 75, 90, 99, 99.9, 99.99, 99.999]
stop_limit: [0, 99.99]
# Assign floating IP for every client side test VM
# Default: no floating IP (only assign internal fixed IP)
use_floatingip: False
@ -129,18 +156,18 @@ client:
# connected at any time to the matching HTTP server
connections: 1000
# Rate limit in RPS per client (0 for unlimited)
rate_limit: 500
rate_limit: 1000
# Timeout for HTTP requests
timeout: 5
# Connection Type: "Keep-alive", "New"
# Keep-alive: the TCP connection is reused across requests
# New: create a new TCP connection for every request (and close it after receiving the reply)
# Note: "New" is not currently supported.
# NOTE: "New" is not currently supported.
connection_type: 'Keep-alive'
# Interval for periodical report in seconds
# Use 0 if you only need 1 final aggregated report for the entire run duration
# Otherwise will provide results at every interval (results are reset at the start of each period and
# are not cumulative across periods)
# Otherwise will provide results at every interval (results are reset
# at the start of each period and are not cumulative across periods)
report_interval: 0
# Duration of testing tools (seconds)
duration: 30

View File

@ -98,7 +98,7 @@ class KbReport(object):
# add values for each row
rows = []
for run_res in self.data_list:
rps_max = run_res['http_rate_limit'] * run_res['total_client_vms']
rps_max = run_res['http_rate_limit']
rx_tp = float(run_res['http_throughput_kbytes'])
rx_tp = round(rx_tp * 8 / (1024 * 1024), 1)
cells = [run_res['filename'],

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import division
from collections import deque
from distutils.version import LooseVersion
import threading
@ -46,7 +47,8 @@ class KBRunner(object):
"""
def __init__(self, client_list, config, expected_agent_version, single_cloud=True):
self.client_dict = dict(zip([x.vm_name for x in client_list], client_list))
self.full_client_dict = dict(zip([x.vm_name for x in client_list], client_list))
self.client_dict = self.full_client_dict
self.config = config
self.single_cloud = single_cloud
self.result = {}
@ -63,10 +65,18 @@ class KBRunner(object):
self.report_chan_name = "kloudbuster_report"
self.message_queue = deque()
def header_formatter(self, stage, vm_count):
conns = vm_count * self.config.http_tool_configs.connections
rate_limit = vm_count * self.config.http_tool_configs.rate_limit
msg = "Stage %d: %d VM(s), %d Connections, %d Expected RPS" %\
(stage, vm_count, conns, rate_limit)
return msg
def msg_handler(self):
for message in self.pubsub.listen():
if message['data'] == "STOP":
break
LOG.kbdebug(message)
self.message_queue.append(message)
def setup_redis(self, redis_server, redis_server_port=6379, timeout=120):
@ -78,7 +88,7 @@ class KBRunner(object):
socket_connect_timeout=1,
socket_timeout=1)
success = False
retry_count = max(timeout / self.config.polling_interval, 1)
retry_count = max(timeout // self.config.polling_interval, 1)
# Check for connections to redis server
for retry in xrange(retry_count):
try:
@ -120,7 +130,7 @@ class KBRunner(object):
'''
if not polling_interval:
polling_interval = self.config.polling_interval
retry_count = max(timeout / polling_interval, 1)
retry_count = max(timeout // polling_interval, 1)
retry = cnt_succ = cnt_failed = 0
clist = self.client_dict.copy()
samples = []
@ -136,15 +146,14 @@ class KBRunner(object):
# No new message, commands are in executing
break
LOG.kbdebug(msg)
payload = eval(msg['data'])
vm_name = payload['sender-id']
instance = self.client_dict[vm_name]
cmd = payload['cmd']
if cmd == 'READY':
# If a READY packet is received, the corresponding VM is up
# running. We mark the flag for that VM, and skip all READY
# messages received afterwards.
instance = self.full_client_dict[vm_name]
if instance.up_flag:
continue
else:
@ -195,22 +204,23 @@ class KBRunner(object):
raise KBVMUpException()
self.send_cmd('ACK', None, None)
def setup_static_route(self, timeout=30):
func = {'cmd': 'setup_static_route'}
def setup_static_route(self, active_range, timeout=30):
func = {'cmd': 'setup_static_route', 'active_range': active_range}
self.send_cmd('EXEC', 'http', func)
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_dict):
raise KBSetStaticRouteException()
def check_http_service(self, timeout=30):
func = {'cmd': 'check_http_service'}
def check_http_service(self, active_range, timeout=30):
func = {'cmd': 'check_http_service', 'active_range': active_range}
self.send_cmd('EXEC', 'http', func)
cnt_succ = self.polling_vms(timeout)[0]
if cnt_succ != len(self.client_dict):
raise KBHTTPServerUpException()
def run_http_test(self):
func = {'cmd': 'run_http_test'}
def run_http_test(self, active_range):
func = {'cmd': 'run_http_test', 'active_range': active_range,
'parameter': self.config.http_tool_configs}
self.send_cmd('EXEC', 'http', func)
# Give additional 30 seconds for everybody to report results
timeout = self.config.http_tool_configs.duration + 30
@ -224,6 +234,7 @@ class KBRunner(object):
self.result[key] = instance.http_client_parser(**self.result[key])
def gen_host_stats(self):
self.host_stats = {}
for vm in self.result.keys():
phy_host = self.client_dict[vm].host
if phy_host not in self.host_stats:
@ -234,6 +245,48 @@ class KBRunner(object):
for phy_host in self.host_stats:
self.host_stats[phy_host] = http_tool.consolidate_results(self.host_stats[phy_host])
def single_run(self, active_range=None):
try:
if self.single_cloud:
LOG.info("Setting up static route to reach tested cloud...")
self.setup_static_route(active_range)
LOG.info("Waiting for HTTP service to come up...")
self.check_http_service(active_range)
if self.config.prompt_before_run:
print "Press enter to start running benchmarking tools..."
raw_input()
LOG.info("Running HTTP Benchmarking...")
self.run_http_test(active_range)
# Call the method in corresponding tools to consolidate results
http_tool = self.client_dict.values()[0].http_tool
LOG.kbdebug(self.result.values())
self.tool_result = http_tool.consolidate_results(self.result.values())
self.tool_result['http_rate_limit'] =\
len(self.client_dict) * self.config.http_tool_configs.rate_limit
self.tool_result['total_connections'] =\
len(self.client_dict) * self.config.http_tool_configs.connections
self.tool_result['total_client_vms'] = len(self.full_client_dict)
self.tool_result['total_server_vms'] = len(self.full_client_dict)
# self.tool_result['host_stats'] = self.gen_host_stats()
except KBSetStaticRouteException:
LOG.error("Could not set static route.")
self.dispose()
return False
except KBHTTPServerUpException:
LOG.error("HTTP service is not up in testing cloud.")
self.dispose()
return False
except KBHTTPBenchException:
LOG.error("Error while running HTTP benchmarking tool.")
self.dispose()
return False
return True
def run(self):
try:
LOG.info("Waiting for agents on VMs to come up...")
@ -247,38 +300,53 @@ class KBRunner(object):
LOG.warn("The VM image you are running (%s) is not the expected version (%s) "
"this may cause some incompatibilities" %
(self.agent_version, self.expected_agent_version))
if self.single_cloud:
LOG.info("Setting up static route to reach tested cloud...")
self.setup_static_route()
LOG.info("Waiting for HTTP service to come up...")
self.check_http_service()
if self.config.prompt_before_run:
print "Press enter to start running benchmarking tools..."
raw_input()
LOG.info("Running HTTP Benchmarking...")
self.run_http_test()
# Call the method in corresponding tools to consolidate results
http_tool = self.client_dict.values()[0].http_tool
LOG.kbdebug(self.result.values())
self.tool_result = http_tool.consolidate_results(self.result.values())
self.tool_result['http_rate_limit'] = self.config.http_tool_configs.rate_limit
self.tool_result['total_connections'] =\
len(self.client_dict) * self.config.http_tool_configs.connections
self.gen_host_stats()
self.dispose()
except (KBSetStaticRouteException):
LOG.error("Could not set static route.")
except KBVMUpException:
LOG.error("Some VMs failed to start.")
self.dispose()
return
except (KBHTTPServerUpException):
LOG.error("HTTP service is not up in testing cloud.")
if self.config.progression.enabled:
start = self.config.progression.vm_start
step = self.config.progression.vm_step
limit = self.config.progression.stop_limit
timeout = self.config.http_tool_configs.timeout
vm_list = self.full_client_dict.keys()
vm_list.sort()
self.client_dict = {}
cur_stage = 1
while True:
cur_vm_count = len(self.client_dict)
target_vm_count = start + (cur_stage - 1) * step
if target_vm_count > len(self.full_client_dict):
break
if self.tool_result:
err = self.tool_result['http_sock_err'] / self.tool_result['http_total_req']
pert_dict = dict(self.tool_result['latency_stats'])
if limit[1] in pert_dict.keys():
timeout_at_percentile = pert_dict[limit[1]] // 1000000
else:
timeout_at_percentile = 0
LOG.warn('Percentile %s%% is not a standard statistic point.' % limit[1])
if err > limit[0] or timeout_at_percentile > timeout:
LOG.warn('KloudBuster is stopping the iteration because the result '
'reaches the stop limit.')
break
for idx in xrange(cur_vm_count, target_vm_count):
self.client_dict[vm_list[idx]] = self.full_client_dict[vm_list[idx]]
description = "-- %s --" % self.header_formatter(cur_stage, len(self.client_dict))
LOG.info(description)
if not self.single_run(active_range=[0, target_vm_count - 1]):
break
LOG.info('-- Stage %s: %s --' % (cur_stage, str(self.tool_result)))
self.tool_result['description'] = description
cur_stage += 1
yield self.tool_result
self.dispose()
return
except KBHTTPBenchException():
LOG.error("Error in HTTP benchmarking.")
else:
if self.single_run():
yield self.tool_result
self.dispose()
return

View File

@ -69,10 +69,10 @@ class Kloud(object):
LOG.info("Creating kloud: " + self.prefix)
# pre-compute the placement az to use for all VMs
self.placement_az = None
if scale_cfg['availability_zone']:
self.placement_az = scale_cfg['availability_zone']
LOG.info('%s Availability Zone: %s' % (self.name, self.placement_az))
self.placement_az = scale_cfg['availability_zone']\
if scale_cfg['availability_zone'] else None
if self.placement_az:
LOG.info('%s Availability Zone: %s' % (self.name, self.placement_az))
def create_resources(self, tenant_quota):
if self.reusing_tenants:
@ -233,7 +233,7 @@ class KloudBuster(object):
self.tenants_list['client'],
testing_side=True)
self.kb_proxy = None
self.final_result = None
self.final_result = []
self.server_vm_create_thread = None
self.client_vm_create_thread = None
self.kb_runner = None
@ -333,9 +333,11 @@ class KloudBuster(object):
ins.user_data['redis_server_port'] = 6379
ins.user_data['target_subnet_ip'] = svr_list[idx].subnet_ip
ins.user_data['target_shared_interface_ip'] = svr_list[idx].shared_interface_ip
# @TODO(Move this hard coded part to kb_vm_agent.py)
# @TODO(DELETE BELOW TWO LINES WHEN V4 IS OFFICIALLY UPLOADED)
# TO BE REMOVED #
ins.user_data['http_tool'] = {'dest_path': '/usr/local/bin/wrk2'}
ins.user_data['http_tool_configs'] = ins.config['http_tool_configs']
# TO BE REMOVED #
ins.boot_info['flavor_type'] = 'kb.client' if \
not self.tenants_list['client'] else self.testing_kloud.flavor_to_use
ins.boot_info['user_data'] = str(ins.user_data)
@ -346,16 +348,14 @@ class KloudBuster(object):
"""
vm_creation_concurrency = self.client_cfg.vm_creation_concurrency
cleanup_flag = True
self.final_result = []
try:
tenant_quota = self.calc_tenant_quota()
self.kloud.create_resources(tenant_quota['server'])
self.testing_kloud.create_resources(tenant_quota['client'])
# Start the runner and ready for the incoming redis messages
client_list = self.testing_kloud.get_all_instances()
server_list = self.kloud.get_all_instances()
# Setting up the KloudBuster Proxy node
client_list = self.testing_kloud.get_all_instances()
self.kb_proxy = client_list[-1]
client_list.pop()
@ -376,6 +376,20 @@ class KloudBuster(object):
kb_vm_agent.get_image_version(),
self.single_cloud)
self.kb_runner.setup_redis(self.kb_proxy.fip_ip)
if self.client_cfg.progression['enabled']:
log_info = "Progression run is enabled, KloudBuster will schedule "\
"multiple runs as listed:"
stage = 1
start = self.client_cfg.progression.vm_start
step = self.client_cfg.progression.vm_step
cur_vm_count = start
total_vm = self.get_tenant_vm_count(self.server_cfg) *\
self.server_cfg['number_tenants']
while (cur_vm_count <= total_vm):
log_info += "\n" + self.kb_runner.header_formatter(stage, cur_vm_count)
cur_vm_count = start + stage * step
stage += 1
LOG.info(log_info)
if self.single_cloud:
# Find the shared network if the cloud used to testing is same
@ -409,12 +423,9 @@ class KloudBuster(object):
self.print_provision_info()
# Run the runner to perform benchmarkings
self.kb_runner.run()
self.final_result = self.kb_runner.tool_result
self.final_result['total_server_vms'] = len(server_list)
self.final_result['total_client_vms'] = len(client_list)
# self.final_result['host_stats'] = self.kb_runner.host_stats
LOG.info(self.final_result)
for run_result in self.kb_runner.run():
self.final_result.append(self.kb_runner.tool_result)
LOG.info('SUMMARY: %s' % self.final_result)
except KeyboardInterrupt:
traceback.format_exc()
except (ClientException, Exception):
@ -429,7 +440,7 @@ class KloudBuster(object):
traceback.print_exc()
KBResLogger.dump_and_save('svr', self.kloud.res_logger.resource_list)
if not cleanup_flag:
LOG.warn('Some resources are not cleaned up properly.')
LOG.warn('Some resources in server cloud are not cleaned up properly.')
KBResLogger.dump_and_save('svr', self.kloud.res_logger.resource_list)
if self.client_cfg['cleanup_resources']:
@ -439,6 +450,7 @@ class KloudBuster(object):
traceback.print_exc()
KBResLogger.dump_and_save('clt', self.testing_kloud.res_logger.resource_list)
if not cleanup_flag:
LOG.warn('Some resources in client cloud are not cleaned up properly.')
KBResLogger.dump_and_save('clt', self.testing_kloud.res_logger.resource_list)
def dump_logs(self, offset=0):

View File

@ -45,6 +45,7 @@ WARNING = logging.WARNING
def setup(product_name, logfile=None):
dbg_color = handlers.ColorHandler.LEVEL_COLORS[logging.DEBUG]
handlers.ColorHandler.LEVEL_COLORS[logging.KBDEBUG] = dbg_color
CONF.logging_default_format_string = '%(asctime)s %(levelname)s %(message)s'
oslogging.setup(CONF, product_name)
# Adding the FileHandler to all known loggers inside KloudBuster
@ -67,6 +68,7 @@ def getLogger(name="unknown", version="unknown"):
oslogging._loggers[name] = KloudBusterContextAdapter(
logging.getLogger(name), {"project": "kloudbuster",
"version": version})
return oslogging._loggers[name]
def delete_logfile(product_name):