From 5144e92839af70845f0e0bc88ad37d6e361048a4 Mon Sep 17 00:00:00 2001 From: Dmitry Sutyagin Date: Thu, 27 Oct 2016 17:46:52 -0700 Subject: [PATCH] Add: scripts_all_pairs - new functionality Allows to run node-to-node tests for all possible pairs of nodes, uses as many nodes at once as possible, while not allowing any node to do two operations at the same time. Example: nodes: 1, 2, 3, 4 scripts: iperf-start.sh, iperf-client.sh, iperf-stop.sh rq structure: scripts_all_pairs: __default: - server_start: iperf-start.sh server_stop: iperf-stop.sh client: iperf-client.sh network: 'private' the new function "run_scripts_all_pairs" runs three phases in the following order: 1. "server_start" (all nodes) 2. "client" (1-to-1 all pairs, one way) 3. "server_stop" (all nodes) "server_start": will run "iperf-start.sh" on all nodes and record outputfile from stdout (required to leave process in background and disconnect). This script must allocate a temporary file (mktemp), redirect the output of the desired server process to this file and use "&" to send it to background, and it has to print the temporary file name to stdout (and nothing else, should be one line only) "client": will run "iperf-client.sh" on all nodes in turns, like so: turn 1: - on 1, with "$SERVER_IP" env var = ip of 2 and on 3, with "$SERVER_IP" pointing to 4 turn 2: - 1 -> 3 2 -> 4 turn 3: - 1 -> 4 2 -> 3 "server_stop": will run "iperf-stop.sh" on all nodes, this script has to stop what was started by 1 and collect (cat) the outputfile recored on step 1, it is provided via $SERVER_OUTPUT env variable network parameter is optional. If used, nodes must have "network_data" field containing a list of dicts, each dict should have "name" and "ip". network is matched by name, ip can be ip or hostname, if ip contains '/' then '/' an what follows will be trimmed (to support "1.2.3.4/24" ip format) If network paramenter is not specified, admin IPs will be used. "fuel nodes --json" -> network_data was used as reference. Main purpose of this is to test network We can do ping or anything else, we can also skip server part by not defining it in rq and only use "client" Change-Id: Ic3df7cd607aad129c8eb8131f408ee6f8638e084 --- timmy/cli.py | 3 ++ timmy/env.py | 2 +- timmy/nodes.py | 115 ++++++++++++++++++++++++++++++++++++++++++++++++- timmy/tools.py | 29 ++++++++++++- 4 files changed, 145 insertions(+), 4 deletions(-) diff --git a/timmy/cli.py b/timmy/cli.py index c13149a..1d1e87a 100755 --- a/timmy/cli.py +++ b/timmy/cli.py @@ -331,6 +331,9 @@ def main(argv=None): if nm.has(Node.ckey, Node.skey): pretty_run(args.quiet, 'Executing commands and scripts', nm.run_commands, args=(args.maxthreads,)) + if nm.has('scripts_all_pairs'): + pretty_run(args.quiet, 'Executing paired scripts', + nm.run_scripts_all_pairs, args=(args.maxthreads,)) if nm.has(Node.fkey, Node.flkey): pretty_run(args.quiet, 'Collecting files and filelists', nm.get_files, args=(args.maxthreads,)) diff --git a/timmy/env.py b/timmy/env.py index a193936..d19c97e 100644 --- a/timmy/env.py +++ b/timmy/env.py @@ -16,7 +16,7 @@ # under the License. project_name = 'timmy' -version = '1.20.6' +version = '1.21.0' if __name__ == '__main__': import sys diff --git a/timmy/nodes.py b/timmy/nodes.py index 09ac8c5..1029c66 100644 --- a/timmy/nodes.py +++ b/timmy/nodes.py @@ -73,7 +73,7 @@ class Node(object): def __init__(self, ip, conf, id=None, name=None, fqdn=None, mac=None, cluster=None, roles=None, os_platform=None, - online=True, status="ready", logger=None): + online=True, status="ready", logger=None, network_data=None): self.logger = logger or logging.getLogger(project_name) self.id = int(id) if id is not None else None self.mac = mac @@ -89,6 +89,7 @@ class Node(object): self.logger.critical('Node: ip address must be defined') sys.exit(111) self.ip = ip + self.network_data = network_data self.release = None self.files = [] self.filelists = [] @@ -367,6 +368,80 @@ class Node(object): prefix=self.prefix) self.check_code(code, 'exec_simple_cmd', cmd, errs, ok_codes) + def exec_pair(self, phase, server_node=None, fake=False): + sn = server_node + cl = self.cluster_repr + if sn: + self.logger.debug('%s: phase %s: server %s' % (self.repr, phase, + sn.repr)) + else: + self.logger.debug('%s: phase %s' % (self.repr, phase)) + nond_msg = ('%s: network specified but network_data not set for %s') + nonet_msg = ('%s: network %s not found in network_data of %s') + nosrv_msg = ('%s: server_node not provided') + noip_msg = ('%s: %s has no IP in network %s') + for i in self.scripts_all_pairs: + if phase not in i: + self.logger.warning('phase %s not defined in config' % phase) + return self.scripts_all_pairs + if phase.startswith('client'): + if not sn: + self.logger.warning(nosrv_msg % self.repr) + return self.scripts_all_pairs + if 'network' in i: + if not sn.network_data: + self.logger.warning(nond_msg % (self.repr, sn.repr)) + return self.scripts_all_pairs + nd = sn.network_data + net_dict = dict((v['name'], v) for v in nd) + if i['network'] not in net_dict: + self.logger.warning(nonet_msg % (self.repr, sn.repr)) + return self.scripts_all_pairs + if 'ip' not in net_dict[i['network']]: + self.logger.warning(noip_msg % (self.repr, sn.repr, + i['network'])) + return self.scripts_all_pairs + ip = net_dict[i['network']]['ip'] + if '/' in ip: + server_ip = ip.split('/')[0] + else: + server_ip = ip + else: + server_ip = sn.ip + phase_val = i[phase] + ddir = os.path.join(self.outdir, 'scripts_all_pairs', cl, phase, + self.repr) + tools.mdir(ddir) + if type(phase_val) is dict: + env_vars = [phase_val.values()[0]] + phase_val = phase_val.keys()[0] + else: + env_vars = self.env_vars + if os.path.sep in phase_val: + f = phase_val + else: + f = os.path.join(self.rqdir, Node.skey, phase_val) + dfile = os.path.join(ddir, os.path.basename(f)) + if phase.startswith('client'): + env_vars.append('SERVER_IP=%s' % server_ip) + dname = os.path.basename(f) + '-%s' % server_ip + dfile = os.path.join(ddir, dname) + elif phase == 'server_stop' and 'server_output' in i: + env_vars.append('SERVER_OUTPUT=%s' % i['server_output']) + if fake: + return self.scripts_all_pairs + outs, errs, code = tools.ssh_node(ip=self.ip, + filename=f, + ssh_opts=self.ssh_opts, + env_vars=env_vars, + timeout=self.timeout, + prefix=self.prefix) + self.check_code(code, 'exec_pair, phase:%s' % phase, f, errs) + if phase == 'server_start' and code == 0: + i['server_output'] = outs.strip() + open(dfile, 'a+').write(outs) + return self.scripts_all_pairs + def get_files(self, timeout=15): self.logger.info('%s: getting files' % self.repr) cl = self.cluster_repr @@ -914,7 +989,7 @@ class NodeManager(object): for node_data in self.nodes_json: params = {'conf': self.conf} keys = ['id', 'cluster', 'roles', 'fqdn', 'name', 'mac', - 'os_platform', 'status', 'online', 'ip'] + 'os_platform', 'status', 'online', 'ip', 'network_data'] for key in keys: if key in node_data: params[key] = node_data[key] @@ -1164,6 +1239,39 @@ class NodeManager(object): run_items.append(tools.RunItem(target=n.put_files)) tools.run_batch(run_items, 10) + @run_with_lock + def run_scripts_all_pairs(self, maxthreads, fake=False): + if len(self.selected_nodes()) < 2: + self.logger.warning('less than 2 nodes are available, ' + 'skipping paired scripts') + return + run_server_start_items = [] + run_server_stop_items = [] + for n in self.selected_nodes(): + start_args = {'phase': 'server_start', 'fake': fake} + run_server_start_items.append(tools.RunItem(target=n.exec_pair, + args=start_args, + key=n.ip)) + stop_args = {'phase': 'server_stop', 'fake': fake} + run_server_stop_items.append(tools.RunItem(target=n.exec_pair, + args=stop_args)) + result = tools.run_batch(run_server_start_items, maxthreads, + dict_result=True) + for key in result: + self.nodes[key].scripts_all_pairs = result[key] + for pairset in tools.all_pairs(self.selected_nodes()): + run_client_items = [] + self.logger.info(['%s->%s' % (p[0].ip, p[1].ip) for p in pairset]) + for pair in pairset: + client = pair[0] + server = pair[1] + client_args = {'phase': 'client', 'server_node': server, + 'fake': fake} + run_client_items.append(tools.RunItem(target=client.exec_pair, + args=client_args)) + tools.run_batch(run_client_items, len(run_client_items)) + tools.run_batch(run_server_stop_items, maxthreads) + def has(self, *keys): nodes = {} for k in keys: @@ -1176,6 +1284,9 @@ class NodeManager(object): nodes[k].append(n) return nodes + def selected_nodes(self): + return [n for n in self.nodes.values() if not n.filtered_out] + def main(argv=None): return 0 diff --git a/timmy/tools.py b/timmy/tools.py index 533d754..80885c4 100644 --- a/timmy/tools.py +++ b/timmy/tools.py @@ -317,7 +317,7 @@ def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15, bstr = "%s timeout '%s' bash -c " % ( env_vars, timeout) else: - bstr = "timeout '%s' ssh -t -T %s '%s' '%s' " % ( + bstr = "timeout '%s' ssh -T %s '%s' '%s' " % ( timeout, ssh_opts, ip, env_vars) if filename is None: cmd = '%s %s' % (bstr, quote(prefix + ' ' + command)) @@ -389,5 +389,32 @@ def w_list(value): return value if type(value) == list else [value] +def all_pairs(items): + def incomplete(i_set, p_dict): + for i, p_set in p_dict.items(): + not_paired = i_set.difference(p_set).difference([i]) + if not_paired: + return not_paired + + items_set = set(items) + pairs = [] + paired = {} + for i in items_set: + paired[i] = set() + while incomplete(items_set, paired): + busy = set() + current_pairs = [] + for i in [i for i in items if items_set.difference(paired[i])]: + can_pair = incomplete(items_set.difference(busy), {i: paired[i]}) + if i not in busy and can_pair: + pair_i = next(iter(can_pair)) + current_pairs.append([i, pair_i]) + busy.add(i) + busy.add(pair_i) + paired[i].add(pair_i) + pairs.append(current_pairs) + return pairs + + if __name__ == '__main__': sys.exit(0)