commit 121dd75c75

.gitignore (vendored) | 1
@@ -1,2 +1,3 @@
 *.swp
 *.pyc
+doc/build
@@ -16,6 +16,8 @@ rqfile: './rq.yaml'
 soft_filter:
   status: ['ready']
   online: True
+# no_id: [0] # skip Fuel
+# no_roles: ['fuel'] # another way to skip Fuel
 timeout: 15
 compress_timeout: 3600
 logs:
@@ -44,7 +44,7 @@ If you want to do a set of actions on the nodes and you do not want to write a l
 ::

   rqdir: './pacemaker-debug' # a folder which should contain any filelists and/or scripts if they are defined later, should contain folders 'filelists' and/or 'scripts'
-  rqfile: None # explicitly undefine rqfile to skip default filelists and scripts
+  rqfile: null # explicitly undefine rqfile to skip default filelists and scripts
   hard_filter:
     roles: # only execute on Fuel and controllers
       - fuel
setup.py | 3
@@ -18,7 +18,8 @@ setup(name=pname,
       url='https://github.com/adobdin/timmy',
       long_description=open('README.md').read(),
       packages=[pname],
       install_requires=['pyyaml'],
       data_files=rqfiles,
+      include_package_data=True,
-      entry_points={'console_scripts': ['%s=%s.cli:main' %(pname, pname) ]},
+      entry_points={'console_scripts': ['%s=%s.cli:main' % (pname, pname)]},
       )
timmy/cli.py | 50
@@ -33,6 +33,7 @@ def pretty_run(quiet, msg, f, args=[], kwargs={}):
     print('%s: done' % msg)
     return result

+
 def parse_args():
     parser = argparse.ArgumentParser(description=('Parallel remote command'
                                                   ' execution and file'
@@ -107,7 +108,7 @@ def parse_args():
     parser.add_argument('-q', '--quiet',
                         help=('Print only command execution results and log'
                               ' messages. Good for quick runs / "watch" wrap.'
-                              ' Also sets default loglevel to ERROR.'),
+                              ' This option disables any -v parameters.'),
                         action='store_true')
     parser.add_argument('-m', '--maxthreads', type=int, default=100,
                         help=('Maximum simultaneous nodes for command'
@@ -128,15 +129,12 @@ def parse_args():
                         ' results. Do not forget to clean up the results'
                         ' manually when using this option.',
                         action='store_true')
-    parser.add_argument('-w', '--warning',
-                        help='Sets log level to warning (default).',
-                        action='store_true')
-    parser.add_argument('-v', '--verbose',
-                        help='Be verbose.',
-                        action='store_true')
-    parser.add_argument('-d', '--debug',
-                        help='Be extremely verbose.',
-                        action='store_true')
+    parser.add_argument('-v', '--verbose', action='count', default=0,
+                        help=('This works for -vvvv, -vvv, -vv, -v, -v -v,'
+                              ' etc. If no -v is given, logging.WARNING is'
+                              ' selected; each extra -v steps to INFO and'
+                              ' then DEBUG, unless the option -q (--quiet)'
+                              ' is specified.'))
     return parser
@@ -146,17 +144,16 @@ def main(argv=None):
         argv = sys.argv
     parser = parse_args()
     args = parser.parse_args(argv[1:])
-    if args.quiet and not args.warning:
-        loglevel = logging.ERROR
-    else:
-        loglevel = logging.WARNING
-    if args.verbose:
-        loglevel = logging.INFO
-    if args.debug:
-        loglevel = logging.DEBUG
+    loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
+    if args.quiet:
+        args.verbose = 0
+    loglevel = loglevels[min(len(loglevels)-1, args.verbose)]
+    FORMAT = ('%(asctime)s %(levelname)s: %(module)s: '
+              '%(funcName)s(): %(message)s')
     logging.basicConfig(filename=args.log_file,
                         level=loglevel,
-                        format='%(asctime)s %(levelname)s %(message)s')
+                        format=FORMAT)
+    logger = logging.getLogger(__name__)
     conf = load_conf(args.config)
     if args.fuel_ip:
         conf['fuel_ip'] = args.fuel_ip
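For reference, a minimal standalone sketch of the count-based verbosity scheme the two hunks above introduce (plain argparse and logging; the names mirror the diff, but the snippet is illustrative, not timmy code)::

    import argparse
    import logging

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='count', default=0)
    parser.add_argument('-q', '--quiet', action='store_true')
    args = parser.parse_args(['-vv'])

    loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
    if args.quiet:
        args.verbose = 0  # -q wins over any number of -v flags
    # clamp the index so -vvv and beyond still mean DEBUG
    loglevel = loglevels[min(len(loglevels) - 1, args.verbose)]
    assert loglevel == logging.DEBUG  # two -v flags select DEBUG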
@@ -166,6 +163,7 @@ def main(argv=None):
     conf['fuel_pass'] = args.fuel_pass
     if args.put or args.command or args.script or args.get:
         conf['shell_mode'] = True
+        conf['do_print_results'] = True
     if args.no_clean:
         conf['clean'] = False
     if conf['shell_mode']:
@@ -203,6 +201,8 @@ def main(argv=None):
     if args.dest_file:
         conf['archive_dir'] = os.path.split(args.dest_file)[0]
         conf['archive_name'] = os.path.split(args.dest_file)[1]
+    logger.info('Using rqdir: %s, rqfile: %s' %
+                (conf['rqdir'], conf['rqfile']))
     nm = pretty_run(args.quiet, 'Initializing node data',
                     NodeManager,
                     kwargs={'conf': conf, 'extended': args.extended,
@@ -223,7 +223,7 @@ def main(argv=None):
         size = pretty_run(args.quiet, 'Calculating logs size',
                           nm.calculate_log_size, args=(args.maxthreads,))
         if size == 0:
-            logging.warning('Size zero - no logs to collect.')
+            logger.warning('Size zero - no logs to collect.')
             return
         enough = pretty_run(args.quiet, 'Checking free space',
                             nm.is_enough_space)
@@ -233,14 +233,14 @@ def main(argv=None):
                        kwargs={'maxthreads': args.logs_maxthreads,
                                'fake': args.fake_logs})
         else:
-            logging.warning(('Not enough space for logs in "%s", skipping'
-                             'log collection.') % nm.conf['archive_dir'])
-    logging.info("Nodes:\n%s" % nm)
+            logger.warning(('Not enough space for logs in "%s", skipping'
+                            ' log collection.') % nm.conf['archive_dir'])
+    logger.info("Nodes:\n%s" % nm)
     if not args.quiet:
         print('Run complete. Node information:')
         print(nm)
     if conf['shell_mode']:
         if args.command or args.script:
             if conf['do_print_results']:
                 if nm.has(Node.ckey, Node.skey):
                     if not args.quiet:
                         print('Results:')
                     for node in nm.sorted_nodes():
@@ -51,6 +51,8 @@ def load_conf(filename):
     place specified by conf['outdir'], archive will also be created and put
     in a place specified by conf['archive_dir'].'''
     conf['shell_mode'] = False
+    '''Print output of commands and scripts to stdout'''
+    conf['do_print_results'] = False
     '''Clean - erase previous results in outdir and archive_dir dir, if any.'''
     conf['clean'] = True
     if filename:
@@ -1,6 +1,5 @@
 project_name = 'timmy'
-version = '1.3.0'
+version = '1.4.0'

 if __name__ == '__main__':
     exit(0)
timmy/nodes.py | 153
@@ -19,7 +19,7 @@
 main module
 """

-import yaml
+import json
 import os
 import shutil
 import logging
@@ -50,7 +50,7 @@ class Node(object):
     print_template += ' {6:<6} {7}'

     def __init__(self, id, mac, cluster, roles, os_platform,
-                 online, status, ip, conf):
+                 online, status, ip, conf, logger=None):
         self.id = id
         self.mac = mac
         self.cluster = cluster
@@ -74,6 +74,7 @@ class Node(object):
         self.outputs_timestamp = False
         self.outputs_timestamp_dir = None
         self.apply_conf(conf)
+        self.logger = logger or logging.getLogger(__name__)

     def __str__(self):
         if not self.filtered_out:
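The `logger=None` parameter with the `logger or logging.getLogger(__name__)` fallback is a standard logger-injection idiom: callers may pass a pre-configured logger, and everything else falls back to a module-named one. A minimal sketch of the idiom (illustrative, not timmy code)::

    import logging

    class Worker(object):
        def __init__(self, logger=None):
            # use the injected logger if given, else a module-level default
            self.logger = logger or logging.getLogger(__name__)

        def work(self):
            self.logger.info('working')

    Worker().work()                             # default module logger
    Worker(logging.getLogger('custom')).work()  # injected logger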
@@ -157,30 +158,31 @@ class Node(object):
                                           timeout=self.timeout,
                                           prefix=self.prefix)
         if code != 0:
-            logging.warning('get_release: node: %s: could not determine'
-                            ' MOS release' % self.id)
+            self.logger.warning('node: %s: could not determine'
+                                ' MOS release' % self.id)
         else:
             self.release = release.strip('\n "\'')
-        logging.info('get_release: node: %s, MOS release: %s' %
-                     (self.id, self.release))
-        return self
+        self.logger.info('node: %s, MOS release: %s' %
+                         (self.id, self.release))
+        return release

     def exec_cmd(self, fake=False, ok_codes=None):
         sn = 'node-%s' % self.id
         cl = 'cluster-%s' % self.cluster
-        logging.debug('%s/%s/%s/%s' % (self.outdir, Node.ckey, cl, sn))
+        self.logger.debug('%s/%s/%s/%s' % (self.outdir, Node.ckey, cl, sn))
         ddir = os.path.join(self.outdir, Node.ckey, cl, sn)
         if self.cmds:
             tools.mdir(ddir)
         self.cmds = sorted(self.cmds)
+        mapcmds = {}
         for c in self.cmds:
             for cmd in c:
                 dfile = os.path.join(ddir, 'node-%s-%s-%s' %
                                      (self.id, self.ip, cmd))
                 if self.outputs_timestamp:
                     dfile += self.outputs_timestamp_str
-                logging.info('outfile: %s' % dfile)
-                self.mapcmds[cmd] = dfile
+                self.logger.info('outfile: %s' % dfile)
+                mapcmds[cmd] = dfile
                 if not fake:
                     outs, errs, code = tools.ssh_node(ip=self.ip,
                                                       command=c[cmd],
@@ -193,20 +195,21 @@ class Node(object):
                     with open(dfile, 'w') as df:
                         df.write(outs.encode('utf-8'))
                 except:
-                    logging.error("exec_cmd: can't write to file %s" %
-                                  dfile)
+                    self.logger.error("can't write to file %s" %
+                                      dfile)
         if self.scripts:
             tools.mdir(ddir)
         self.scripts = sorted(self.scripts)
+        mapscr = {}
         for scr in self.scripts:
             f = os.path.join(self.rqdir, Node.skey, scr)
-            logging.info('node:%s(%s), exec: %s' % (self.id, self.ip, f))
+            self.logger.info('node:%s(%s), exec: %s' % (self.id, self.ip, f))
             dfile = os.path.join(ddir, 'node-%s-%s-%s' %
                                  (self.id, self.ip, os.path.basename(f)))
             if self.outputs_timestamp:
                 dfile += self.outputs_timestamp_str
-            logging.info('outfile: %s' % dfile)
-            self.mapscr[scr] = dfile
+            self.logger.info('outfile: %s' % dfile)
+            mapscr[scr] = dfile
             if not fake:
                 outs, errs, code = tools.ssh_node(ip=self.ip,
                                                   filename=f,
@@ -219,12 +222,12 @@ class Node(object):
                     with open(dfile, 'w') as df:
                         df.write(outs.encode('utf-8'))
                 except:
-                    logging.error("exec_cmd: can't write to file %s" % dfile)
-        return self
+                    self.logger.error("can't write to file %s" % dfile)
+        return mapcmds, mapscr

     def exec_simple_cmd(self, cmd, timeout=15, infile=None, outfile=None,
                         fake=False, ok_codes=None, input=None):
-        logging.info('node:%s(%s), exec: %s' % (self.id, self.ip, cmd))
+        self.logger.info('node:%s(%s), exec: %s' % (self.id, self.ip, cmd))
         if not fake:
             outs, errs, code = tools.ssh_node(ip=self.ip,
                                               command=cmd,
@@ -238,7 +241,7 @@ class Node(object):
         self.check_code(code, 'exec_simple_cmd', cmd, ok_codes)

     def get_files(self, timeout=15):
-        logging.info('get_files: node: %s, IP: %s' % (self.id, self.ip))
+        self.logger.info('node: %s, IP: %s' % (self.id, self.ip))
         sn = 'node-%s' % self.id
         cl = 'cluster-%s' % self.cluster
         if self.files or self.filelists:
@@ -261,9 +264,9 @@ class Node(object):
                         if not line.isspace() and line[0] != '#':
                             data += line
             except:
-                logging.error('could not read file: %s' % fname)
+                self.logger.error('could not read file: %s' % fname)
         data += '\n'.join(self.files)
-        logging.debug('node: %s, data:\n%s' % (self.id, data))
+        self.logger.debug('node: %s, data:\n%s' % (self.id, data))
         if data:
             o, e, c = tools.get_files_rsync(ip=self.ip,
                                             data=data,
@@ -273,7 +276,7 @@ class Node(object):
         self.check_code(c, 'get_files', 'tools.get_files_rsync')

     def put_files(self):
-        logging.info('put_files: node: %s, IP: %s' % (self.id, self.ip))
+        self.logger.info('node: %s, IP: %s' % (self.id, self.ip))
         for f in self.put:
             outs, errs, code = tools.put_file_scp(ip=self.ip,
                                                   file=f[0],
@@ -295,8 +298,8 @@ class Node(object):
                 start = ''
             cmd = ("find '%s' -type f%s -exec du -b {} +" % (item['path'],
                                                              start))
-            logging.info('logs_populate: node: %s, logs du-cmd: %s' %
-                         (self.id, cmd))
+            self.logger.info('node: %s, logs du-cmd: %s' %
+                             (self.id, cmd))
             outs, errs, code = tools.ssh_node(ip=self.ip,
                                               command=cmd,
                                               ssh_opts=self.ssh_opts,
@@ -304,9 +307,9 @@ class Node(object):
                                               timeout=timeout,
                                               prefix=self.prefix)
             if code == 124:
-                logging.error("node: %s, ip: %s, command: %s, "
-                              "timeout code: %s, error message: %s" %
-                              (self.id, self.ip, cmd, code, errs))
+                self.logger.error("node: %s, ip: %s, command: %s, "
+                                  "timeout code: %s, error message: %s" %
+                                  (self.id, self.ip, cmd, code, errs))
                 break
             if len(outs):
                 item['files'] = {}
@@ -315,8 +318,8 @@ class Node(object):
                 size, f = line.split('\t')
                 if filter_by_re(item, f):
                     item['files'][f] = int(size)
-        logging.debug('logs_populate: logs: %s' % (item['files']))
-        return self
+        self.logger.debug('logs: %s' % (item['files']))
+        return self.logs

     def logs_dict(self):
         result = {}
@@ -332,9 +335,9 @@ class Node(object):
     def check_code(self, code, func_name, cmd, ok_codes=None):
         if code:
             if not ok_codes or code not in ok_codes:
-                logging.warning("%s: got bad exit code %s,"
-                                " node: %s, ip: %s, cmd: %s" %
-                                (func_name, code, self.id, self.ip, cmd))
+                self.logger.warning("%s: got bad exit code %s,"
+                                    " node: %s, ip: %s, cmd: %s" %
+                                    (func_name, code, self.id, self.ip, cmd))

     def print_results(self, result_map):
         # result_map should be either mapcmds or mapscr
@@ -348,8 +351,9 @@ class Node(object):
 class NodeManager(object):
     """Class nodes """

-    def __init__(self, conf, extended=False, nodes_json=None):
+    def __init__(self, conf, extended=False, nodes_json=None, logger=None):
         self.conf = conf
+        self.logger = logger or logging.getLogger(__name__)
         if conf['outputs_timestamp'] or conf['dir_timestamp']:
             timestamp_str = datetime.now().strftime('_%F_%H-%M-%S')
             if conf['outputs_timestamp']:
@@ -363,17 +367,17 @@ class NodeManager(object):
         if not conf['shell_mode']:
             self.rqdir = conf['rqdir']
             if (not os.path.exists(self.rqdir)):
-                logging.error(('NodeManager __init__: directory %s does not'
-                               'exist') % self.rqdir)
+                self.logger.critical(('NodeManager: directory %s does not'
+                                      ' exist') % self.rqdir)
                 sys.exit(1)
             if self.conf['rqfile']:
                 self.import_rq()
         self.nodes = {}
         self.fuel_init()
         if nodes_json:
-            self.nodes_json = tools.load_yaml_file(nodes_json)
+            self.nodes_json = tools.load_json_file(nodes_json)
         else:
-            self.nodes_json = yaml.load(self.get_nodes_json())
+            self.nodes_json = json.loads(self.get_nodes_json())
         self.nodes_init()
         # apply soft-filter on all nodes
         for node in self.nodes.values():
@@ -384,7 +388,7 @@ class NodeManager(object):
         self.nodes_reapply_conf()
         self.conf_assign_once()
         if extended:
-            logging.info('NodeManager __init__: extended mode enabled')
+            self.logger.info('NodeManager: extended mode enabled')
             '''TO-DO: load smth like extended.yaml
             do additional apply_conf(clean=False) with this yaml.
             Move some stuff from rq.yaml to extended.yaml'''
@@ -454,7 +458,7 @@ class NodeManager(object):

     def fuel_init(self):
         if not self.conf['fuel_ip']:
-            logging.error('NodeManager fuel_init: fuel_ip not set')
+            self.logger.critical('NodeManager: fuel_ip not set')
             sys.exit(7)
         fuelnode = Node(id=0,
                         cluster=0,
@@ -481,8 +485,8 @@ class NodeManager(object):
                                           timeout=fuelnode.timeout,
                                           prefix=fuelnode.prefix)
         if code != 0:
-            logging.error(('NodeManager get_nodes: cannot get '
-                           'fuel node list: %s') % err)
+            self.logger.critical(('NodeManager: cannot get '
+                                  'fuel node list: %s') % err)
             sys.exit(4)
         return nodes_json
@@ -497,7 +501,9 @@ class NodeManager(object):
             roles = str(node_roles).split(', ')
             keys = "mac os_platform status online ip".split()
             params = {'id': int(node_data['id']),
-                      'cluster': int(node_data['cluster']),
+                      # please do NOT convert cluster id to int type
+                      # because None can be valid
+                      'cluster': node_data['cluster'],
                       'roles': roles,
                       'conf': self.conf}
             for key in keys:
@@ -514,7 +520,7 @@ class NodeManager(object):
                                      key=key))
         result = tools.run_batch(run_items, 100, dict_result=True)
         for key in result:
-            self.nodes[key] = result[key]
+            self.nodes[key].release = result[key]

     def conf_assign_once(self):
         once = Node.conf_once_prefix
@@ -542,16 +548,27 @@ class NodeManager(object):

     def filter(self, node, node_filter):
         f = node_filter
-        # soft-skip Fuel node if shell mode is enabled
+        # soft-skip Fuel node for shell mode
         if node.id == 0 and self.conf['shell_mode']:
             return False
         else:
-            fnames = [k for k in f if hasattr(node, k) and f[k]]
+            elems = []
+            for k in f:
+                if k.startswith('no_') and hasattr(node, k[3:]):
+                    elems.append({'node_k': k[3:], 'k': k, 'negative': True})
+                elif hasattr(node, k) and f[k]:
+                    elems.append({'node_k': k, 'k': k, 'negative': False})
             checks = []
-            for fn in fnames:
-                node_v = w_list(getattr(node, fn))
-                filter_v = w_list(f[fn])
-                checks.append(not set(node_v).isdisjoint(filter_v))
+            for el in elems:
+                node_v = w_list(getattr(node, el['node_k']))
+                filter_v = w_list(f[el['k']])
+                if el['negative']:
+                    checks.append(set(node_v).isdisjoint(filter_v))
+                elif node.id != 0:
+                    '''Do not apply normal (positive) filters to Fuel node,
+                    Fuel node will only be filtered by negative filters
+                    such as no_id = [0] or no_roles = ['fuel']'''
+                    checks.append(not set(node_v).isdisjoint(filter_v))
             return all(checks)

     @run_with_lock
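The rewritten `filter` introduces negative filters: a key prefixed with `no_` now excludes nodes whose attribute intersects the filter values, which is what the `# no_id: [0]` and `# no_roles: ['fuel']` lines in the sample config rely on. A self-contained sketch of the matching semantics (simplified: the Fuel-node special cases are left out, and `w_list` is redefined here on the assumption that timmy's helper wraps a scalar into a one-item list)::

    def w_list(value):
        # assumed behavior of timmy's w_list helper
        return value if isinstance(value, list) else [value]

    def node_matches(node_attrs, node_filter):
        checks = []
        for k, filter_v in node_filter.items():
            negative = k.startswith('no_')
            node_k = k[3:] if negative else k
            if node_k not in node_attrs or (not negative and not filter_v):
                continue  # skip filters the node has no attribute for
            node_v = set(w_list(node_attrs[node_k]))
            wanted = set(w_list(filter_v))
            if negative:
                checks.append(node_v.isdisjoint(wanted))      # must not intersect
            else:
                checks.append(not node_v.isdisjoint(wanted))  # must intersect
        return all(checks)

    print(node_matches({'roles': ['fuel']}, {'no_roles': ['fuel']}))           # False: excluded
    print(node_matches({'roles': ['controller']}, {'roles': ['controller']}))  # True: kept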
@@ -564,7 +581,8 @@ class NodeManager(object):
                                      key=key))
         result = tools.run_batch(run_items, maxthreads, dict_result=True)
         for key in result:
-            self.nodes[key] = result[key]
+            self.nodes[key].mapcmds = result[key][0]
+            self.nodes[key].mapscr = result[key][1]

     def calculate_log_size(self, timeout=15, maxthreads=100):
         total_size = 0
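Several hunks in this commit switch node methods from `return self` to returning plain data (`return release`, `return mapcmds, mapscr`, `return self.logs`), with the manager re-attaching the result to its own copy of the node. A plausible reason (the diff shows only the mechanics): `run_batch` runs each target in a child process, so only the return value travels back across the process boundary, and a small dict is cheaper and safer to pickle than a whole `Node`. A rough sketch of the pattern::

    from multiprocessing import Process, Queue

    def run_in_child(target, queue):
        queue.put(target())  # only the return value is pickled back

    class Node(object):
        def __init__(self):
            self.logs = None

        def logs_populate(self):
            return {'/var/log/messages': 1024}  # small, picklable result

    if __name__ == '__main__':
        node = Node()
        q = Queue()
        p = Process(target=run_in_child, args=(node.logs_populate, q))
        p.start()
        node.logs = q.get()  # the parent re-attaches the child's result
        p.join()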
@@ -576,11 +594,11 @@ class NodeManager(object):
                                      key=key))
         result = tools.run_batch(run_items, maxthreads, dict_result=True)
         for key in result:
-            self.nodes[key] = result[key]
+            self.nodes[key].logs = result[key]
         for node in self.nodes.values():
             total_size += sum(node.logs_dict().values())
-        logging.info('Full log size on nodes(with fuel): %s bytes' %
-                     total_size)
+        self.logger.info('Full log size on nodes(with fuel): %s bytes' %
+                         total_size)
         self.alogsize = total_size / 1024
         return self.alogsize
@@ -588,31 +606,36 @@
         tools.mdir(self.conf['outdir'])
         outs, errs, code = tools.free_space(self.conf['outdir'], timeout=1)
         if code != 0:
-            logging.error("Can't get free space: %s" % errs)
+            self.logger.error("Can't get free space: %s" % errs)
             return False
         try:
             fs = int(outs.rstrip('\n'))
         except:
-            logging.error("is_enough_space: can't get free space\nouts: %s" %
-                          outs)
+            self.logger.error("can't get free space\nouts: %s" %
+                              outs)
             return False
-        logging.info('logsize: %s Kb, free space: %s Kb' % (self.alogsize, fs))
+        self.logger.info('logsize: %s Kb, free space: %s Kb' %
+                         (self.alogsize, fs))
         if (self.alogsize*coefficient > fs):
-            logging.error('Not enough space on device')
+            self.logger.error('Not enough space on device')
             return False
         else:
             return True

     @run_with_lock
     def create_archive_general(self, timeout):
         if not os.path.isdir(self.conf['outdir']):
             logging.warning("Nothing to do, directory %s doesn't exist" %
                             self.conf['outdir'])
             return
         outfile = os.path.join(self.conf['archive_dir'],
                                self.conf['archive_name'])
         cmd = "tar zcf '%s' -C %s %s" % (outfile, self.conf['outdir'], ".")
         tools.mdir(self.conf['archive_dir'])
-        logging.debug("create_archive_general: cmd: %s" % cmd)
+        self.logger.debug("cmd: %s" % cmd)
         outs, errs, code = tools.launch_cmd(cmd, timeout)
         if code != 0:
-            logging.error("Can't create archive %s" % (errs))
+            self.logger.error("Can't create archive %s" % (errs))

     def find_adm_interface_speed(self, defspeed):
         '''Returns interface speed through which logs will be dowloaded'''
@@ -622,7 +645,7 @@ class NodeManager(object):
                ('cat /sys/class/net/', node.ip))
         out, err, code = tools.launch_cmd(cmd, node.timeout)
         if code != 0:
-            logging.error("can't get interface speed: error: %s" % err)
+            self.logger.error("can't get iface speed: error: %s" % err)
             return defspeed
         try:
             speed = int(out)
@@ -633,7 +656,7 @@ class NodeManager(object):
     @run_with_lock
     def get_logs(self, timeout, fake=False, maxthreads=10, speed=100):
         if fake:
-            logging.info('get_logs: fake = True, skipping')
+            self.logger.info('fake = True, skipping')
             return
         txtfl = []
         speed = self.find_adm_interface_speed(speed)
@@ -642,8 +665,8 @@ class NodeManager(object):
         run_items = []
         for node in [n for n in self.nodes.values() if not n.filtered_out]:
             if not node.logs_dict():
-                logging.info(("get_logs: node %s - no logs "
-                              "to collect") % node.id)
+                self.logger.info(("node %s - no logs "
+                                  "to collect") % node.id)
                 continue
             node.archivelogsfile = os.path.join(self.conf['archive_dir'],
                                                 'logs-node-%s.tar.gz' %
@@ -669,7 +692,7 @@ class NodeManager(object):
             try:
                 os.remove(tfile)
             except:
-                logging.error("get_logs: can't delete file %s" % tfile)
+                self.logger.error("can't delete file %s" % tfile)

     @run_with_lock
     def get_files(self, timeout=15):
@@ -26,11 +26,13 @@ import threading
 from multiprocessing import Process, Queue, BoundedSemaphore
 import subprocess
 import yaml
+import json
 from flock import FLock
 from tempfile import gettempdir
 from pipes import quote


+logger = logging.getLogger(__name__)
 slowpipe = '''
 import sys
 import time
@@ -49,15 +51,15 @@ def interrupt_wrapper(f):
         try:
             f(*args, **kwargs)
         except KeyboardInterrupt:
-            logging.warning('Interrupted, exiting.')
+            logger.warning('Interrupted, exiting.')
         except Exception as e:
-            logging.error('Error: %s' % e, exc_info=True)
+            logger.error('Error: %s' % e, exc_info=True)
             for k in dir(e):
                 '''debug: print all exception attrs except internal
                 and except 'message', which is deprecated since Python 2.6'''
                 if not k.startswith('__') and k != 'message':
                     v = getattr(e, k)
-                    logging.debug('Error details: %s = %s' % (k, v))
+                    logger.debug('Error details: %s = %s' % (k, v))
     return wrapper
@@ -65,8 +67,8 @@ def run_with_lock(f):
     def wrapper(*args, **kwargs):
         lock = FLock(os.path.join(gettempdir(), 'timmy_%s.lock' % f.__name__))
         if not lock.lock():
-            logging.warning('Unable to obtain lock, skipping "%s"' %
-                            f.__name__)
+            logger.warning('Unable to obtain lock, skipping "%s"' %
+                           f.__name__)
             return ''
         f(*args, **kwargs)
         lock.unlock()
@@ -74,17 +76,19 @@


 class RunItem():
-    def __init__(self, target, args=None, key=None):
+    def __init__(self, target, args=None, key=None, logger=None):
         self.target = target
         self.args = args
         self.key = key
         self.process = None
         self.queue = None
+        self.logger = logger or logging.getLogger(__name__)


 class SemaphoreProcess(Process):
-    def __init__(self, semaphore, target, args=None, queue=None):
+    def __init__(self, semaphore, target, args=None, queue=None, logger=None):
         Process.__init__(self)
+        self.logger = logger or logging.getLogger(__name__)
         self.semaphore = semaphore
         self.target = target
         if not args:
@@ -98,18 +102,18 @@ class SemaphoreProcess(Process):
             if self.queue:
                 self.queue.put_nowait(result)
         except Exception as error:
-            logging.exception(error)
+            self.logger.exception(error)
             if self.queue:
                 self.queue.put_nowait(error)
         finally:
-            logging.debug('finished call: %s' % self.target)
+            self.logger.debug('finished call: %s' % self.target)
             self.semaphore.release()
-            logging.debug('semaphore released')
+            self.logger.debug('semaphore released')


 def run_batch(item_list, maxthreads, dict_result=False):
     def cleanup():
-        logging.debug('cleanup processes')
+        logger.debug('cleanup processes')
         for run_item in item_list:
             if run_item.process:
                 run_item.process.terminate()
@@ -127,7 +131,7 @@ def run_batch(item_list, maxthreads, dict_result=False):
     for run_item in item_list:
         run_item.result = run_item.queue.get()
         if isinstance(run_item.result, Exception):
-            logging.error('%s, exiting' % run_item.result)
+            logger.critical('%s, exiting' % run_item.result)
             cleanup()
             sys.exit(42)
         run_item.process.join()
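For orientation, a condensed sketch of the machinery these hunks touch (simplified from the surrounding file, not a verbatim copy): each worker ships its result, or the raised exception itself, back through a `Queue`, and always releases the semaphore that `run_batch` uses to cap concurrency::

    import logging
    from multiprocessing import BoundedSemaphore, Process, Queue

    logger = logging.getLogger(__name__)

    class SemaphoreProcess(Process):
        def __init__(self, semaphore, target, args, queue):
            Process.__init__(self)
            self.semaphore = semaphore
            self.target = target
            self.args = args
            self.queue = queue

        def run(self):
            try:
                self.queue.put_nowait(self.target(**self.args))
            except Exception as error:
                logger.exception(error)
                self.queue.put_nowait(error)  # parent receives the failure as a value
            finally:
                self.semaphore.release()      # free a slot for the next worker

    def double(x):
        return x * 2

    if __name__ == '__main__':
        semaphore = BoundedSemaphore(2)  # at most two live workers
        queue = Queue()
        semaphore.acquire()              # the parent acquires before each start
        SemaphoreProcess(semaphore, double, {'x': 21}, queue).start()
        print(queue.get())               # 42, or an Exception instance on error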
@@ -158,11 +162,28 @@ def get_dir_structure(rootdir):
             parent = reduce(dict.get, folders[:-1], dir)
             parent[folders[-1]] = subdir
     except:
-        logging.error('failed to create list of the directory: %s' % rootdir)
+        logger.critical('failed to create list of the directory: %s' %
+                        rootdir)
         sys.exit(1)
     return dir


+def load_json_file(filename):
+    """
+    Loads json data from file
+    """
+    try:
+        with open(filename, 'r') as f:
+            return json.load(f)
+    except IOError as e:
+        logger.critical("I/O error(%s): file: %s; msg: %s" %
+                        (e.errno, e.filename, e.strerror))
+        sys.exit(1)
+    except ValueError:
+        logger.critical("Could not convert data")
+        sys.exit(1)
+
+
 def load_yaml_file(filename):
     """
     Loads yaml data from file
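The new `load_json_file` mirrors `load_yaml_file` below: I/O and parse failures are logged as critical and terminate the run instead of raising. A hypothetical usage sketch (it assumes the function defined in the hunk above; the file name and keys are illustrative)::

    nodes_json = load_json_file('nodes.json')  # exits(1) on a missing file or bad JSON
    for node in nodes_json:
        print(node['id'], node.get('roles'))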
@@ -171,15 +192,15 @@ def load_yaml_file(filename):
         with open(filename, 'r') as f:
             return yaml.load(f)
     except IOError as e:
-        logging.error("load_conf: I/O error(%s): file: %s; msg: %s" %
-                      (e.errno, e.filename, e.strerror))
+        logger.critical("I/O error(%s): file: %s; msg: %s" %
+                        (e.errno, e.filename, e.strerror))
         sys.exit(1)
     except ValueError:
-        logging.error("load_conf: Could not convert data")
+        logger.critical("Could not convert data")
         sys.exit(1)
     except yaml.parser.ParserError as e:
-        logging.error("load_conf: Could not parse %s:\n%s" %
-                      (filename, str(e)))
+        logger.critical("Could not parse %s:\n%s" %
+                        (filename, str(e)))
         sys.exit(1)
@@ -188,11 +209,11 @@ def mdir(directory):
     Creates a directory if it doesn't exist
     """
     if not os.path.exists(directory):
-        logging.debug('creating directory %s' % directory)
+        logger.debug('creating directory %s' % directory)
         try:
             os.makedirs(directory)
         except:
-            logging.error("Can't create a directory: %s" % directory)
+            logger.critical("Can't create a directory: %s" % directory)
             sys.exit(3)
@@ -210,11 +231,11 @@ def launch_cmd(cmd, timeout, input=None, ok_codes=None):
     def _timeout_terminate(pid):
         try:
             os.kill(pid, 15)
-            logging.error("launch_cmd: pid %d killed by timeout" % pid)
+            logger.error("launch_cmd: pid %d killed by timeout" % pid)
         except:
             pass

-    logging.info('launch_cmd: cmd %s' % cmd)
+    logger.info('cmd %s' % cmd)
     p = subprocess.Popen(cmd,
                          shell=True,
                          stdin=subprocess.PIPE,
@@ -233,21 +254,22 @@ def launch_cmd(cmd, timeout, input=None, ok_codes=None):
             p.kill()
         except:
             pass
         p.stdin = None
         outs, errs = p.communicate()
         outs = outs.decode('utf-8')
         errs = errs.decode('utf-8')
         errs = errs.rstrip('\n')
-        logging.error(_log_msg(cmd, errs, p.returncode))
+        logger.error(_log_msg(cmd, errs, p.returncode))
     finally:
         if timeout_killer:
             timeout_killer.cancel()
-        logging.info(_log_msg(cmd, errs, p.returncode))
+        logger.info(_log_msg(cmd, errs, p.returncode))
         input = input.decode('utf-8') if input else None
-        logging.debug(_log_msg(cmd, errs, p.returncode, debug=True,
-                               stdin=input, stdout=outs))
+        logger.debug(_log_msg(cmd, errs, p.returncode, debug=True,
+                              stdin=input, stdout=outs))
     if p.returncode:
         if not ok_codes or p.returncode not in ok_codes:
-            logging.warning(_log_msg(cmd, errs, p.returncode))
+            logger.warning(_log_msg(cmd, errs, p.returncode))
     return outs, errs, p.returncode
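The timeout handling visible in the last two hunks follows a common pattern: a `threading.Timer` fires `_timeout_terminate`, which SIGTERMs the child, and the `finally` block cancels the timer on the happy path. A minimal self-contained sketch of that pattern (assumptions: POSIX, shell command, SIGTERM only)::

    import os
    import signal
    import subprocess
    import threading

    def run_with_timeout(cmd, timeout):
        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        killer = threading.Timer(timeout, os.kill, [p.pid, signal.SIGTERM])
        killer.start()
        try:
            outs, errs = p.communicate()  # returns early if the child is killed
        finally:
            killer.cancel()               # no-op if the timer already fired
        return outs.decode(), errs.decode(), p.returncode

    print(run_with_timeout('sleep 5', 1))  # killed after 1s: returncode -15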
@@ -263,11 +285,11 @@ def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15,
     if type(env_vars) is list:
         env_vars = ' '.join(env_vars)
     if (ip in ['localhost', '127.0.0.1']) or ip.startswith('127.'):
-        logging.info("skip ssh")
+        logger.info("skip ssh")
         bstr = "%s timeout '%s' bash -c " % (
                env_vars, timeout)
     else:
-        logging.info("exec ssh")
+        logger.info("exec ssh")
         bstr = "timeout '%s' ssh -t -T %s '%s' '%s' " % (
                timeout, ssh_opts, ip, env_vars)
     if filename is None:
@@ -279,7 +301,7 @@ def ssh_node(ip, command='', ssh_opts=None, env_vars=None, timeout=15,
         cmd = "%s < '%s'" % (cmd, inputfile)
     else:
         cmd = "%s'%s bash -s' < '%s'" % (bstr, prefix, filename)
-        logging.info("ssh_node: inputfile selected, cmd: %s" % cmd)
+        logger.info("inputfile selected, cmd: %s" % cmd)
     if outputfile is not None:
         cmd = "%s > '%s'" % (cmd, outputfile)
     cmd = ("input=\"$(cat | xxd -p)\"; trap 'kill $pid' 15; " +
@@ -292,7 +314,7 @@ def get_files_rsync(ip, data, ssh_opts, dpath, timeout=15):
     if type(ssh_opts) is list:
         ssh_opts = ' '.join(ssh_opts)
     if (ip in ['localhost', '127.0.0.1']) or ip.startswith('127.'):
-        logging.info("skip ssh rsync")
+        logger.info("skip ssh rsync")
         cmd = ("timeout '%s' rsync -avzr --files-from=- / '%s'"
                " --progress --partial --delete-before" %
                (timeout, dpath))
@@ -301,7 +323,7 @@
            " -oCompression=no' --files-from=- '%s':/ '%s'"
            " --progress --partial --delete-before"
            ) % (timeout, ssh_opts, ip, dpath)
-    logging.debug("command:%s\ndata:\n%s" % (cmd, data))
+    logger.debug("command:%s\ndata:\n%s" % (cmd, data))
     if data == '':
         return cmd, '', 127
     return launch_cmd(cmd, timeout, input=data)