diff --git a/scale/base_compute.py b/scale/base_compute.py index ee91c5c..528a816 100644 --- a/scale/base_compute.py +++ b/scale/base_compute.py @@ -169,7 +169,7 @@ class KeyPair(object): self.novaclient = novaclient - def add_public_key(self, name, public_key_file): + def add_public_key(self, name, public_key_file=None): """ Add the KloudBuster public key to openstack """ diff --git a/scale/base_network.py b/scale/base_network.py index 43e3a31..d9a4c54 100644 --- a/scale/base_network.py +++ b/scale/base_network.py @@ -14,6 +14,8 @@ import time +from perf_instance import PerfInstance + import base_compute import netaddr from neutronclient.common.exceptions import NetworkInUseClient @@ -115,11 +117,11 @@ class BaseNetwork(object): external_network = find_external_network(self.neutron_client) print "Creating Virtual machines for user %s" % (self.user_name) for instance_count in range(config_scale['vms_per_network']): - nova_instance = base_compute.BaseCompute(self.nova_client, self.user_name) - self.instance_list.append(nova_instance) + perf_instance = PerfInstance(self.nova_client, self.user_name) + self.instance_list.append(perf_instance) vm_name = "kloudbuster_vm" + "_" + self.network['id'] + str(instance_count) nic_used = [{'net-id': self.network['id']}] - nova_instance.create_server(vm_name, config_scale['image_name'], + perf_instance.create_server(vm_name, config_scale['image_name'], config_scale['flavor_type'], self.keypair_list[0].keypair_name, nic_used, @@ -129,23 +131,23 @@ class BaseNetwork(object): None, None) # Store the subnet info and fixed ip address in instance - nova_instance.subnet_ip = self.network['subnet_ip'] - nova_instance.fixed_ip = nova_instance.instance.networks.values()[0][0] + perf_instance.subnet_ip = self.network['subnet_ip'] + perf_instance.fixed_ip = perf_instance.instance.networks.values()[0][0] if self.shared_interface_ip: - nova_instance.shared_interface_ip = self.shared_interface_ip + perf_instance.shared_interface_ip = 
self.shared_interface_ip # Create the floating ip for the instance store it and the ip address in instance object if config_scale['use_floatingip']: - nova_instance.fip = create_floating_ip(self.neutron_client, external_network) - nova_instance.fip_ip = nova_instance.fip['floatingip']['floating_ip_address'] + perf_instance.fip = create_floating_ip(self.neutron_client, external_network) + perf_instance.fip_ip = perf_instance.fip['floatingip']['floating_ip_address'] # Associate the floating ip with this instance - nova_instance.instance.add_floating_ip(nova_instance.fip_ip) - nova_instance.ssh_ip = nova_instance.fip_ip + perf_instance.instance.add_floating_ip(perf_instance.fip_ip) + perf_instance.ssh_ip = perf_instance.fip_ip else: # Store the fixed ip as ssh ip since there is no floating ip - nova_instance.ssh_ip = nova_instance.fixed_ip + perf_instance.ssh_ip = perf_instance.fixed_ip print "VM Information" - print "SSH IP:%s" % (nova_instance.ssh_ip) - print "Subnet Info: %s" % (nova_instance.subnet_ip) + print "SSH IP:%s" % (perf_instance.ssh_ip) + print "Subnet Info: %s" % (perf_instance.subnet_ip) if self.shared_interface_ip: print "Shared router interface ip %s" % (self.shared_interface_ip) @@ -259,8 +261,6 @@ class Router(object): # Create the compute resources in the network network_instance.create_compute_resources(config_scale) - - def delete_network_resources(self): """ Delete all network and compute resources diff --git a/scale/cfg.scale.yaml b/scale/cfg.scale.yaml index 36e4329..50ea713 100644 --- a/scale/cfg.scale.yaml +++ b/scale/cfg.scale.yaml @@ -23,13 +23,13 @@ server: # Number of keypairs per user keypairs_per_network: 1 - use_floatingip: False + use_floatingip: True #Configs that remain constant keystone_admin_role: "admin" cleanup_resources : True public_key_file : '../ssh/id_rsa.pub' - image_name : 'Ubuntu Server 14.04' + image_name : 'Scale Image v2' flavor_type: 'm1.small' client: @@ -57,11 +57,13 @@ client: keypairs_per_network: 1 
use_floatingip: True + # Specify whether the testing cloud is running in same cloud run_on_same_cloud: True + #Configs that remain constant keystone_admin_role: "admin" cleanup_resources : True public_key_file : '../ssh/id_rsa.pub' - image_name : 'Ubuntu Server 14.04' - flavor_type: 'm1.small' \ No newline at end of file + image_name : 'Scale Image v2' + flavor_type: 'm1.small' diff --git a/scale/kloudbuster.py b/scale/kloudbuster.py index dbac96b..5b0b1c5 100644 --- a/scale/kloudbuster.py +++ b/scale/kloudbuster.py @@ -13,6 +13,7 @@ # under the License. import argparse +import time import credentials @@ -76,13 +77,13 @@ class KloudBuster(object): """ pass - def runner(self): """ The runner for KloudBuster Tests Executes tests serially Support concurrency in fututure """ + # Create the keystone client for tenant and user creation operations # for the tested cloud keystone, auth_url = self.create_keystone_client(cred) @@ -99,6 +100,23 @@ class KloudBuster(object): # Function that print all the provisioning info self.print_provision_info() + svr = self.tenant.tenant_user_list[0].router_list[0].network_list[0].instance_list[0] + client = self.tenant_testing.tenant_user_list[0].router_list[0].network_list[0].\ + instance_list[0] + target_url = "http://" + svr.fip_ip + "/index.html" + + print "Server IP: " + svr.fip_ip + print "Client IP: " + client.fip_ip + print target_url + + client.setup_ssh(client.fip_ip, "ubuntu") + # HACK ALERT!!! 
+ # Need to wait until all servers are up running before starting to inject traffic + time.sleep(20) + res = client.run_http_client(target_url, threads=2, connections=10000, + timeout=5, connection_type="New") + print res + if config_scale.server['cleanup_resources']: self.teardown_resources("tested") if config_scale.client['cleanup_resources']: @@ -128,7 +146,6 @@ class KloudBuster(object): """ return credentials.Credentials(rc, passwd, no_env) - if __name__ == '__main__': # The default configuration file for CloudScale default_cfg_file = "cfg.scale.yaml" @@ -168,7 +185,6 @@ if __name__ == '__main__': config_scale = configure.Configuration.from_file(default_cfg_file).configure() config_scale.debug = opts.debug - # The KloudBuster class is just a wrapper class # levarages tenant and user class for resource creations and # deletion diff --git a/scale/perf_instance.py b/scale/perf_instance.py new file mode 100644 index 0000000..e671244 --- /dev/null +++ b/scale/perf_instance.py @@ -0,0 +1,276 @@ +# Copyright 2014 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+# + +import os +import re +import stat +import subprocess + +from perf_tool import PingTool +import sshutils + +from base_compute import BaseCompute +from wrk_tool import WrkTool + +# An openstack instance (can be a VM or a LXC) +class PerfInstance(BaseCompute): + + def __init__(self, nova_client, user_name, config=None, is_server=False): + + BaseCompute.__init__(self, nova_client, user_name) + + if not config: + # HACK ALERT!!! + # We are expecting to see a valid config, here we just hack + class config: + ssh_vm_username = "ubuntu" + gmond_svr_ip = None + gmond_svr_port = None + tp_tool = None + http_tool = WrkTool + perf_tool_path = './tools' + private_key_file = './ssh/id_rsa' + ssh_retry_count = 50 + debug = True + time = 10 + vm_bandwidth = None + + self.config = config + self.internal_ip = None + self.ssh_ip = None + self.ssh_user = config.ssh_vm_username + self.ssh = None + self.port = None + self.is_server = is_server + self.name = "Test" + + if config.gmond_svr_ip: + self.gmond_svr = config.gmond_svr_ip + else: + self.gmond_svr = None + if config.gmond_svr_port: + self.gmond_port = int(config.gmond_svr_port) + else: + self.gmond_port = 0 + + self.ping = PingTool(self) + if config.tp_tool: + self.tp_tool = config.tp_tool(self, config.perf_tool_path) + else: + self.tp_tool = None + if config.http_tool: + self.http_tool = config.http_tool(self, config.perf_tool_path) + else: + self.http_tool = None + + def run_tp_client(self, label, dest_ip, target_instance, + mss=None, bandwidth=0, bidirectional=False, az_to=None): + '''test iperf client using the default TCP window size + (tcp window scaling is normally enabled by default so setting explicit window + size is not going to help achieve better results) + :return: a dictionary containing the results of the run + ''' + # Latency (ping rtt) + ping_res = self.ping.run_client(dest_ip) + + # TCP/UDP throughput with tp_tool, returns a list of dict + if self.tp_tool and (not ping_res or 'error' not in ping_res): + 
tp_tool_res = self.tp_tool.run_client(dest_ip, + target_instance, + mss=mss, + bandwidth=bandwidth, + bidirectional=bidirectional) + else: + tp_tool_res = [] + + res = {'ip_to': dest_ip} + if self.internal_ip: + res['ip_from'] = self.internal_ip + if label: + res['desc'] = label + if self.az: + res['az_from'] = self.az + if az_to: + res['az_to'] = az_to + res['distro_id'] = self.ssh.distro_id + res['distro_version'] = self.ssh.distro_version + + # consolidate results for all tools + if ping_res: + tp_tool_res.append(ping_res) + res['results'] = tp_tool_res + return res + + def run_http_client(self, target_url, threads, connections, + timeout=5, connection_type="New"): + # Latency (ping rtt) + # ping_res = self.ping.run_client(dest_ip) + + # HTTP Performance Measurement + if self.http_tool: + http_tool_res = self.http_tool.run_client(target_url, + threads, + connections, + timeout, + connection_type) + res = {'target_url': target_url} + if self.internal_ip: + res['ip_from'] = self.internal_ip + res['distro_id'] = self.ssh.distro_id + res['distro_version'] = self.ssh.distro_version + else: + http_tool_res = [] + + # consolidate results for all tools + # if ping_res: + # http_tool_res.append(ping_res) + res['results'] = http_tool_res + return res + + # Setup the ssh connectivity + # Returns True if success + def setup_ssh(self, ssh_ip, ssh_user): + # used for displaying the source IP in json results + if not self.internal_ip: + self.internal_ip = ssh_ip + self.ssh_ip = ssh_ip + self.ssh_user = ssh_user + self.ssh = sshutils.SSH(self.ssh_user, self.ssh_ip, + key_filename=self.config.private_key_file, + connect_retry_count=self.config.ssh_retry_count) + return True + + # Send a command on the ssh session + # returns stdout + def exec_command(self, cmd, timeout=30): + (status, cmd_output, err) = self.ssh.execute(cmd, timeout=timeout) + if status: + self.display('ERROR cmd=%s' % (cmd)) + if cmd_output: + self.display("%s", cmd_output) + if err: + self.display('error=%s' % 
(err)) + return None + self.buginf('%s', cmd_output) + return cmd_output + + # Display a status message with the standard header that has the instance + # name (e.g. [foo] some text) + def display(self, fmt, *args): + print('[%s] ' + fmt) % ((self.name,) + args) + + # Debugging message, to be printed only in debug mode + def buginf(self, fmt, *args): + if self.config.debug: + self.display(fmt, *args) + + # Ping an IP from this instance + def ping_check(self, target_ip, ping_count, pass_threshold): + return self.ssh.ping_check(target_ip, ping_count, pass_threshold) + + # Given a message size verify if ping without fragmentation works or fails + # Returns True if success + def ping_do_not_fragment(self, msg_size, ip_address): + cmd = "ping -M do -c 1 -s " + str(msg_size) + " " + ip_address + cmd_output = self.exec_command(cmd) + match = re.search('100% packet loss', cmd_output) + if match: + return False + else: + return True + + # Set the interface IP address and mask + def set_interface_ip(self, if_name, ip, mask): + self.buginf('Setting interface %s to %s mask %s', if_name, ip, mask) + cmd2apply = "sudo ifconfig %s %s netmask %s" % (if_name, ip, mask) + (rc, _, _) = self.ssh.execute(cmd2apply) + return rc + + # Get an interface IP address (returns None if error) + def get_interface_ip(self, if_name): + self.buginf('Getting interface %s IP and mask', if_name) + cmd2apply = "ifconfig %s" % (if_name) + (rc, res, _) = self.ssh.execute(cmd2apply) + if rc: + return None + # eth5 Link encap:Ethernet HWaddr 90:e2:ba:40:74:05 + # inet addr:172.29.87.29 Bcast:172.29.87.31 Mask:255.255.255.240 + # inet6 addr: fe80::92e2:baff:fe40:7405/64 Scope:Link + match = re.search(r'inet addr:([\d\.]*) ', res) + if not match: + return None + return match.group(1) + + # Set an interface MTU to passed in value + def set_interface_mtu(self, if_name, mtu): + self.buginf('Setting interface %s mtu to %d', if_name, mtu) + cmd2apply = "sudo ifconfig %s mtu %d" % (if_name, mtu) + (rc, _, _) = 
self.ssh.execute(cmd2apply) + return rc + + # Get the MTU of an interface + def get_interface_mtu(self, if_name): + cmd = "cat /sys/class/net/%s/mtu" % (if_name) + cmd_output = self.exec_command(cmd) + return int(cmd_output) + + # scp a file from the local host to the instance + # Returns True if dest file already exists or scp succeeded + # False in case of scp error + def scp(self, tool_name, source, dest): + + # check if the dest file is already present + if self.ssh.stat(dest): + self.buginf('tool %s already present - skipping install', tool_name) + return True + # scp over the tool binary + # first chmod the local copy since git does not keep the permission + os.chmod(source, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + + # scp to the target + scp_opts = '-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no' + scp_cmd = 'scp -i %s %s %s %s@%s:%s' % (self.config.private_key_file, + scp_opts, + source, + self.ssh_user, + self.ssh_ip, + dest) + self.buginf('Copying %s to target...', tool_name) + self.buginf(scp_cmd) + devnull = open(os.devnull, 'wb') + rc = subprocess.call(scp_cmd, shell=True, + stdout=devnull, stderr=devnull) + if rc: + self.display('Copy to target failed rc=%d', rc) + self.display(scp_cmd) + return False + return True + + def get_cmd_duration(self): + '''Get the duration of the client run + Will normally return the time configured in config.time + If cpu monitoring is enabled will make sure that this time is at least + 30 seconds (to be adjusted based on metric collection frequency) + ''' + if self.gmond_svr: + return max(30, self.config.time) + return self.config.time + + # Dispose the ssh session + def dispose(self): + if self.ssh: + self.ssh.close() + self.ssh = None + # BaseCompute.dispose(self) diff --git a/scale/perf_tool.py b/scale/perf_tool.py new file mode 100644 index 0000000..7cee03c --- /dev/null +++ b/scale/perf_tool.py @@ -0,0 +1,241 @@ +# Copyright 2014 Cisco Systems, Inc. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import abc +import re + +# where to copy the tool on the target, must end with slash +SCP_DEST_DIR = '/var/tmp/' + +# A base class for all tools that can be associated to an instance +class PerfTool(object): + __metaclass__ = abc.ABCMeta + + def __init__(self, name, perf_tool_path, instance): + self.name = name + self.instance = instance + self.dest_path = SCP_DEST_DIR + name + self.pid = None + self.perf_tool_path = perf_tool_path + + # Terminate pid if started + def dispose(self): + if self.pid: + # Terminate the iperf server + self.instance.buginf('Terminating %s', self.name) + self.instance.ssh.kill_proc(self.pid) + self.pid = None + + def parse_error(self, msg): + return {'error': msg, 'tool': self.name} + + def parse_results(self, protocol=None, throughput=None, lossrate=None, retrans=None, + rtt_ms=None, reverse_dir=False, msg_size=None, cpu_load=None, + http_rps=None, http_rates=None, http_sock_err=None, http_err=None): + res = {'tool': self.name} + if throughput is not None: + res['throughput_kbps'] = throughput + if protocol is not None: + res['protocol'] = protocol + if self.instance.config.vm_bandwidth: + res['bandwidth_limit_kbps'] = self.instance.config.vm_bandwidth + if lossrate is not None: + res['loss_rate'] = lossrate + if retrans: + res['retrans'] = retrans + if rtt_ms: + res['rtt_ms'] = rtt_ms + if reverse_dir: + res['direction'] = 'reverse' + if msg_size: + res['pkt_size'] = msg_size + if 
cpu_load: + res['cpu_load'] = cpu_load + if http_rps: + res['http_rps'] = http_rps + if http_rates: + res['http_rates'] = http_rates + if http_sock_err: + res['http_sock_err'] = http_sock_err + if http_err: + res['http_err'] = http_err + return res + + @abc.abstractmethod + def run_client(**kwargs): + # must be implemented by sub classes + return None + + def find_udp_bdw(self, pkt_size, target_ip): + '''Find highest UDP bandwidth within max loss rate for given packet size + :return: a dictionary describing the optimal bandwidth (see parse_results()) + ''' + # we use a binary search to converge to the optimal throughput + # start with 5Gbps - mid-range between 1 and 10Gbps + # Convergence can be *very* tricky because UDP throughput behavior + # can vary dramatically between host runs and guest runs. + # The packet rate limitation is going to dictate the effective + # send rate, meaning that small packet sizes will yield the worst + # throughput. + # The measured throughput can be vastly smaller than the requested + # throughput even when the loss rate is zero when the sender cannot + # send fast enough to fill the network, in that case increasing the + # requested rate will not make it any better + # Examples: + # 1. too much difference between requested/measured bw - regardless of loss rate + # => retry with bw mid-way between the requested bw and the measured bw + # /tmp/nuttcp-7.3.2 -T2 -u -l128 -R5000000K -p5001 -P5002 -fparse 192.168.1.2 + # megabytes=36.9785 real_seconds=2.00 rate_Mbps=154.8474 tx_cpu=23 rx_cpu=32 + # drop=78149 pkt=381077 data_loss=20.50746 + # /tmp/nuttcp-7.3.2 -T2 -u -l128 -R2500001K -p5001 -P5002 -fparse 192.168.1.2 + # megabytes=47.8063 real_seconds=2.00 rate_Mbps=200.2801 tx_cpu=24 rx_cpu=34 + # drop=0 pkt=391629 data_loss=0.00000 + # 2. 
measured and requested bw are very close : + # if loss_rate is too low + # increase bw mid-way between requested and last max bw + # if loss rate is too high + # decrease bw mid-way between the measured bw and the last min bw + # else stop iteration (converged) + # /tmp/nuttcp-7.3.2 -T2 -u -l8192 -R859376K -p5001 -P5002 -fparse 192.168.1.2 + # megabytes=204.8906 real_seconds=2.00 rate_Mbps=859.2992 tx_cpu=99 rx_cpu=10 + # drop=0 pkt=26226 data_loss=0.00000 + + min_kbps = 1 + max_kbps = 10000000 + kbps = 5000000 + min_loss_rate = self.instance.config.udp_loss_rate_range[0] + max_loss_rate = self.instance.config.udp_loss_rate_range[1] + # stop if the remaining range to cover is less than 5% + while (min_kbps * 100 / max_kbps) < 95: + res_list = self.run_client_dir(target_ip, 0, bandwidth_kbps=kbps, + udp=True, length=pkt_size, + no_cpu_timed=1) + # always pick the first element in the returned list of dict(s) + # should normally only have 1 element + res = res_list[0] + if 'error' in res: + return res + loss_rate = res['loss_rate'] + measured_kbps = res['throughput_kbps'] + self.instance.buginf('pkt-size=%d throughput=%d<%d/%d<%d Kbps loss-rate=%d' % + (pkt_size, min_kbps, measured_kbps, kbps, max_kbps, loss_rate)) + # expected rate must be at least 80% of the requested rate + if (measured_kbps * 100 / kbps) < 80: + # the measured bw is too far away from the requested bw + # take half the distance or 3x the measured bw whichever is lowest + kbps = measured_kbps + (kbps - measured_kbps) / 2 + if measured_kbps: + kbps = min(kbps, measured_kbps * 3) + max_kbps = kbps + continue + # The measured bw is within striking distance from the requested bw + # increase bw if loss rate is too small + if loss_rate < min_loss_rate: + # undershot + if measured_kbps > min_kbps: + min_kbps = measured_kbps + else: + # to make forward progress we need to increase min_kbps + # and try a higher bw since the loss rate is too low + min_kbps = int((max_kbps + min_kbps) / 2) + + kbps = 
int((max_kbps + min_kbps) / 2) + # print ' undershot, min=%d kbps=%d max=%d' % (min_kbps, kbps, max_kbps) + elif loss_rate > max_loss_rate: + # overshot + max_kbps = kbps + if measured_kbps < kbps: + kbps = measured_kbps + else: + kbps = int((max_kbps + min_kbps) / 2) + # print ' overshot, min=%d kbps=%d max=%d' % (min_kbps, kbps, max_kbps) + else: + # converged within loss rate bracket + break + return res + + def get_proto_profile(self): + '''Return a tuple containing the list of protocols (tcp/udp) and + list of packet sizes (udp only) + ''' + # start with TCP (udp=False) then UDP + proto_list = [] + proto_pkt_sizes = [] + if 'T' in self.instance.config.protocols: + proto_list.append(False) + proto_pkt_sizes.append(self.instance.config.tcp_pkt_sizes) + if 'U' in self.instance.config.protocols: + proto_list.append(True) + proto_pkt_sizes.append(self.instance.config.udp_pkt_sizes) + return (proto_list, proto_pkt_sizes) + +class PingTool(PerfTool): + ''' + A class to run ping and get loss rate and round trip time + ''' + + def __init__(self, instance): + PerfTool.__init__(self, 'ping', None, instance) + + def run_client(self, target_ip, ping_count=5): + '''Perform the ping operation + :return: a dict containing the results stats + + Example of output: + 10 packets transmitted, 10 packets received, 0.0% packet loss + round-trip min/avg/max/stddev = 55.855/66.074/103.915/13.407 ms + or + 5 packets transmitted, 5 received, 0% packet loss, time 3998ms + rtt min/avg/max/mdev = 0.455/0.528/0.596/0.057 ms + ''' + if self.instance.config.ipv6_mode: + cmd = "ping6 -c " + str(ping_count) + " " + str(target_ip) + else: + cmd = "ping -c " + str(ping_count) + " " + str(target_ip) + cmd_out = self.instance.exec_command(cmd) + if not cmd_out: + res = {'protocol': 'ICMP', + 'tool': 'ping', + 'error': 'failed'} + return res + match = re.search(r'(\d*) packets transmitted, (\d*) ', + cmd_out) + if match: + tx_packets = match.group(1) + rx_packets = match.group(2) + else: + 
tx_packets = 0 + rx_packets = 0 + match = re.search(r'min/avg/max/[a-z]* = ([\d\.]*)/([\d\.]*)/([\d\.]*)/([\d\.]*)', + cmd_out) + if match: + rtt_min = match.group(1) + rtt_avg = match.group(2) + rtt_max = match.group(3) + rtt_stddev = match.group(4) + else: + rtt_min = 0 + rtt_max = 0 + rtt_avg = 0 + rtt_stddev = 0 + res = {'protocol': 'ICMP', + 'tool': 'ping', + 'tx_packets': tx_packets, + 'rx_packets': rx_packets, + 'rtt_min_ms': rtt_min, + 'rtt_max_ms': rtt_max, + 'rtt_avg_ms': rtt_avg, + 'rtt_stddev': rtt_stddev} + return res diff --git a/scale/ssh/id_rsa b/scale/ssh/id_rsa new file mode 100644 index 0000000..057e6d9 --- /dev/null +++ b/scale/ssh/id_rsa @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAu1wjIM/GgPbbLPoyKfN+I1uSeqrF4PYcbsTcHaQ6mEF/Ufqe +6uZVJFR1mT1ECfxCckUMM6aaf5ESNAfxEjE9Hrs/Yd3Qw5hwSlG3HJ4uZg79m1yg +ifXfnkp4rGNI6sqZxgyMbeQW3KoDaQ6zOe7e/KIkxX4XzR8I4d5lRx4ofIb806AB +QJkDrq48dHLGZnOBPvpNImpg5u6EWHGa4HI4Dl2pdyEQXTXOErupalOC1Cr8oxwu +fimSxftnl9Nh94wQtTQADnCE2GBaMMxS/ClHtJLDfmtnVC51Y4F7Ux9F3MSTSRBP +gNxcd9OikMKSj6RNq/PHw5+9R0h5v2lJXvalCQIDAQABAoIBAArCu/HCfTAi/WuT +4xWtumzlcYBCFqNY/0ENZWb+a68a8+kNb9sl53Xys95dOm8oYdiWRqEgzHbPKjB6 +1EmrMkt1japdRwQ02R4rm0y1eQy7h61IoJ/L01AQDuY3vZReln5dciNNmlKKITAD +fB+zrHLuDRaaq1tIkQYH8+ElxkWAkd/vRQC4FP1OMIGnX4TdQ8lcG2DxwMs5jqJ6 +ufTeR6QMDEymNYQwcdFhe5wNi57IEbN9B+N95yaktWsYV34HuYV2ndZtrhMLFhcq +Psw3vgrXBrreVPZ/iX1zeWgrjJb1AVOCtsOZ+O4YfZIIBWnhjj9sJnDCpMWmioH5 +a0UmF0ECgYEA+NyIS5MmbrVJKVqWUJubSbaZXQEW3Jv4njRFAyG7NVapSbllF5t2 +lq5usUI+l1XaZ3v6IpYPG+K+U1Ggo3+E6RFEDwVrZF0NYLOPXBydhkFFB4nHpTSX +uBo65/SiMDSassrqs/PFCDdsiUQ87sMFp+gouDePcBDC1OyHRDxR220CgYEAwLv6 +zvqi5AvzO+tZQHRHcQdvCNC436KsUZlV6jQ5j3tUlqXlLRl8bWfih7Hu2uBGNjy2 +Fao517Nd/kBdjVaechn/fvmLwgflQso0q1j63u2nZ32uYTd+zLnW1yJM4UCs/Hqb +hebRYDeZuRfobp62lEl6rdGij5LLRzQClOArso0CgYAaHClApKO3odWXPSXgNzNH +vJzCoUagxsyC7MEA3x0hL4J7dbQhkfITRSHf/y9J+Xv8t4k677uOFXAalcng3ZQ4 +T9NwMAVgdlLc/nngFDCC0X5ImDAWKTpx2m6rv4L0w9AnShrt3nmhrw74J+yssFF7 
+mGQNT+cAvwFyDY7zndCI0QKBgEkZw0on5ApsweezHxoEQGiNcj68s7IWyBb2+pAn +GMHj/DRbXa4aYYg5g8EF6ttXfynpIwLamq/GV1ss3I7UEKqkU7S8P5brWbhYa1um +FxjguMLW94HmA5Dw15ynZNN2rWXhtwU1g6pjzElY2Q7D4eoiaIZu4aJlAfbSsjv3 +PnutAoGBAMBRX8BbFODtQr68c6LWWda5zQ+kNgeCv+2ejG6rsEQ+Lxwi86Oc6udG +kTP4xuZo80MEW/t+kibFgU6gm1WTVltpbjo0XTaHE1OV4JeNC8edYFTi1DVO5r1M +ch+pkN20FQmZ+cLLn6nOeTJ6/9KXWKAZMPZ4SH4BnmF7iEa7yc8f +-----END RSA PRIVATE KEY----- diff --git a/scale/ssh/id_rsa.pub b/scale/ssh/id_rsa.pub new file mode 100644 index 0000000..0313858 --- /dev/null +++ b/scale/ssh/id_rsa.pub @@ -0,0 +1 @@ +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC7XCMgz8aA9tss+jIp834jW5J6qsXg9hxuxNwdpDqYQX9R+p7q5lUkVHWZPUQJ/EJyRQwzppp/kRI0B/ESMT0euz9h3dDDmHBKUbccni5mDv2bXKCJ9d+eSnisY0jqypnGDIxt5BbcqgNpDrM57t78oiTFfhfNHwjh3mVHHih8hvzToAFAmQOurjx0csZmc4E++k0iamDm7oRYcZrgcjgOXal3IRBdNc4Su6lqU4LUKvyjHC5+KZLF+2eX02H3jBC1NAAOcITYYFowzFL8KUe0ksN+a2dULnVjgXtTH0XcxJNJEE+A3Fx306KQwpKPpE2r88fDn71HSHm/aUle9qUJ openstack-pns diff --git a/scale/sshutils.py b/scale/sshutils.py new file mode 100644 index 0000000..b7b2a34 --- /dev/null +++ b/scale/sshutils.py @@ -0,0 +1,391 @@ +# Copyright 2013: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +"""High level ssh library. 
+ +Usage examples: + +Execute command and get output: + + ssh = sshclient.SSH('root', 'example.com', port=33) + status, stdout, stderr = ssh.execute('ps ax') + if status: + raise Exception('Command failed with non-zero status.') + print stdout.splitlines() + +Execute command with huge output: + + class PseudoFile(object): + def write(chunk): + if 'error' in chunk: + email_admin(chunk) + + ssh = sshclient.SSH('root', 'example.com') + ssh.run('tail -f /var/log/syslog', stdout=PseudoFile(), timeout=False) + +Execute local script on remote side: + + ssh = sshclient.SSH('user', 'example.com') + status, out, err = ssh.execute('/bin/sh -s arg1 arg2', + stdin=open('~/myscript.sh', 'r')) + +Upload file: + + ssh = sshclient.SSH('user', 'example.com') + ssh.run('cat > ~/upload/file.gz', stdin=open('/store/file.gz', 'rb')) + +Eventlet: + + eventlet.monkey_patch(select=True, time=True) + or + eventlet.monkey_patch() + or + sshclient = eventlet.import_patched("opentstack.common.sshclient") + +""" + +import re +import select +import socket +import StringIO +import time + +import paramiko +import scp + +# from rally.openstack.common.gettextutils import _ + + +class SSHError(Exception): + pass + + +class SSHTimeout(SSHError): + pass + + +class SSH(object): + """Represent ssh connection.""" + + def __init__(self, user, host, port=22, pkey=None, + key_filename=None, password=None, + connect_timeout=60, + connect_retry_count=30, + connect_retry_wait_sec=2): + """Initialize SSH client. 
+ + :param user: ssh username + :param host: hostname or ip address of remote ssh server + :param port: remote ssh port + :param pkey: RSA or DSS private key string or file object + :param key_filename: private key filename + :param password: password + :param connect_timeout: timeout when connecting ssh + :param connect_retry_count: how many times to retry connecting + :param connect_retry_wait_sec: seconds to wait between retries + """ + + self.user = user + self.host = host + self.port = port + self.pkey = self._get_pkey(pkey) if pkey else None + self.password = password + self.key_filename = key_filename + self._client = False + self.connect_timeout = connect_timeout + self.connect_retry_count = connect_retry_count + self.connect_retry_wait_sec = connect_retry_wait_sec + self.distro_id = None + self.distro_id_like = None + self.distro_version = None + self.__get_distro() + + def _get_pkey(self, key): + if isinstance(key, basestring): + key = StringIO.StringIO(key) + errors = [] + for key_class in (paramiko.rsakey.RSAKey, paramiko.dsskey.DSSKey): + try: + return key_class.from_private_key(key) + except paramiko.SSHException as exc: + errors.append(exc) + raise SSHError('Invalid pkey: %s' % (errors)) + + def _get_client(self): + if self._client: + return self._client + self._client = paramiko.SSHClient() + self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + for _ in range(self.connect_retry_count): + try: + self._client.connect(self.host, username=self.user, + port=self.port, pkey=self.pkey, + key_filename=self.key_filename, + password=self.password, + timeout=self.connect_timeout) + return self._client + except (paramiko.AuthenticationException, + paramiko.BadHostKeyException, + paramiko.SSHException, + socket.error): + time.sleep(self.connect_retry_wait_sec) + + self._client = None + msg = '[%s] SSH Connection failed after %s attempts' % (self.host, + self.connect_retry_count) + raise SSHError(msg) + + def close(self): + self._client.close() + 
self._client = False + + def run(self, cmd, stdin=None, stdout=None, stderr=None, + raise_on_error=True, timeout=3600): + """Execute specified command on the server. + + :param cmd: Command to be executed. + :param stdin: Open file or string to pass to stdin. + :param stdout: Open file to connect to stdout. + :param stderr: Open file to connect to stderr. + :param raise_on_error: If False then exit code will be return. If True + then exception will be raized if non-zero code. + :param timeout: Timeout in seconds for command execution. + Default 1 hour. No timeout if set to 0. + """ + + client = self._get_client() + + if isinstance(stdin, basestring): + stdin = StringIO.StringIO(stdin) + + return self._run(client, cmd, stdin=stdin, stdout=stdout, + stderr=stderr, raise_on_error=raise_on_error, + timeout=timeout) + + def _run(self, client, cmd, stdin=None, stdout=None, stderr=None, + raise_on_error=True, timeout=3600): + + transport = client.get_transport() + session = transport.open_session() + session.exec_command(cmd) + start_time = time.time() + + data_to_send = '' + stderr_data = None + + # If we have data to be sent to stdin then `select' should also + # check for stdin availability. + if stdin and not stdin.closed: + writes = [session] + else: + writes = [] + + while True: + # Block until data can be read/write. 
+ select.select([session], writes, [session], 1) + + if session.recv_ready(): + data = session.recv(4096) + if stdout is not None: + stdout.write(data) + continue + + if session.recv_stderr_ready(): + stderr_data = session.recv_stderr(4096) + if stderr is not None: + stderr.write(stderr_data) + continue + + if session.send_ready(): + if stdin is not None and not stdin.closed: + if not data_to_send: + data_to_send = stdin.read(4096) + if not data_to_send: + stdin.close() + session.shutdown_write() + writes = [] + continue + sent_bytes = session.send(data_to_send) + data_to_send = data_to_send[sent_bytes:] + + if session.exit_status_ready(): + break + + if timeout and (time.time() - timeout) > start_time: + args = {'cmd': cmd, 'host': self.host} + raise SSHTimeout(('Timeout executing command ' + '"%(cmd)s" on host %(host)s') % args) + # if e: + # raise SSHError('Socket error.') + + exit_status = session.recv_exit_status() + if 0 != exit_status and raise_on_error: + fmt = ('Command "%(cmd)s" failed with exit_status %(status)d.') + details = fmt % {'cmd': cmd, 'status': exit_status} + if stderr_data: + details += (' Last stderr data: "%s".') % stderr_data + raise SSHError(details) + return exit_status + + def execute(self, cmd, stdin=None, timeout=3600): + """Execute the specified command on the server. + + :param cmd: Command to be executed. + :param stdin: Open file to be sent on process stdin. + :param timeout: Timeout for execution of the command. 
+ + Return tuple (exit_status, stdout, stderr) + + """ + stdout = StringIO.StringIO() + stderr = StringIO.StringIO() + + exit_status = self.run(cmd, stderr=stderr, + stdout=stdout, stdin=stdin, + timeout=timeout, raise_on_error=False) + stdout.seek(0) + stderr.seek(0) + return (exit_status, stdout.read(), stderr.read()) + + def wait(self, timeout=120, interval=1): + """Wait for the host will be available via ssh.""" + start_time = time.time() + while True: + try: + return self.execute('uname') + except (socket.error, SSHError): + time.sleep(interval) + if time.time() > (start_time + timeout): + raise SSHTimeout(('Timeout waiting for "%s"') % self.host) + + def __extract_property(self, name, input_str): + expr = name + r'="?([\w\.]*)"?' + match = re.search(expr, input_str) + if match: + return match.group(1) + return 'Unknown' + + # Get the linux distro + def __get_distro(self): + '''cat /etc/*-release | grep ID + Ubuntu: + DISTRIB_ID=Ubuntu + ID=ubuntu + ID_LIKE=debian + VERSION_ID="14.04" + RHEL: + ID="rhel" + ID_LIKE="fedora" + VERSION_ID="7.0" + ''' + distro_cmd = "grep ID /etc/*-release" + (status, distro_out, _) = self.execute(distro_cmd) + if status: + distro_out = '' + self.distro_id = self.__extract_property('ID', distro_out) + self.distro_id_like = self.__extract_property('ID_LIKE', distro_out) + self.distro_version = self.__extract_property('VERSION_ID', distro_out) + + def pidof(self, proc_name): + ''' + Return a list containing the pids of all processes of a given name + the list is empty if there is no pid + ''' + # the path update is necessary for RHEL + cmd = "PATH=$PATH:/usr/sbin pidof " + proc_name + (status, cmd_output, _) = self.execute(cmd) + if status: + return [] + cmd_output = cmd_output.strip() + result = cmd_output.split() + return result + + # kill pids in the given list of pids + def kill_proc(self, pid_list): + cmd = "kill -9 " + ' '.join(pid_list) + self.execute(cmd) + + # check stats for a given path + def stat(self, path): + (status, 
cmd_output, _) = self.execute('stat ' + path) + if status: + return None + return cmd_output + + def ping_check(self, target_ip, ping_count=2, pass_threshold=80): + '''helper function to ping from one host to an IP address, + for a given count and pass_threshold; + Steps: + ssh to the host and then ping to the target IP + then match the output and verify that the loss% is + less than the pass_threshold% + Return 1 if the criteria passes + Return 0, if it fails + ''' + cmd = "ping -c " + str(ping_count) + " " + str(target_ip) + (_, cmd_output, _) = self.execute(cmd) + + match = re.search(r'(\d*)% packet loss', cmd_output) + pkt_loss = match.group(1) + if int(pkt_loss) < int(pass_threshold): + return 1 + else: + print 'Ping to %s failed: %s' % (target_ip, cmd_output) + return 0 + + def get_file_from_host(self, from_path, to_path): + ''' + A wrapper api on top of paramiko scp module, to scp + a local file to the host. + ''' + sshcon = self._get_client() + scpcon = scp.SCPClient(sshcon.get_transport()) + try: + scpcon.get(from_path, to_path) + except scp.SCPException as exp: + print ("Send failed: [%s]", exp) + return 0 + return 1 + + def read_remote_file(self, from_path): + ''' + Read a remote file and save it to a buffer. + ''' + cmd = "cat " + from_path + (status, cmd_output, _) = self.execute(cmd) + if status: + return None + return cmd_output + + +################################################## +# Only invoke the module directly for test purposes. Should be +# invoked from pns script. 
+################################################## +def main(): + # ssh = SSH('localadmin', '172.29.87.29', key_filename='./ssh/id_rsa') + ssh = SSH('localadmin', '172.22.191.173', key_filename='./ssh/id_rsa') + + print 'ID=' + ssh.distro_id + print 'ID_LIKE=' + ssh.distro_id_like + print 'VERSION_ID=' + ssh.distro_version + + # ssh.wait() + # print ssh.pidof('bash') + # print ssh.stat('/tmp') + +if __name__ == "__main__": + main() diff --git a/scale/tools/wrk b/scale/tools/wrk new file mode 100755 index 0000000..a10aeee Binary files /dev/null and b/scale/tools/wrk differ diff --git a/scale/wrk_tool.py b/scale/wrk_tool.py new file mode 100644 index 0000000..663ff76 --- /dev/null +++ b/scale/wrk_tool.py @@ -0,0 +1,109 @@ +# Copyright 2014 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
import re

from perf_tool import PerfTool
import sshutils


class WrkTool(PerfTool):
    """Driver for the 'wrk' HTTP benchmarking client running inside a VM."""

    def __init__(self, instance, perf_tool_path):
        PerfTool.__init__(self, 'wrk-4.0.1', perf_tool_path, instance)

    def get_server_launch_cmd(self):
        '''This requires HTTP server is running already, so there is no
        server-side command to launch.
        '''
        return None

    def run_client(self, target_url, threads, connections,
                   timeout=5, connetion_type='New'):
        '''Run the wrk test against target_url and parse its output.

        :param target_url: URL to benchmark.
        :param threads: number of wrk threads (-t).
        :param connections: number of concurrent connections (-c).
        :param timeout: per-request timeout in seconds (--timeout).
        :param connetion_type: currently unused; NOTE(review): keyword name
                               has a typo ("connetion") but is kept for
                               backward compatibility with existing callers.
        :return: list containing one or more dictionary results
        '''
        duration_sec = self.instance.get_cmd_duration()

        # Raise the fd limit to 102400 so wrk can open many connections.
        cmd = 'sudo sh -c "ulimit -n 102400 && exec su $LOGNAME -c \''
        cmd += '%s -t%d -c%d -d%ds --timeout %ds --latency %s; exit\'"' % \
            (self.dest_path, threads, connections, duration_sec,
             timeout, target_url)

        self.instance.display('Measuring HTTP performance...')
        self.instance.buginf(cmd)
        try:
            # force the timeout value with 20 seconds extra for the command to
            # complete and do not collect CPU
            cmd_out = self.instance.exec_command(cmd, duration_sec + 20)
        except sshutils.SSHError as exc:
            # Timeout or any SSH error
            self.instance.display('SSH Error:' + str(exc))
            return [self.parse_error(str(exc))]

        # Sample Output:
        #   Running 10s test @ http://192.168.1.1/index.html
        #     8 threads and 5000 connections
        #     Thread Stats   Avg      Stdev     Max   +/- Stdev
        #       Latency   314.97ms  280.34ms  999.98ms   74.05%
        #       Req/Sec   768.45    251.19     2.61k    74.47%
        #     Latency Distribution
        #        50%  281.43ms
        #        75%  556.37ms
        #        90%  790.04ms
        #        99%  969.79ms
        #     61420 requests in 10.10s, 2.79GB read
        #     Socket errors: connect 0, read 0, write 0, timeout 10579
        #     Non-2xx or 3xx responses: 828
        #   Requests/sec:   6080.66
        #   Transfer/sec:   282.53MB

        try:
            re_str = r'Requests/sec:\s+(\d+\.\d+)'
            http_rps = re.search(re_str, cmd_out).group(1)

            re_str = r'Transfer/sec:\s+(\d+\.\d+.B)'
            http_rates = re.search(re_str, cmd_out).group(1)
            # Normalize the rate to KB: K -> x1, M -> x1024, G -> x1024^2.
            # (Comment fixed: the previous comment claimed MB.)
            ex_unit = 'KMG'.find(http_rates[-2])
            if ex_unit == -1:
                raise ValueError
            val = float(http_rates[0:-2])
            http_rates = float(val * (1024 ** (ex_unit)))

            re_str = r'Socket errors: connect (\d+), read (\d+), write (\d+), timeout (\d+)'
            http_sock_err = re.search(re_str, cmd_out)
            if http_sock_err:
                # Total socket errors across all four categories.
                v1 = int(http_sock_err.group(1))
                v2 = int(http_sock_err.group(2))
                v3 = int(http_sock_err.group(3))
                v4 = int(http_sock_err.group(4))
                http_sock_err = v1 + v2 + v3 + v4
            else:
                http_sock_err = 0

            re_str = r'Non-2xx or 3xx responses: (\d+)'
            http_err = re.search(re_str, cmd_out)
            if http_err:
                # Fix: convert to int for type consistency with the 0
                # default below (previously a string was returned here).
                http_err = int(http_err.group(1))
            else:
                http_err = 0
        except Exception:
            return self.parse_error('Could not parse: %s' % (cmd_out))

        return self.parse_results(http_rps=http_rps,
                                  http_rates=http_rates,
                                  http_sock_err=http_sock_err,
                                  http_err=http_err)