Feature commit #1 to support running fio

Change-Id: Id211e499fbd25703777d56cb7d7e4a8342e00f67
This commit is contained in:
Yichen Wang 2016-01-15 17:14:10 -08:00
parent c506437efb
commit 781f59015b
11 changed files with 207 additions and 205 deletions

View File

@ -8,7 +8,7 @@ nginx:
nodejs:
nodejs-legacy:
npm:
nuttcp:
# nuttcp:
python-pip:
python-dev:
redis-server:

View File

@ -105,8 +105,11 @@ mv dist/* ../kb_server/public/ui
# =======
# Cleanup
# =======
# Remove HdrHistogram_c, wrk2, and fio builds
rm -rf /tmp/
rm -rf /tmp/HdrHistogram_c
rm -rf /tmp/wrk2
rm -rf /tmp/fio
# Remove KloudBuster Web UI builds
rm -rf /kb_test/kloudbuster/kb_web/node_modules
@ -123,6 +126,5 @@ apt-get -y --purge remove python-dev
apt-get -y --purge remove build-essential
apt-get -y --purge remove cmake
apt-get -y --purge remove npm
apt-get -y --purge remove xfsprogs
apt-get -y --purge autoremove
apt-get -y autoclean

View File

@ -101,9 +101,9 @@ class KB_Instance(object):
# Run the HTTP benchmarking tool
@staticmethod
def run_http_test(dest_path, target_url, threads, connections,
rate_limit, duration, timeout, connection_type,
report_interval):
def run_wrk2(dest_path, target_url, threads, connections,
rate_limit, duration, timeout, connection_type,
report_interval):
if not rate_limit:
rate_limit = 65535
@ -112,6 +112,29 @@ class KB_Instance(object):
report_interval, timeout, target_url)
return cmd
# Init volume
@staticmethod
def init_volume(size):
cmd = 'mkfs.xfs /dev/vdb && '
cmd += 'mkdir -p /mnt/volume && '
cmd += 'mount /dev/vdb /mnt/volume && '
cmd += 'dd if=/dev/zero of=/mnt/volume/kb_storage_test.bin bs=%s count=1' % size
return cmd
# Run fio
@staticmethod
def run_fio(dest_path, name, rw, bs, iodepth, runtime, rate_iops, rate, status_interval):
fixed_opt = '--thread --ioengine=libaio --out-format=json+ '
fixed_opt += '--filename=/mnt/volume/kb_storage_test.bin '
required_opt = '--name=%s --rw=%s --bs=%s --iodepth=%s --runtime=%s ' %\
(name, rw, bs, iodepth, runtime)
optional_opt = ''
optional_opt += '--rate_iops=%s ' % rate_iops if rate_iops else ''
optional_opt += '--rate=%s ' % rate if rate else ''
optional_opt += '--status_interval=%s ' % status_interval if status_interval else ''
cmd = '%s %s %s %s' % (dest_path, fixed_opt, required_opt, optional_opt)
return cmd
class KBA_Client(object):
@ -234,6 +257,8 @@ class KBA_Client(object):
# Unexpected
print 'ERROR: Unexpected command received!'
class KBA_HTTP_Client(KBA_Client):
def exec_setup_static_route(self):
self.last_cmd = KB_Instance.get_static_route(self.user_data['target_subnet_ip'])
result = self.exec_command(self.last_cmd)
@ -250,12 +275,24 @@ class KBA_Client(object):
return self.exec_command(self.last_cmd)
def exec_run_http_test(self, http_tool_configs):
self.last_cmd = KB_Instance.run_http_test(
self.last_cmd = KB_Instance.run_wrk2(
dest_path='/usr/local/bin/wrk2',
target_url=self.user_data['target_url'],
**http_tool_configs)
return self.exec_command_report(self.last_cmd)
class KBA_Storage_Client(KBA_Client):
    """Agent client that drives the storage (fio) benchmark inside a testing VM."""

    def exec_init_volume(self, size):
        # Format, mount and pre-allocate the test file on the data volume
        self.last_cmd = KB_Instance.init_volume(size)
        return self.exec_command(self.last_cmd)

    def exec_run_storage_test(self, fio_configs):
        # Fixed: dest_path was the relative 'usr/local/bin/fio', so fio would
        # only be found if the agent happened to run from '/'; use the absolute
        # path, consistent with the wrk2 client ('/usr/local/bin/wrk2').
        self.last_cmd = KB_Instance.run_fio(
            dest_path='/usr/local/bin/fio',
            **fio_configs)
        return self.exec_command_report(self.last_cmd)
class KBA_Server(object):
@ -279,9 +316,6 @@ class KBA_Server(object):
class KBA_Proxy(object):
def __init__(self):
    # Nothing to initialize: the proxy holds no per-instance state
    pass
def start_redis_server(self):
    """Start the local redis-server service (requires sudo) and return
    the result of the command execution."""
    return exec_command(['sudo', 'service', 'redis-server', 'start'])
@ -309,8 +343,9 @@ if __name__ == "__main__":
sys.exit(agent.start_nginx_server())
else:
sys.exit(1)
elif user_data.get('role') == 'Client':
agent = KBA_Client(user_data)
elif user_data.get('role')[-6:] == 'Client':
agent = KBA_HTTP_Client(user_data) if user_data['role'][:-7] == 'HTTP'\
else KBA_Storage_Client(user_data)
agent.setup_channels()
agent.hello_thread = threading.Thread(target=agent.send_hello)
agent.hello_thread.daemon = True

58
kloudbuster/fio_tool.py Normal file
View File

@ -0,0 +1,58 @@
# Copyright 2015 Cisco Systems, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import json
from perf_tool import PerfTool
# from hdrh.histogram import HdrHistogram
import log as logging
LOG = logging.getLogger(__name__)
class FioTool(PerfTool):
    """Adapter around the fio storage benchmarking tool.

    Only the JSON-parsing scaffolding is in place; metric extraction and
    result consolidation are still stubbed out (initial fio support commit).
    """

    def __init__(self, instance):
        PerfTool.__init__(self, instance, 'fio')

    def cmd_parser_run_client(self, status, stdout, stderr):
        """Parse the output of one fio client run.

        :param status: exit status of the remote fio command (non-zero = failure)
        :param stdout: fio standard output (expected to be a JSON document,
                       since fio is launched with json+ output format)
        :param stderr: fio standard error, reported on failure
        :return: a dict with at least the 'tool' key, or a parse-error dict
        """
        if status:
            return self.parse_error(stderr)
        try:
            # Fixed: dropped the no-op 'result = result' self-assignment.
            result = json.loads(stdout)
        except Exception:
            return self.parse_error('Could not parse: "%s"' % (stdout))
        parsed_output = {'tool': self.name}
        # TODO(): populate parsed_output with metrics extracted from `result`
        return parsed_output

    @staticmethod
    def consolidate_results(results):
        # TODO(): aggregate the per-VM results; placeholder for now
        return {'Test': 'Test'}

    @staticmethod
    def consolidate_samples(results, vm_count):
        # TODO(): aggregate the periodic samples; placeholder for now
        return {'Test': 'Test'}

View File

@ -130,7 +130,7 @@ class KBRunner(object):
retry = cnt_succ = cnt_failed = 0
clist = self.client_dict.copy()
samples = []
http_tool = self.client_dict.values()[0].http_tool
perf_tool = self.client_dict.values()[0].perf_tool
while (retry < retry_count and len(clist)):
time.sleep(polling_interval)
@ -161,7 +161,7 @@ class KBRunner(object):
sample_count = sample_count + 1
# Parse the results from HTTP Tools
instance = self.client_dict[vm_name]
self.result[vm_name] = instance.http_client_parser(**payload['data'])
self.result[vm_name] = instance.perf_client_parser(**payload['data'])
samples.append(self.result[vm_name])
elif cmd == 'DONE':
self.result[vm_name] = payload['data']
@ -185,7 +185,7 @@ class KBRunner(object):
LOG.info(log_msg)
if sample_count != 0:
report = http_tool.consolidate_samples(samples, len(self.client_dict))
report = perf_tool.consolidate_samples(samples, len(self.client_dict))
self.report['seq'] = self.report['seq'] + 1
self.report['report'] = report
LOG.info('Periodical report: %s.' % str(self.report))
@ -218,9 +218,9 @@ class KBRunner(object):
self.host_stats[phy_host] = []
self.host_stats[phy_host].append(self.result[vm])
http_tool = self.client_dict.values()[0].http_tool
perf_tool = self.client_dict.values()[0].perf_tool
for phy_host in self.host_stats:
self.host_stats[phy_host] = http_tool.consolidate_results(self.host_stats[phy_host])
self.host_stats[phy_host] = perf_tool.consolidate_results(self.host_stats[phy_host])
@abc.abstractmethod
def run(self, http_test_only=False):
@ -228,4 +228,4 @@ class KBRunner(object):
return None
def stop(self):
self.send_cmd('ABORT', 'http', None)
self.send_cmd('ABORT', None, None)

View File

@ -65,9 +65,9 @@ class KBRunner_HTTP(KBRunner):
LOG.warn("Testing VMs are not returning results within grace period, "
"summary shown below may not be accurate!")
# Parse the results from HTTP Tools
# Parse the results from HTTP benchmarking tool
for key, instance in self.client_dict.items():
self.result[key] = instance.http_client_parser(**self.result[key])
self.result[key] = instance.perf_client_parser(**self.result[key])
def single_run(self, active_range=None, http_test_only=False):
try:
@ -89,9 +89,9 @@ class KBRunner_HTTP(KBRunner):
self.run_http_test(active_range)
# Call the method in corresponding tools to consolidate results
http_tool = self.client_dict.values()[0].http_tool
perf_tool = self.client_dict.values()[0].perf_tool
LOG.kbdebug(self.result.values())
self.tool_result = http_tool.consolidate_results(self.result.values())
self.tool_result = perf_tool.consolidate_results(self.result.values())
self.tool_result['http_rate_limit'] =\
len(self.client_dict) * self.config.http_tool_configs.rate_limit
self.tool_result['total_connections'] =\

View File

@ -20,6 +20,9 @@ import log as logging
LOG = logging.getLogger(__name__)
class KBInitVolumeException(KBException):
    """Raised when not all testing VMs manage to initialize their data volume."""
    pass
class KBRunner_Storage(KBRunner):
"""
Control the testing VMs on the testing cloud
@ -28,5 +31,56 @@ class KBRunner_Storage(KBRunner):
def __init__(self, client_list, config, expected_agent_version):
    # The storage runner always operates with single_cloud=True
    KBRunner.__init__(self, client_list, config, expected_agent_version, single_cloud=True)
def run(self, http_test_only=False):
raise KBException("NOT IMPLEMENT")
def init_volume(self, active_range, timeout=30):
    """Ask every testing VM in the active range to set up its data volume.

    Sends the 'init_volume' command (volume size taken from the config,
    expressed in gigabytes) and waits up to `timeout` seconds for all
    clients to succeed.

    :raises KBInitVolumeException: if any client fails to report success
    """
    payload = {
        'cmd': 'init_volume',
        'active_range': active_range,
        'parameter': '%sG' % self.config.volume_size,
    }
    self.send_cmd('EXEC', 'storage', payload)
    succeeded = self.polling_vms(timeout)[0]
    if succeeded != len(self.client_dict):
        raise KBInitVolumeException()
def run_storage_test(self, active_range):
    """Launch the storage benchmark on all testing VMs and collect results."""
    command = {
        'cmd': 'run_storage_test',
        'active_range': active_range,
        'parameter': dict(self.config.storage_tool_configs),
    }
    self.send_cmd('EXEC', 'storage', command)
    # Give additional 30 seconds for everybody to report results
    grace_timeout = self.config.storage_tool_configs.runtime + 30
    pending = self.polling_vms(grace_timeout)[2]
    if pending:
        LOG.warn("Testing VMs are not returning results within grace period, "
                 "summary shown below may not be accurate!")
    # Parse the results from storage benchmarking tool
    for vm_name, instance in self.client_dict.items():
        self.result[vm_name] = instance.storage_client_parser(**self.result[vm_name])
def single_run(self, active_range=None, test_only=False):
    # One complete storage benchmarking pass: volume setup, then the
    # benchmark itself.  NOTE(review): the actual test invocation is still
    # commented out in this initial fio-support commit, so only the volume
    # initialization and report bookkeeping happen here — confirm before
    # relying on self.report/self.result contents.
    try:
        if not test_only:
            LOG.info("Initilizing volume and setting up filesystem...")
            self.init_volume(active_range)
        # Optional interactive pause before the benchmark starts
        # (Python 2 print statement / raw_input)
        if self.config.prompt_before_run:
            print "Press enter to start running benchmarking tools..."
            raw_input()
        LOG.info("Running Storage Benchmarking...")
        self.report = {'seq': 0, 'report': None}
        self.result = {}
        # self.run_storage_test(active_range)
        # Call the method in corresponding tools to consolidate results
    except KBInitVolumeException:
        # Surface volume-setup failures as a generic KBException to the caller
        raise KBException("Could not initilize the volume.")
def run(self, test_only=False):
if not test_only:
# Resources are already staged, just re-run the storage benchmarking tool
self.wait_for_vm_up()
if self.config.progression.enabled:
# TODO(Implement progression runs)
pass
else:
self.single_run(test_only=test_only)
yield self.tool_result

View File

@ -55,11 +55,12 @@ def create_keystone_client(creds):
return (keystoneclient.Client(endpoint_type='publicURL', **creds), creds['auth_url'])
class Kloud(object):
def __init__(self, scale_cfg, cred, reusing_tenants, testing_side=False):
def __init__(self, scale_cfg, cred, reusing_tenants, testing_side=False, storage_mode=False):
self.tenant_list = []
self.testing_side = testing_side
self.scale_cfg = scale_cfg
self.reusing_tenants = reusing_tenants
self.storage_mode = storage_mode
self.keystone, self.auth_url = create_keystone_client(cred)
self.flavor_to_use = None
self.vm_up_count = 0
@ -387,7 +388,7 @@ class KloudBuster(object):
server_list = self.kloud.get_all_instances()
KBScheduler.setup_vm_mappings(client_list, server_list, "1:1")
for idx, ins in enumerate(client_list):
ins.user_data['role'] = 'Client'
ins.user_data['role'] = 'HTTP_Client'
ins.user_data['vm_name'] = ins.vm_name
ins.user_data['redis_server'] = self.kb_proxy.fixed_ip
ins.user_data['redis_server_port'] = 6379
@ -398,7 +399,7 @@ class KloudBuster(object):
ins.boot_info['user_data'] = str(ins.user_data)
elif test_mode == 'storage':
for idx, ins in enumerate(client_list):
ins.user_data['role'] = 'Client'
ins.user_data['role'] = 'Storage_Client'
ins.user_data['vm_name'] = ins.vm_name
ins.user_data['redis_server'] = self.kb_proxy.fixed_ip
ins.user_data['redis_server_port'] = 6379
@ -425,12 +426,14 @@ class KloudBuster(object):
vm_creation_concurrency = self.client_cfg.vm_creation_concurrency
tenant_quota = self.calc_tenant_quota()
if not self.storage_mode:
self.kloud = Kloud(self.server_cfg, self.server_cred, self.tenants_list['server'])
self.kloud = Kloud(self.server_cfg, self.server_cred, self.tenants_list['server'],
storage_mode=self.storage_mode)
self.server_vm_create_thread = threading.Thread(target=self.kloud.create_vms,
args=[vm_creation_concurrency])
self.server_vm_create_thread.daemon = True
self.testing_kloud = Kloud(self.client_cfg, self.client_cred,
self.tenants_list['client'], testing_side=True)
self.tenants_list['client'], testing_side=True,
storage_mode=self.storage_mode)
self.client_vm_create_thread = threading.Thread(target=self.testing_kloud.create_vms,
args=[vm_creation_concurrency])
self.client_vm_create_thread.daemon = True

View File

@ -14,6 +14,7 @@
#
from base_compute import BaseCompute
from fio_tool import FioTool
import log as logging
from wrk_tool import WrkTool
@ -35,50 +36,19 @@ class PerfInstance(BaseCompute):
self.ssh = None
self.az = None
self.storage_mode = network.router.user.tenant.kloud.storage_mode
self.perf_tool = FioTool(self) if self.storage_mode else WrkTool(self)
# self.tp_tool = nuttcp_tool.NuttcpTool(self)
self.http_tool = WrkTool(self)
def run_tp_client(self, label, dest_ip, target_instance,
mss=None, bandwidth=0, bidirectional=False, az_to=None):
# NOTE: This function will not work, and pending to convert to use redis
'''test iperf client using the default TCP window size
(tcp window scaling is normally enabled by default so setting explicit window
size is not going to help achieve better results)
:return: a dictionary containing the results of the run
'''
# TCP/UDP throughput with tp_tool, returns a list of dict
if self.tp_tool:
tp_tool_res = self.tp_tool.run_client(dest_ip,
target_instance,
mss=mss,
bandwidth=bandwidth,
bidirectional=bidirectional)
else:
tp_tool_res = []
res = {'ip_to': dest_ip}
res['ip_from'] = self.ssh_access.host
if label:
res['desc'] = label
if self.az:
res['az_from'] = self.az
if az_to:
res['az_to'] = az_to
res['distro_id'] = self.ssh.distro_id
res['distro_version'] = self.ssh.distro_version
# consolidate results for all tools
res['results'] = tp_tool_res
return res
def http_client_parser(self, status, stdout, stderr):
http_tool_res = self.http_tool.cmd_parser_run_client(status, stdout, stderr)
def perf_client_parser(self, status, stdout, stderr):
res = {'vm_name': self.vm_name}
res['target_url'] = self.target_url
res['ip_from'] = self.ssh_ip
perf_tool_res = self.perf_tool.cmd_parser_run_client(status, stdout, stderr)
if not self.storage_mode:
res['target_url'] = self.target_url
res['ip_from'] = self.ssh_ip
# consolidate results for all tools
res['results'] = http_tool_res
res['results'] = perf_tool_res
return res
# Send a command on the ssh session

View File

@ -41,46 +41,6 @@ class PerfTool(object):
def parse_error(self, msg):
return {'error': msg, 'tool': self.name}
def parse_results(self, protocol=None, throughput=None, lossrate=None, retrans=None,
rtt_ms=None, reverse_dir=False, msg_size=None, cpu_load=None,
http_total_req=None, http_rps=None, http_tp_kbytes=None,
http_sock_err=None, http_sock_timeout=None, http_err=None,
latency_stats=None):
res = {'tool': self.name}
if throughput is not None:
res['throughput_kbps'] = throughput
if protocol is not None:
res['protocol'] = protocol
if 'vm_bandwidth' in self.instance.config:
res['bandwidth_limit_kbps'] = self.instance.config.vm_bandwidth
if lossrate is not None:
res['loss_rate'] = lossrate
if retrans:
res['retrans'] = retrans
if rtt_ms:
res['rtt_ms'] = rtt_ms
if reverse_dir:
res['direction'] = 'reverse'
if msg_size:
res['pkt_size'] = msg_size
if cpu_load:
res['cpu_load'] = cpu_load
if http_total_req:
res['http_total_req'] = http_total_req
if http_rps:
res['http_rps'] = http_rps
if http_tp_kbytes:
res['http_throughput_kbytes'] = http_tp_kbytes
if http_sock_err:
res['http_sock_err'] = http_sock_err
if http_sock_timeout:
res['http_sock_timeout'] = http_sock_timeout
if http_err:
res['http_err'] = http_err
if latency_stats:
res['latency_stats'] = latency_stats
return res
@abc.abstractmethod
def cmd_parser_run_client(self, status, stdout, stderr):
# must be implemented by sub classes
@ -91,93 +51,3 @@ class PerfTool(object):
def consolidate_results(results):
# must be implemented by sub classes
return None
def find_udp_bdw(self, pkt_size, target_ip):
'''Find highest UDP bandwidth within max loss rate for given packet size
:return: a dictionary describing the optimal bandwidth (see parse_results())
'''
# we use a binary search to converge to the optimal throughput
# start with 5Gbps - mid-range between 1 and 10Gbps
# Convergence can be *very* tricky because UDP throughput behavior
# can vary dramatically between host runs and guest runs.
# The packet rate limitation is going to dictate the effective
# send rate, meaning that small packet sizes will yield the worst
# throughput.
# The measured throughput can be vastly smaller than the requested
# throughput even when the loss rate is zero when the sender cannot
# send fast enough to fill the network, in that case increasing the
# requested rate will not make it any better
# Examples:
# 1. too much difference between requested/measured bw - regardless of loss rate
# => retry with bw mid-way between the requested bw and the measured bw
# /tmp/nuttcp-7.3.2 -T2 -u -l128 -R5000000K -p5001 -P5002 -fparse 192.168.1.2
# megabytes=36.9785 real_seconds=2.00 rate_Mbps=154.8474 tx_cpu=23 rx_cpu=32
# drop=78149 pkt=381077 data_loss=20.50746
# /tmp/nuttcp-7.3.2 -T2 -u -l128 -R2500001K -p5001 -P5002 -fparse 192.168.1.2
# megabytes=47.8063 real_seconds=2.00 rate_Mbps=200.2801 tx_cpu=24 rx_cpu=34
# drop=0 pkt=391629 data_loss=0.00000
# 2. measured and requested bw are very close :
# if loss_rate is too low
# increase bw mid-way between requested and last max bw
# if loss rate is too high
# decrease bw mid-way between the measured bw and the last min bw
# else stop iteration (converged)
# /tmp/nuttcp-7.3.2 -T2 -u -l8192 -R859376K -p5001 -P5002 -fparse 192.168.1.2
# megabytes=204.8906 real_seconds=2.00 rate_Mbps=859.2992 tx_cpu=99 rx_cpu=10
# drop=0 pkt=26226 data_loss=0.00000
min_kbps = 1
max_kbps = 10000000
kbps = 5000000
min_loss_rate = self.instance.config.udp_loss_rate_range[0]
max_loss_rate = self.instance.config.udp_loss_rate_range[1]
# stop if the remaining range to cover is less than 5%
while (min_kbps * 100 / max_kbps) < 95:
res_list = self.run_client_dir(target_ip, 0, bandwidth_kbps=kbps,
udp=True, length=pkt_size,
no_cpu_timed=1)
# always pick the first element in the returned list of dict(s)
# should normally only have 1 element
res = res_list[0]
if 'error' in res:
return res
loss_rate = res['loss_rate']
measured_kbps = res['throughput_kbps']
LOG.kbdebug(
"[%s] pkt-size=%d throughput=%d<%d/%d<%d Kbps loss-rate=%d" %
(self.instance.vm_name, pkt_size, min_kbps, measured_kbps,
kbps, max_kbps, loss_rate))
# expected rate must be at least 80% of the requested rate
if (measured_kbps * 100 / kbps) < 80:
# the measured bw is too far away from the requested bw
# take half the distance or 3x the measured bw whichever is lowest
kbps = measured_kbps + (kbps - measured_kbps) / 2
if measured_kbps:
kbps = min(kbps, measured_kbps * 3)
max_kbps = kbps
continue
# The measured bw is within striking distance from the requested bw
# increase bw if loss rate is too small
if loss_rate < min_loss_rate:
# undershot
if measured_kbps > min_kbps:
min_kbps = measured_kbps
else:
# to make forward progress we need to increase min_kbps
# and try a higher bw since the loss rate is too low
min_kbps = int((max_kbps + min_kbps) / 2)
kbps = int((max_kbps + min_kbps) / 2)
# LOG.info(" undershot, min=%d kbps=%d max=%d" % (min_kbps, kbps, max_kbps))
elif loss_rate > max_loss_rate:
# overshot
max_kbps = kbps
if measured_kbps < kbps:
kbps = measured_kbps
else:
kbps = int((max_kbps + min_kbps) / 2)
# LOG.info(" overshot, min=%d kbps=%d max=%d" % (min_kbps, kbps, max_kbps))
else:
# converged within loss rate bracket
break
return res

View File

@ -74,13 +74,23 @@ class WrkTool(PerfTool):
except Exception:
return self.parse_error('Could not parse: "%s"' % (stdout))
return self.parse_results(http_total_req=http_total_req,
http_rps=http_rps,
http_tp_kbytes=http_tp_kbytes,
http_sock_err=http_sock_err,
http_sock_timeout=http_sock_timeout,
http_err=http_err,
latency_stats=latency_stats)
parsed_output = {'tool': self.name}
if http_total_req:
parsed_output['http_total_req'] = http_total_req
if http_rps:
parsed_output['http_rps'] = http_rps
if http_tp_kbytes:
parsed_output['http_throughput_kbytes'] = http_tp_kbytes
if http_sock_err:
parsed_output['http_sock_err'] = http_sock_err
if http_sock_timeout:
parsed_output['http_sock_timeout'] = http_sock_timeout
if http_err:
parsed_output['http_err'] = http_err
if latency_stats:
parsed_output['latency_stats'] = latency_stats
return parsed_output
@staticmethod
def consolidate_results(results):