Add: max_pairs argument; other minor changes

- Add max_pairs parameter to limit concurrency for
scripts_all_pairs run
- Move maxthreads and logs_maxthreads into configuration
- Rework some of the configuration comments

Change-Id: I74c308c9a7aa5177eba8d6c9afb69c79bc1f809a
This commit is contained in:
Dmitry Sutyagin 2016-12-22 17:23:10 -08:00
parent f292a9fccd
commit 77634b9565
6 changed files with 62 additions and 30 deletions

View File

@ -4,7 +4,7 @@
%global pypi_name timmy
Name: python-%{pypi_name}
Version: 1.26.3
Version: 1.26.4
Release: 1%{?dist}~mos0
Summary: Log collector tool for OpenStack Fuel
@ -107,6 +107,9 @@ popd
%changelog
* Thu Dec 22 2016 Dmitry Sutyagin <dsutyagin@mirantis.com> - 1.26.4
- Add: max_pairs argument; other minor changes
* Wed Dec 21 2016 Dmitry Sutyagin <dsutyagin@mirantis.com> - 1.26.3
- Fix: scripts_all_pairs sometimes uses same node

View File

@ -91,6 +91,11 @@ def parser_init(add_help=False):
' for each pair of nodes [A, B] run client'
' script only on A (A->B connection).'
' Default is to run both A->B and B->A.'))
parser.add_argument('--max-pairs', type=int, metavar='NUMBER',
help=('When executing scripts_all_pairs (if defined),'
' limit the amount of pairs processed'
' simultaneously. Default is to run max number'
' of pairs possible, which is num. nodes / 2.'))
parser.add_argument('-P', '--put', nargs=2, action='append',
metavar=('SOURCE', 'DESTINATION'),
help=('Enables shell mode. Can be specified multiple'
@ -156,12 +161,10 @@ def parser_init(add_help=False):
' messages. Good for quick runs / "watch" wrap.'
' This option disables any -v parameters.'),
action='store_true')
parser.add_argument('--maxthreads', type=int, default=100,
metavar='NUMBER',
parser.add_argument('--maxthreads', type=int, metavar='NUMBER',
help=('Maximum simultaneous nodes for command'
'execution.'))
parser.add_argument('--logs-maxthreads', type=int, default=10,
metavar='NUMBER',
parser.add_argument('--logs-maxthreads', type=int, metavar='NUMBER',
help='Maximum simultaneous nodes for log collection.')
parser.add_argument('-t', '--outputs-timestamp',
help=('Add timestamp to outputs - allows accumulating'
@ -233,6 +236,10 @@ def main(argv=None):
conf['do_print_results'] = True
if args.no_clean:
conf['clean'] = False
if args.maxthreads:
conf['maxthreads'] = args.maxthreads
if args.logs_maxthreads:
conf['logs_maxthreads'] = args.logs_maxthreads
if args.rqfile:
conf['rqfile'] = []
for file in args.rqfile:
@ -307,6 +314,8 @@ def main(argv=None):
conf['offline'] = True
if args.one_way:
conf['scripts_all_pairs_one_way'] = True
if args.max_pairs:
conf['scripts_all_pairs_max_pairs'] = args.max_pairs
logger.info('Using rqdir: %s, rqfile: %s' %
(conf['rqdir'], conf['rqfile']))
nm = pretty_run(args.quiet, 'Initializing node data',
@ -317,7 +326,7 @@ def main(argv=None):
if not conf['offline'] and (args.only_logs or args.logs):
logs = True
size = pretty_run(args.quiet, 'Calculating logs size',
nm.calculate_log_size, args=(args.maxthreads,))
nm.calculate_log_size)
if size == 0:
logger.warning('Size zero - no logs to collect.')
has_logs = False
@ -337,17 +346,16 @@ def main(argv=None):
pretty_run(args.quiet, 'Uploading files', nm.put_files)
if nm.has(Node.ckey, Node.skey):
pretty_run(args.quiet, 'Executing commands and scripts',
nm.run_commands, kwargs={'maxthreads': args.maxthreads,
'fake': args.fake})
nm.run_commands, kwargs={'fake': args.fake})
if conf['analyze']:
pretty_run(args.quiet, 'Analyzing outputs', analyze, args=[nm])
if not conf['offline'] and not args.only_logs:
if nm.has('scripts_all_pairs'):
pretty_run(args.quiet, 'Executing paired scripts',
nm.run_scripts_all_pairs, args=(args.maxthreads,))
nm.run_scripts_all_pairs)
if nm.has(Node.fkey, Node.flkey):
pretty_run(args.quiet, 'Collecting files and filelists',
nm.get_files, args=(args.maxthreads,))
nm.get_files)
if not args.no_archive and nm.has(*Node.conf_archive_general):
pretty_run(args.quiet, 'Creating outputs and files archive',
nm.create_archive_general, args=(60,))
@ -355,8 +363,7 @@ def main(argv=None):
msg = 'Collecting and packing logs'
pretty_run(args.quiet, msg, nm.get_logs,
args=(conf['compress_timeout'],),
kwargs={'maxthreads': args.logs_maxthreads,
'fake': args.fake_logs})
kwargs={'fake': args.fake_logs})
logger.info("Nodes:\n%s" % nm)
if not args.quiet:
print('Run complete. Node information:')

View File

@ -72,9 +72,24 @@ def init_default_conf():
conf['do_print_results'] = False
'''Clean - erase previous results in outdir and archive_dir dir, if any.'''
conf['clean'] = True
'''Analyze collected data and provide cluster health insight.'''
conf['analyze'] = False
conf['offline'] = False # mark all nodes as offline
'''Mark all nodes as inaccessible. Useful for offline analysis.'''
conf['offline'] = False
'''Limit the amount of workers which run simultaneously. Impacts all
concurrent operations except log collection and client phase of
scripts_all_pairs. Mandatory.'''
conf['maxthreads'] = 100
'''Limit the amount of workers which collect logs (one per node = the
amount of nodes from which logs are simultaneously collected). Impacts
only log collection routine. Mandatory.'''
conf['logs_maxthreads'] = 10
'''For each pair of nodes A & B only run client script on node A.
Decreases the amount of iterations in scripts_all_pairs twice.'''
conf['scripts_all_pairs_one_way'] = False
'''How many pairs to process simultaneously. 0 = unlimited = num. nodes
divided by 2. Limits concurrency for scripts_all_pairs client phase.'''
conf['scripts_all_pairs_max_pairs'] = 0
return conf

View File

@ -16,7 +16,7 @@
# under the License.
project_name = 'timmy'
version = '1.26.3'
version = '1.26.4'
if __name__ == '__main__':
import sys

View File

@ -538,6 +538,8 @@ class NodeManager(object):
def base_init(self, conf, logger=None):
self.conf = conf
self.maxthreads = conf['maxthreads'] # shortcut
self.logs_maxthreads = conf['logs_maxthreads'] # shortcut
self.logger = logger or logging.getLogger(project_name)
if conf['outputs_timestamp'] or conf['dir_timestamp']:
timestamp_str = datetime.now().strftime('_%F_%H-%M-%S')
@ -711,23 +713,23 @@ class NodeManager(object):
for node in self.nodes.values():
node.apply_conf(self.conf)
def nodes_get_os(self, maxthreads=100):
def nodes_get_os(self):
run_items = []
for key, node in self.selected_nodes.items():
if not node.os_platform:
run_items.append(tools.RunItem(target=node.get_os, key=key))
result = tools.run_batch(run_items, maxthreads, dict_result=True)
result = tools.run_batch(run_items, self.maxthreads, dict_result=True)
for key in result:
if result[key]:
self.nodes[key].os_platform = result[key]
def nodes_check_access(self, maxthreads=100):
def nodes_check_access(self):
self.logger.debug('checking if nodes are accessible')
run_items = []
for key, node in self.selected_nodes.items():
run_items.append(tools.RunItem(target=node.check_access,
key=key))
result = tools.run_batch(run_items, maxthreads, dict_result=True)
result = tools.run_batch(run_items, self.maxthreads, dict_result=True)
for key in result:
self.nodes[key].accessible = result[key]
@ -759,25 +761,25 @@ class NodeManager(object):
return all(checks)
@run_with_lock
def run_commands(self, timeout=15, fake=False, maxthreads=100):
def run_commands(self, timeout=15, fake=False):
run_items = []
for key, node in self.selected_nodes.items():
run_items.append(tools.RunItem(target=node.exec_cmd,
args={'fake': fake},
key=key))
result = tools.run_batch(run_items, maxthreads, dict_result=True)
result = tools.run_batch(run_items, self.maxthreads, dict_result=True)
for key in result:
self.nodes[key].mapcmds = result[key][0]
self.nodes[key].mapscr = result[key][1]
def calculate_log_size(self, timeout=15, maxthreads=100):
def calculate_log_size(self, timeout=15):
total_size = 0
run_items = []
for key, node in self.selected_nodes.items():
run_items.append(tools.RunItem(target=node.logs_populate,
args={'timeout': timeout},
key=key))
result = tools.run_batch(run_items, maxthreads, dict_result=True)
result = tools.run_batch(run_items, self.maxthreads, dict_result=True)
for key in result:
self.nodes[key].logs = result[key]
for node in self.selected_nodes.values():
@ -847,7 +849,7 @@ class NodeManager(object):
return speed
@run_with_lock
def get_logs(self, timeout, fake=False, maxthreads=10):
def get_logs(self, timeout, fake=False):
if fake:
self.logger.info('fake = True, skipping')
return
@ -856,7 +858,8 @@ class NodeManager(object):
speed = self.conf['logs_speed']
else:
speed = self.find_adm_interface_speed()
speed = int(speed * 0.9 / min(maxthreads, len(self.nodes)))
speed = int(speed * 0.9 / min(self.logs_maxthreads,
len(self.nodes)))
py_slowpipe = tools.slowpipe % speed
limitcmd = "| python -c '%s'; exit ${PIPESTATUS}" % py_slowpipe
run_items = []
@ -884,7 +887,7 @@ class NodeManager(object):
'decode': False}
run_items.append(tools.RunItem(target=node.exec_simple_cmd,
args=args))
tools.run_batch(run_items, maxthreads)
tools.run_batch(run_items, self.logs_maxthreads)
@run_with_lock
def get_files(self, timeout=15):
@ -901,8 +904,9 @@ class NodeManager(object):
tools.run_batch(run_items, self.maxthreads)
@run_with_lock
def run_scripts_all_pairs(self, maxthreads, fake=False):
def run_scripts_all_pairs(self, fake=False):
nodes = self.selected_nodes.values()
max_pairs = self.conf['scripts_all_pairs_max_pairs']
if len(nodes) < 2:
self.logger.warning('less than 2 nodes are available, '
'skipping paired scripts')
@ -917,12 +921,13 @@ class NodeManager(object):
stop_args = {'phase': 'server_stop', 'fake': fake}
run_server_stop_items.append(tools.RunItem(target=n.exec_pair,
args=stop_args))
result = tools.run_batch(run_server_start_items, maxthreads,
result = tools.run_batch(run_server_start_items, self.maxthreads,
dict_result=True)
for key in result:
self.nodes[key].scripts_all_pairs = result[key]
one_way = self.conf['scripts_all_pairs_one_way']
for pairset in tools.all_pairs(nodes, one_way=one_way):
chain = tools.all_pairs(nodes, one_way=one_way, max_pairs=max_pairs)
for pairset in chain:
run_client_items = []
self.logger.info(['%s->%s' % (p[0].ip, p[1].ip) for p in pairset])
for pair in pairset:
@ -933,7 +938,7 @@ class NodeManager(object):
run_client_items.append(tools.RunItem(target=client.exec_pair,
args=client_args))
tools.run_batch(run_client_items, len(run_client_items))
tools.run_batch(run_server_stop_items, maxthreads)
tools.run_batch(run_server_stop_items, self.maxthreads)
def has(self, *keys):
nodes = {}

View File

@ -408,7 +408,7 @@ def w_list(value):
return value if type(value) == list else [value]
def all_pairs(items, one_way=False):
def all_pairs(items, one_way=False, max_pairs=0):
def incomplete(items_set, paired_dict):
for paired_set in paired_dict.values():
if items_set.difference(paired_set):
@ -433,6 +433,8 @@ def all_pairs(items, one_way=False):
paired[i].add(pair_i)
if one_way:
paired[pair_i].add(i)
if max_pairs and len(busy) >= max_pairs:
break
pairs.append(current_pairs)
return pairs