From 77634b956538dabfeb616be1419298dc0372b8ca Mon Sep 17 00:00:00 2001 From: Dmitry Sutyagin Date: Thu, 22 Dec 2016 17:23:10 -0800 Subject: [PATCH] Add: max_pairs argument; other minor changes - Add max_pairs parameter to limit concurrency for scripts_all_pairs run - Move maxthreads and logs_maxthreads into configuration - Rework some of the configuration comments Change-Id: I74c308c9a7aa5177eba8d6c9afb69c79bc1f809a --- specs/python-timmy.spec | 5 ++++- timmy/cli.py | 29 ++++++++++++++++++----------- timmy/conf.py | 17 ++++++++++++++++- timmy/env.py | 2 +- timmy/nodes.py | 35 ++++++++++++++++++++--------------- timmy/tools.py | 4 +++- 6 files changed, 62 insertions(+), 30 deletions(-) diff --git a/specs/python-timmy.spec b/specs/python-timmy.spec index ff3fd02..855e414 100644 --- a/specs/python-timmy.spec +++ b/specs/python-timmy.spec @@ -4,7 +4,7 @@ %global pypi_name timmy Name: python-%{pypi_name} -Version: 1.26.3 +Version: 1.26.4 Release: 1%{?dist}~mos0 Summary: Log collector tool for OpenStack Fuel @@ -107,6 +107,9 @@ popd %changelog +* Thu Dec 22 2016 Dmitry Sutyagin - 1.26.4 +- Add: max_pairs argument; other minor changes + * Wed Dec 21 2016 Dmitry Sutyagin - 1.26.3 - Fix: scripts_all_pairs sometimes uses same node diff --git a/timmy/cli.py b/timmy/cli.py index 98f5fed..c711324 100755 --- a/timmy/cli.py +++ b/timmy/cli.py @@ -91,6 +91,11 @@ def parser_init(add_help=False): ' for each pair of nodes [A, B] run client' ' script only on A (A->B connection).' ' Default is to run both A->B and B->A.')) + parser.add_argument('--max-pairs', type=int, metavar='NUMBER', + help=('When executing scripts_all_pairs (if defined),' + ' limit the amount of pairs processed' + ' simultaneously. Default is to run max number' + ' of pairs possible, which is num. nodes / 2.')) parser.add_argument('-P', '--put', nargs=2, action='append', metavar=('SOURCE', 'DESTINATION'), help=('Enables shell mode. 
Can be specified multiple' @@ -156,12 +161,10 @@ def parser_init(add_help=False): ' messages. Good for quick runs / "watch" wrap.' ' This option disables any -v parameters.'), action='store_true') - parser.add_argument('--maxthreads', type=int, default=100, - metavar='NUMBER', + parser.add_argument('--maxthreads', type=int, metavar='NUMBER', help=('Maximum simultaneous nodes for command' 'execution.')) - parser.add_argument('--logs-maxthreads', type=int, default=10, - metavar='NUMBER', + parser.add_argument('--logs-maxthreads', type=int, metavar='NUMBER', help='Maximum simultaneous nodes for log collection.') parser.add_argument('-t', '--outputs-timestamp', help=('Add timestamp to outputs - allows accumulating' @@ -233,6 +236,10 @@ def main(argv=None): conf['do_print_results'] = True if args.no_clean: conf['clean'] = False + if args.maxthreads: + conf['maxthreads'] = args.maxthreads + if args.logs_maxthreads: + conf['logs_maxthreads'] = args.logs_maxthreads if args.rqfile: conf['rqfile'] = [] for file in args.rqfile: @@ -307,6 +314,8 @@ def main(argv=None): conf['offline'] = True if args.one_way: conf['scripts_all_pairs_one_way'] = True + if args.max_pairs: + conf['scripts_all_pairs_max_pairs'] = args.max_pairs logger.info('Using rqdir: %s, rqfile: %s' % (conf['rqdir'], conf['rqfile'])) nm = pretty_run(args.quiet, 'Initializing node data', @@ -317,7 +326,7 @@ def main(argv=None): if not conf['offline'] and (args.only_logs or args.logs): logs = True size = pretty_run(args.quiet, 'Calculating logs size', - nm.calculate_log_size, args=(args.maxthreads,)) + nm.calculate_log_size) if size == 0: logger.warning('Size zero - no logs to collect.') has_logs = False @@ -337,17 +346,16 @@ def main(argv=None): pretty_run(args.quiet, 'Uploading files', nm.put_files) if nm.has(Node.ckey, Node.skey): pretty_run(args.quiet, 'Executing commands and scripts', - nm.run_commands, kwargs={'maxthreads': args.maxthreads, - 'fake': args.fake}) + nm.run_commands, kwargs={'fake': args.fake}) 
if conf['analyze']: pretty_run(args.quiet, 'Analyzing outputs', analyze, args=[nm]) if not conf['offline'] and not args.only_logs: if nm.has('scripts_all_pairs'): pretty_run(args.quiet, 'Executing paired scripts', - nm.run_scripts_all_pairs, args=(args.maxthreads,)) + nm.run_scripts_all_pairs) if nm.has(Node.fkey, Node.flkey): pretty_run(args.quiet, 'Collecting files and filelists', - nm.get_files, args=(args.maxthreads,)) + nm.get_files) if not args.no_archive and nm.has(*Node.conf_archive_general): pretty_run(args.quiet, 'Creating outputs and files archive', nm.create_archive_general, args=(60,)) @@ -355,8 +363,7 @@ def main(argv=None): msg = 'Collecting and packing logs' pretty_run(args.quiet, msg, nm.get_logs, args=(conf['compress_timeout'],), - kwargs={'maxthreads': args.logs_maxthreads, - 'fake': args.fake_logs}) + kwargs={'fake': args.fake_logs}) logger.info("Nodes:\n%s" % nm) if not args.quiet: print('Run complete. Node information:') diff --git a/timmy/conf.py b/timmy/conf.py index d68517e..7494048 100644 --- a/timmy/conf.py +++ b/timmy/conf.py @@ -72,9 +72,24 @@ def init_default_conf(): conf['do_print_results'] = False '''Clean - erase previous results in outdir and archive_dir dir, if any.''' conf['clean'] = True + '''Analyze collected data and provide cluster health insight.''' conf['analyze'] = False - conf['offline'] = False # mark all nodes as offline + '''Mark all nodes as inaccessible. Useful for offline analysis.''' + conf['offline'] = False + '''Limit the amount of workers which run simultaneously. Impacts all + concurrent operations except log collection and client phase of + scripts_all_pairs. Mandatory.''' + conf['maxthreads'] = 100 + '''Limit the amount of workers which collect logs (one per node = the + amount of nodes from which logs are simultaneously collected). Impacts + only log collection routine. Mandatory.''' + conf['logs_maxthreads'] = 10 + '''For each pair of nodes A & B only run client script on node A. 
+ Decreases the amount of iterations in scripts_all_pairs twice.''' conf['scripts_all_pairs_one_way'] = False + '''How many pairs to process simultaneously. 0 = unlimited = num. nodes + divided by 2. Limits concurrency for scripts_all_pairs client phase.''' + conf['scripts_all_pairs_max_pairs'] = 0 return conf diff --git a/timmy/env.py b/timmy/env.py index 980e304..f88e0d1 100644 --- a/timmy/env.py +++ b/timmy/env.py @@ -16,7 +16,7 @@ # under the License. project_name = 'timmy' -version = '1.26.3' +version = '1.26.4' if __name__ == '__main__': import sys diff --git a/timmy/nodes.py b/timmy/nodes.py index ef57254..72cd0d4 100644 --- a/timmy/nodes.py +++ b/timmy/nodes.py @@ -538,6 +538,8 @@ class NodeManager(object): def base_init(self, conf, logger=None): self.conf = conf + self.maxthreads = conf['maxthreads'] # shortcut + self.logs_maxthreads = conf['logs_maxthreads'] # shortcut self.logger = logger or logging.getLogger(project_name) if conf['outputs_timestamp'] or conf['dir_timestamp']: timestamp_str = datetime.now().strftime('_%F_%H-%M-%S') @@ -711,23 +713,23 @@ class NodeManager(object): for node in self.nodes.values(): node.apply_conf(self.conf) - def nodes_get_os(self, maxthreads=100): + def nodes_get_os(self): run_items = [] for key, node in self.selected_nodes.items(): if not node.os_platform: run_items.append(tools.RunItem(target=node.get_os, key=key)) - result = tools.run_batch(run_items, maxthreads, dict_result=True) + result = tools.run_batch(run_items, self.maxthreads, dict_result=True) for key in result: if result[key]: self.nodes[key].os_platform = result[key] - def nodes_check_access(self, maxthreads=100): + def nodes_check_access(self): self.logger.debug('checking if nodes are accessible') run_items = [] for key, node in self.selected_nodes.items(): run_items.append(tools.RunItem(target=node.check_access, key=key)) - result = tools.run_batch(run_items, maxthreads, dict_result=True) 
for key in result: self.nodes[key].accessible = result[key] @@ -759,25 +761,25 @@ class NodeManager(object): return all(checks) @run_with_lock - def run_commands(self, timeout=15, fake=False, maxthreads=100): + def run_commands(self, timeout=15, fake=False): run_items = [] for key, node in self.selected_nodes.items(): run_items.append(tools.RunItem(target=node.exec_cmd, args={'fake': fake}, key=key)) - result = tools.run_batch(run_items, maxthreads, dict_result=True) + result = tools.run_batch(run_items, self.maxthreads, dict_result=True) for key in result: self.nodes[key].mapcmds = result[key][0] self.nodes[key].mapscr = result[key][1] - def calculate_log_size(self, timeout=15, maxthreads=100): + def calculate_log_size(self, timeout=15): total_size = 0 run_items = [] for key, node in self.selected_nodes.items(): run_items.append(tools.RunItem(target=node.logs_populate, args={'timeout': timeout}, key=key)) - result = tools.run_batch(run_items, maxthreads, dict_result=True) + result = tools.run_batch(run_items, self.maxthreads, dict_result=True) for key in result: self.nodes[key].logs = result[key] for node in self.selected_nodes.values(): @@ -847,7 +849,7 @@ class NodeManager(object): return speed @run_with_lock - def get_logs(self, timeout, fake=False, maxthreads=10): + def get_logs(self, timeout, fake=False): if fake: self.logger.info('fake = True, skipping') return @@ -856,7 +858,8 @@ class NodeManager(object): speed = self.conf['logs_speed'] else: speed = self.find_adm_interface_speed() - speed = int(speed * 0.9 / min(maxthreads, len(self.nodes))) + speed = int(speed * 0.9 / min(self.logs_maxthreads, + len(self.nodes))) py_slowpipe = tools.slowpipe % speed limitcmd = "| python -c '%s'; exit ${PIPESTATUS}" % py_slowpipe run_items = [] @@ -884,7 +887,7 @@ class NodeManager(object): 'decode': False} run_items.append(tools.RunItem(target=node.exec_simple_cmd, args=args)) - tools.run_batch(run_items, maxthreads) + tools.run_batch(run_items, self.logs_maxthreads) 
@run_with_lock def get_files(self, timeout=15): @@ -901,8 +904,9 @@ class NodeManager(object): tools.run_batch(run_items, 10) @run_with_lock - def run_scripts_all_pairs(self, maxthreads, fake=False): + def run_scripts_all_pairs(self, fake=False): nodes = self.selected_nodes.values() + max_pairs = self.conf['scripts_all_pairs_max_pairs'] if len(nodes) < 2: self.logger.warning('less than 2 nodes are available, ' 'skipping paired scripts') @@ -917,12 +921,13 @@ class NodeManager(object): stop_args = {'phase': 'server_stop', 'fake': fake} run_server_stop_items.append(tools.RunItem(target=n.exec_pair, args=stop_args)) - result = tools.run_batch(run_server_start_items, maxthreads, + result = tools.run_batch(run_server_start_items, self.maxthreads, dict_result=True) for key in result: self.nodes[key].scripts_all_pairs = result[key] one_way = self.conf['scripts_all_pairs_one_way'] - for pairset in tools.all_pairs(nodes, one_way=one_way): + chain = tools.all_pairs(nodes, one_way=one_way, max_pairs=max_pairs) + for pairset in chain: run_client_items = [] self.logger.info(['%s->%s' % (p[0].ip, p[1].ip) for p in pairset]) for pair in pairset: @@ -933,7 +938,7 @@ class NodeManager(object): run_client_items.append(tools.RunItem(target=client.exec_pair, args=client_args)) tools.run_batch(run_client_items, len(run_client_items)) - tools.run_batch(run_server_stop_items, maxthreads) + tools.run_batch(run_server_stop_items, self.maxthreads) def has(self, *keys): nodes = {} diff --git a/timmy/tools.py b/timmy/tools.py index bb30517..bcfd1cd 100644 --- a/timmy/tools.py +++ b/timmy/tools.py @@ -408,7 +408,7 @@ def w_list(value): return value if type(value) == list else [value] -def all_pairs(items, one_way=False): +def all_pairs(items, one_way=False, max_pairs=0): def incomplete(items_set, paired_dict): for paired_set in paired_dict.values(): if items_set.difference(paired_set): @@ -433,6 +433,8 @@ def all_pairs(items, one_way=False): paired[i].add(pair_i) if one_way: 
paired[pair_i].add(i) + if max_pairs and len(busy) >= max_pairs: + break pairs.append(current_pairs) return pairs