Merge pull request #12 from adobdin/issue9

merge "Issue9" branch
Dmitry 2016-05-03 13:41:34 -07:00
commit 485a69d91e
7 changed files with 385 additions and 439 deletions

View File: config.yaml

@@ -1,23 +1,29 @@
ssh_opts:
- -oConnectTimeout=2
- -oStrictHostKeyChecking=no
- -oUserKnownHostsFile=/dev/null
- -oLogLevel=error
- -lroot
- -oBatchMode=yes
- '-oConnectTimeout=2'
- '-oStrictHostKeyChecking=no'
- '-oUserKnownHostsFile=/dev/null'
- '-oLogLevel=error'
- '-lroot'
- '-oBatchMode=yes'
env_vars:
- OPENRC=/root/openrc
- IPTABLES_STR="iptables -nvL"
fuelip: 127.0.0.1
rqdir: ./rq
- 'OPENRC=/root/openrc'
- 'IPTABLES_STR="iptables -nvL"'
fuelip: '127.0.0.1'
rqdir: './rq'
soft_filter:
status: ['ready']
timeout: 15
compress_timeout: 3600
log_files:
path: /var/log
filter:
default:
include: '(.)*'
exclude: '[-_]\d{8}$|atop[-_]|\.gz$'
log_path: '/var/log'
log_filter:
include: '(.)*'
exclude: '[-_]\d{8}$|atop[-_]|\.gz$'
# by_role:
# contrail:
# log_filter:
# include: 'contrail'
# by_node_id:
# 3:
# env_vars:
# OPENRC: '/root/openrc'
# IPTABLES_STR: 'iptables -L'
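
The config rewrite above is twofold: quoting the scalars (cosmetic, since YAML parses '-lroot' and -lroot to the same string) and replacing the nested log_files mapping with flat log_path and log_filter keys, plus commented-out by_role and by_node_id override examples. A minimal sketch of consuming such a file, assuming PyYAML is installed (the project's real loader is load_conf in timmy/conf.py):

    import yaml

    # Load the config shown above; safe_load is used for this sketch,
    # the project itself calls yaml.load().
    with open('config.yaml') as f:
        conf = yaml.safe_load(f)

    ssh_opts = ' '.join(conf['ssh_opts'])    # "-oConnectTimeout=2 ... -oBatchMode=yes"
    exclude = conf['log_filter']['exclude']  # regex of log names to skip
    print(ssh_opts, exclude)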

View File: setup.py

@@ -4,20 +4,17 @@ from setuptools import setup
import os
rqfiles = [('/usr/share/timmy/' + root, [os.path.join(root, f) for f in files])
for root, dirs, files in os.walk('rq')]
for root, dirs, files in os.walk('rq')]
rqfiles.append(('/usr/share/timmy/configs', ['config.yaml']))
setup(name='timmy',
version='0.1',
author = "Aleksandr Dobdin",
author_email = 'dobdin@gmail.com',
license = 'Apache2',
url = 'https://github.com/adobdin/timmy',
author="Aleksandr Dobdin",
author_email='dobdin@gmail.com',
license='Apache2',
url='https://github.com/adobdin/timmy',
long_description=open('README.md').read(),
packages = ["timmy"],
data_files = rqfiles,
include_package_data = True,
entry_points = {
'console_scripts': ['timmy = timmy.cli:main']
}
)
packages=["timmy"],
data_files=rqfiles,
include_package_data=True,
entry_points={'console_scripts': ['timmy=timmy.cli:main']})
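
Aside from dropping the spaces around keyword arguments (PEP 8), this hunk leaves setup.py's behavior unchanged: the rqfiles comprehension mirrors the rq/ tree into /usr/share/timmy/ so the installed tool can find its request scripts. A sketch of what it evaluates to, using a hypothetical rq/scripts/uptime entry:

    import os

    # Builds (install_dir, [source_files]) pairs for setuptools data_files.
    # A tree containing the hypothetical rq/scripts/uptime would yield
    #   ('/usr/share/timmy/rq/scripts', ['rq/scripts/uptime'])
    rqfiles = [('/usr/share/timmy/' + root,
                [os.path.join(root, f) for f in files])
               for root, dirs, files in os.walk('rq')]
    rqfiles.append(('/usr/share/timmy/configs', ['config.yaml']))
    print(rqfiles)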

View File: timmy/cli.py

@@ -16,15 +16,16 @@
# under the License.
import argparse
import timmy
from timmy import nodes
import logging
import sys
import os
from timmy.conf import Conf
from timmy import flock
from timmy.tools import interrupt_wrapper
@interrupt_wrapper
def main(argv=None):
if argv is None:
argv = sys.argv
@@ -42,10 +43,12 @@ def main(argv=None):
parser.add_argument('-l', '--logs',
help='collect logs from nodes',
action='store_true', dest='getlogs')
parser.add_argument('-L', '--logs-maxthreads', type=int, default=100,
help="maximum simultaneous log collection operations")
parser.add_argument('--only-logs',
action='store_true',
help='Collect only logs from fuel-node')
parser.add_argument('--log-file',
parser.add_argument('--log-file', default=None,
help='timmy log file')
parser.add_argument('--fake-logs',
help="Do not collect logs, only calculate size",
@@ -63,12 +66,7 @@ def main(argv=None):
loglevel = logging.DEBUG
else:
loglevel = logging.INFO
if args.log_file:
logfile = args.log_file
else:
logfile = None
logging.basicConfig(
filename=logfile,
logging.basicConfig(filename=args.log_file,
level=loglevel,
format='%(asctime)s %(levelname)s %(message)s')
config = Conf()
@@ -92,15 +90,19 @@ def main(argv=None):
lf = '/tmp/timmy-logs.lock'
lock = flock.FLock(lf)
if lock.lock():
n.get_node_file_list()
n.calculate_log_size()
if n.is_enough_space(config.archives):
n.create_log_archives(config.archives,
config.compress_timeout,
fake=args.fake_logs)
lock.unlock()
try:
n.get_node_file_list()
n.calculate_log_size()
if n.is_enough_space(config.archives):
n.archive_logs(config.archives,
config.compress_timeout,
maxthreads=args.logs_maxthreads,
fake=args.fake_logs)
finally:
lock.unlock()
else:
logging.warning('Unable to obtain lock %s, skipping "logs"-part' % lf)
logging.warning('Unable to obtain lock %s, skipping "logs"-part' %
lf)
logging.info("Nodes:\n%s" % n)
print(n)
return 0
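
Two behavioral changes hide in this file alongside the new -L/--logs-maxthreads option: --log-file now defaults to None, which lets logging.basicConfig(filename=None) fall back to stderr and removes the old if/else, and the logs phase is wrapped in try/finally so /tmp/timmy-logs.lock is released even when size calculation or archiving raises. A runnable sketch of the argparse part, with the option definitions taken from the diff:

    import argparse

    # Mirror of the two option definitions added above.
    parser = argparse.ArgumentParser()
    parser.add_argument('-L', '--logs-maxthreads', type=int, default=100,
                        help='maximum simultaneous log collection operations')
    parser.add_argument('--log-file', default=None, help='timmy log file')

    args = parser.parse_args(['-L', '8'])
    print(args.logs_maxthreads, args.log_file)  # 8 None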

View File: timmy/conf.py

@@ -20,9 +20,9 @@ class Conf(object):
compress_timeout = 3600
archives = '/tmp/timmy/archives'
cmds_archive = ''
log_files = {}
log_files['filter'] = {'default': {'include': "(.)*", 'exclude': '[-_]\d{8}$|atop[-_]|\.gz$'}}
log_files['path'] = '/var/log/'
log_path = '/var/log'
log_filter = {'include': '',
'exclude': '[-_]\d{8}$|atop[-_]|\.gz$'}
def __init__(self, **entries):
self.__dict__.update(entries)
@@ -38,16 +38,19 @@ class Conf(object):
conf = yaml.load(f)
return Conf(**conf)
except IOError as e:
logging.error("load_conf: I/O error(%s): %s" % (e.errno, e.strerror))
logging.error("load_conf: I/O error(%s): %s" %
(e.errno, e.strerror))
sys.exit(1)
except ValueError:
logging.error("load_conf: Could not convert data")
sys.exit(1)
except yaml.parser.ParserError as e:
logging.error("load_conf: Could not parse %s:\n%s" % (filename, str(e)))
logging.error("load_conf: Could not parse %s:\n%s" %
(filename, str(e)))
sys.exit(1)
except:
logging.error("load_conf: Unexpected error: %s" % sys.exc_info()[0])
logging.error("load_conf: Unexpected error: %s" %
sys.exc_info()[0])
sys.exit(1)
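
Conf keeps its defaults as class attributes and lets __dict__.update() shadow them with whatever load_conf parses out of the YAML, which is why load_conf can simply return Conf(**conf). A self-contained sketch of the pattern, with attribute names borrowed from the class:

    # Class attributes are the defaults; keys from the parsed YAML land in
    # the instance __dict__ and shadow them per instance.
    class Conf(object):
        timeout = 15
        log_path = '/var/log'

        def __init__(self, **entries):
            self.__dict__.update(entries)

    c = Conf(**{'timeout': 30})
    print(c.timeout, c.log_path)  # 30 /var/log

One hedged caveat: yaml.load() without an explicit Loader can construct arbitrary Python objects; yaml.safe_load() is the usual choice when the config may come from untrusted sources.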

View File: timmy/flock.py

@@ -46,7 +46,8 @@ class FLock:
self.lockfd = os.open(self.lockfile,
os.O_TRUNC | os.O_CREAT | os.O_RDWR)
# Acquire exclusive lock on the file, but don't block waiting for it
# Acquire exclusive lock on the file,
# but don't block waiting for it
fcntl.flock(self.lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB)
# Writing to file is pointless, nobody can see it
@@ -54,7 +55,8 @@ class FLock:
return True
except (OSError, IOError), e:
# Lock cannot be acquired is okay, everything else reraise exception
# Lock cannot be acquired is okay,
# everything else reraise exception
if e.errno in (errno.EACCES, errno.EAGAIN):
return False
else:
@@ -67,7 +69,7 @@ class FLock:
os.unlink(self.lockfile)
# Just in case, let's not leak file descriptors
os.close(self.lockfd)
except (OSError, IOError), e:
except (OSError, IOError):
# Ignore error destroying lock file. See class doc about how
# lockfile can be erased and everything still works normally.
pass
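
The comment rewraps above do not change the locking logic: open the lock file with O_TRUNC | O_CREAT | O_RDWR, take a non-blocking exclusive flock, and treat EACCES/EAGAIN as "someone else holds it". A minimal POSIX sketch of that sequence, written in Python 3 syntax although the module itself uses Python 2's except ..., e form:

    import errno
    import fcntl
    import os

    # Non-blocking exclusive lock, as in FLock.lock(); /tmp/demo.lock is a
    # placeholder path for this sketch.
    fd = os.open('/tmp/demo.lock', os.O_TRUNC | os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        print('lock acquired')
    except (OSError, IOError) as e:
        if e.errno in (errno.EACCES, errno.EAGAIN):
            print('held by another process')
        else:
            raise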

View File: timmy/nodes.py

@@ -24,9 +24,8 @@ import json
import os
import logging
import sys
import threading
import re
from tools import *
import tools
ckey = 'cmds'
fkey = 'files'
@@ -36,6 +35,9 @@ varlogdir = '/var/log'
class Node(object):
override_by_id = ['ssh_opts', 'env_vars', 'log_path', 'log_filter']
aggregate_by_role = ['log_path', 'log_filter']
def __init__(self, node_id, mac, cluster, roles, os_platform,
online, status, ip, conf):
self.node_id = node_id
@@ -51,28 +53,33 @@ class Node(object):
self.logsize = 0
self.flogs = {}
self.mapcmds = {}
self.logs = {}
self.set_conf(conf)
def override_conf(self, conf):
for field in Node.aggregate_by_role:
for role in self.roles:
try:
getattr(self, field).append(conf.by_role[self.role][field])
except:
pass
for field in Node.override_by_id:
try:
setattr(self, field, conf.by_node_id[self.node_id][field])
except:
pass
def set_conf(self, conf):
logging.info(conf.ssh_opts)
self.ssh_opts = " ".join(conf.ssh_opts)
self.env_vars = " ".join(conf.env_vars)
self.log_files = conf.log_files
self.ssh_opts = conf.ssh_opts
self.env_vars = conf.env_vars
self.log_path = list([conf.log_path])
self.log_filter = list([conf.log_filter])
self.timeout = conf.timeout
try:
conf.by_node_id
except:
return
if self.node_id in conf.by_node_id:
if 'ssh_opts' in conf.by_node_id[self.node_id]:
self.ssh_opts = " ".join(conf.by_node_id[self.node_id]['ssh_opts'])
if 'env_vars' in conf.by_node_id[self.node_id]:
self.env_vars = " ".join(conf.by_node_id[self.node_id]['env_vars'])
if 'log_files' in conf.by_node_id[self.node_id]:
self.log_files = conf.by_node_id[self.node_id]['log_files']
self.override_conf(conf)
def set_files(self, dirname, key, ds, version):
files = []
dfs = 'default'
for role in self.roles:
if 'by-role' in ds[key] and role in ds[key]['by-role'].keys():
for f in ds[key]['by-role'][role]:
@@ -86,9 +93,9 @@ class Node(object):
for f in ds[key]['by-os'][self.os_platform].keys():
files += [os.path.join(dirname, key, 'by-os',
self.os_platform, f)]
if 'default' in ds[key] and 'default' in ds[key]['default']:
for f in ds[key]['default']['default'].keys():
files += [os.path.join(dirname, key, 'default', 'default', f)]
if dfs in ds[key] and dfs in ds[key][dfs]:
for f in ds[key][dfs][dfs].keys():
files += [os.path.join(dirname, key, dfs, dfs, f)]
self.files[key] = sorted(set(files))
logging.debug('set_files:\nkey: %s, node: %s, file_list: %s' %
(key, self.node_id, self.files[key]))
@@ -112,7 +119,8 @@ class Node(object):
def add_files(self, dirname, key, ds):
for role in self.roles:
if 'once-by-role' in ds[key] and role in ds[key]['once-by-role'].keys():
if ('once-by-role' in ds[key] and
role in ds[key]['once-by-role'].keys()):
for f in ds[key]['once-by-role'][role]:
self.files[key] += [os.path.join(dirname, key,
'once-by-role', role, f)]
@@ -125,17 +133,17 @@ class Node(object):
cl = 'cluster-%s' % self.cluster
logging.debug('%s/%s/%s/%s' % (odir, label, cl, sn))
ddir = os.path.join(odir, label, cl, sn)
mdir(ddir)
tools.mdir(ddir)
for f in self.files[label]:
logging.info('node:%s(%s), exec: %s' % (self.node_id, self.ip, f))
if not fake:
outs, errs, code = ssh_node(ip=self.ip,
filename=f,
ssh_opts=self.ssh_opts,
env_vars=self.env_vars,
timeout=self.timeout,
command=''
)
outs, errs, code = tools.ssh_node(ip=self.ip,
filename=f,
ssh_opts=self.ssh_opts,
env_vars=self.env_vars,
timeout=self.timeout,
command=''
)
if code != 0:
logging.error("node: %s, ip: %s, cmdfile: %s,"
" code: %s, error message: %s" %
@@ -154,13 +162,13 @@ class Node(object):
def exec_simple_cmd(self, cmd, infile, outfile, timeout=15, fake=False):
logging.info('node:%s(%s), exec: %s' % (self.node_id, self.ip, cmd))
if not fake:
outs, errs, code = ssh_node(ip=self.ip,
command=cmd,
ssh_opts=self.ssh_opts,
env_vars=self.env_vars,
timeout=timeout,
outputfile=outfile,
inputfile=infile)
outs, errs, code = tools.ssh_node(ip=self.ip,
command=cmd,
ssh_opts=self.ssh_opts,
env_vars=self.env_vars,
timeout=timeout,
outputfile=outfile,
inputfile=infile)
if code != 0:
logging.warning("node: %s, ip: %s, cmdfile: %s,"
" code: %s, error message: %s" %
@@ -171,11 +179,11 @@ class Node(object):
(self.node_id, self.ip, label))
cmd = 'du -b %s' % self.data[label].replace('\n', ' ')
logging.info('node: %s, logs du-cmd: %s' % (self.node_id, cmd))
outs, errs, code = ssh_node(ip=self.ip,
command=cmd,
sshopts=sshopts,
sshvars='',
timeout=timeout)
outs, errs, code = tools.ssh_node(ip=self.ip,
command=cmd,
sshopts=sshopts,
sshvars='',
timeout=timeout)
if code != 0:
logging.warning("node: %s, ip: %s, cmdfile: %s, "
"code: %s, error message: %s" %
@@ -194,18 +202,18 @@ class Node(object):
logging.info("node: %s, ip: %s, size: %s" %
(self.node_id, self.ip, self.logsize))
def get_files(self, label, sshopts, odir='info', timeout=15):
def get_files(self, label, odir='info', timeout=15):
logging.info('node:%s(%s), filelist: %s' %
(self.node_id, self.ip, label))
sn = 'node-%s' % self.node_id
cl = 'cluster-%s' % self.cluster
ddir = os.path.join(odir, label, cl, sn)
mdir(ddir)
outs, errs, code = get_files_rsync(ip=self.ip,
data=self.data[label],
ssh_opts=self.ssh_opts,
dpath=ddir,
timeout=self.timeout)
tools.mdir(ddir)
outs, errs, code = tools.get_files_rsync(ip=self.ip,
data=self.data[label],
ssh_opts=self.ssh_opts,
dpath=ddir,
timeout=self.timeout)
if code != 0:
logging.warning("get_files: node: %s, ip: %s, label: %s, "
"code: %s, error message: %s" %
@@ -224,108 +232,41 @@ class Node(object):
logging.debug('node: %s, key: %s, data:\n%s' %
(self.node_id, key, self.data[key]))
def apply_include_filter(self, lfilter):
logging.info('apply_include_filter: node: %s, filter: %s' % (self.node_id, lfilter))
flogs = {}
if 'include' in lfilter and lfilter['include'] is not None:
for f in self.dulogs.splitlines():
try:
if ('include' in lfilter and re.search(lfilter['include'], f)):
flogs[f.split("\t")[1]] = int(f.split("\t")[0])
else:
logging.debug("filter %s by %s" % (f, lfilter))
except re.error as e:
logging.error('logs_include_filter: filter: %s, str: %s, re.error: %s' %
(lfilter, f, str(e)))
sys.exit(5)
def logs_filter(self):
result = {}
for re_pair in self.log_filter:
for f, s in self.logs.items():
if (('include' not in re_pair or
re.search(re_pair['include'], f)) and
('exclude' not in re_pair or
not re.search(re_pair['exclude'], f))):
result[f] = s
self.logs = result
self.flogs.update(flogs)
return True
else:
return False
def apply_exclude_filter(self, lfilter):
logging.info('apply_exclude_filter: node: %s, filter: %s' % (self.node_id, lfilter))
rflogs = []
if 'exclude' in lfilter and lfilter['exclude'] is None:
return True
if 'exclude' in lfilter and lfilter['exclude'] is not None:
for f in self.flogs:
try:
if re.search(lfilter['exclude'], f):
rflogs.append(f)
logging.info('logs_exclude_filter: %s' % f)
except re.error as e:
logging.error('logs_include_filter: filter: %s, str: %s, re.error: %s' %
(lfilter, f, str(e)))
sys.exit(5)
for f in rflogs:
logging.debug('apply_exclude_filter: node: %s remove file: %s from log list' % (self.node_id, f ))
self.flogs.pop(f, None)
return True
else:
return False
def logs_filter(self, filterconf):
brstr = 'by_role'
flogs = {}
logging.info('logs_filter: node: %s, filter: %s' % (self.node_id, filterconf))
bynodeidinc = False
bynodeidexc = False
# need to check the following logic:
if 'by_node_id' in filterconf and self.node_id in filterconf['by_node_id']:
if self.apply_include_filter(filterconf['by_node_id'][self.node_id]):
bynodeidinc = True
if self.apply_exclude_filter(filterconf['by_node_id'][self.node_id]):
bynodeidexc = True
if bynodeidinc:
return
if bynodeidexc:
return
byrole = False
if brstr in filterconf:
for role in self.roles:
if role in filterconf[brstr].keys():
logging.info('logs_filter: apply filter for role %s' % role)
byrole = True
if self.apply_include_filter(filterconf[brstr][role]):
byrole = True
if not byrole:
if 'default' in filterconf:
self.apply_include_filter(filterconf['default'])
else:
# unexpected
logging.warning('default log filter is not defined')
self.flogs = {}
byrole = False
if brstr in filterconf:
for role in self.roles:
if role in filterconf[brstr].keys():
logging.info('logs_filter: apply filter for role %s' % role)
if self.apply_exclude_filter(filterconf[brstr][role]):
byrole = True
if not byrole:
if 'default' in filterconf:
logging.info('logs_filter: apply default exclude filter')
self.apply_exclude_filter(filterconf['default'])
def log_size_from_find(self, path, sshopts, timeout=5):
cmd = ("find '%s' -type f -exec du -b {} +" % (path))
logging.info('log_size_from_find: node: %s, logs du-cmd: %s' % (self.node_id, cmd))
outs, errs, code = ssh_node(ip=self.ip,
command=cmd,
ssh_opts=self.ssh_opts,
env_vars='',
timeout=timeout)
if code == 124:
logging.error("node: %s, ip: %s, command: %s, "
"timeout code: %s, error message: %s" %
(self.node_id, self.ip, cmd, code, errs))
self.dulogs = ""
return False
self.dulogs = outs
logging.info('log_size_from_find: dulogs: %s' % (self.dulogs))
return True
def logs_populate(self, timeout=5):
got_logs = False
for path in self.log_path:
cmd = ("find '%s' -type f -exec du -b {} +" % (path))
logging.info('logs_populate: node: %s, logs du-cmd: %s' %
(self.node_id, cmd))
outs, errs, code = tools.ssh_node(ip=self.ip,
command=cmd,
ssh_opts=self.ssh_opts,
env_vars='',
timeout=timeout)
if code == 124:
logging.error("node: %s, ip: %s, command: %s, "
"timeout code: %s, error message: %s" %
(self.node_id, self.ip, cmd, code, errs))
break
if len(outs):
got_logs = True
for line in outs.split('\n'):
if '\t' in line:
size, filename = line.split('\t')
self.logs[filename] = int(size)
logging.debug('logs_populate: logs: %s' % (self.logs))
return got_logs
def print_files(self):
for k in self.files.keys():
@@ -349,14 +290,14 @@ class Nodes(object):
"""Class nodes """
def __init__(self, cluster, extended, conf, filename=None):
import_subprocess()
self.dirname = conf.rqdir.rstrip('/')
if (not os.path.exists(self.dirname)):
logging.error("directory %s doesn't exist" % (self.dirname))
sys.exit(1)
self.files = get_dir_structure(conf.rqdir)[os.path.basename(self.dirname)]
dn = os.path.basename(self.dirname)
self.files = tools.get_dir_structure(conf.rqdir)[dn]
if (conf.fuelip is None) or (conf.fuelip == ""):
logging.error('Nodes: looks like fuelip is not set(%s)' % conf.fuelip)
logging.error('looks like fuelip is not set(%s)' % conf.fuelip)
sys.exit(7)
self.fuelip = conf.fuelip
self.conf = conf
@@ -375,8 +316,7 @@ class Nodes(object):
self.load_nodes(conf)
self.get_version()
def __str__ (self):
def __str__(self):
s = "#node-id, cluster, admin-ip, mac, os, roles, online, status\n"
for node in sorted(self.nodes.values(), key=lambda x: x.node_id):
if (self.cluster and (str(self.cluster) != str(node.cluster)) and
@@ -386,7 +326,7 @@ class Nodes(object):
s += "%s\n" % str(node)
return s
def get_nodes(self,conf):
def get_nodes(self, conf):
fuel_node_cmd = 'fuel node list --json'
fuelnode = Node(node_id=0,
cluster=0,
@@ -397,12 +337,12 @@ class Nodes(object):
online=True,
ip=self.fuelip,
conf=conf)
nodes_json, err, code = ssh_node(ip=self.fuelip,
command=fuel_node_cmd,
ssh_opts=fuelnode.ssh_opts,
env_vars="",
timeout=fuelnode.timeout,
filename=None)
nodes_json, err, code = tools.ssh_node(ip=self.fuelip,
command=fuel_node_cmd,
ssh_opts=fuelnode.ssh_opts,
env_vars="",
timeout=fuelnode.timeout,
filename=None)
if code != 0:
logging.error("Can't get fuel node list %s" % err)
sys.exit(4)
@@ -410,17 +350,20 @@ class Nodes(object):
def pass_hard_filter(self, node):
if self.conf.hard_filter:
if self.conf.hard_filter.status and (node.status not in self.conf.hard_filter.status):
logging.info("hard filter by status: excluding node-%s" % node.node_id)
if (self.conf.hard_filter.status and
(node.status not in self.conf.hard_filter.status)):
logging.info("hard filter by status: excluding node-%s" %
node.node_id)
return False
if (isinstance(self.conf.hard_filter.online, bool) and
(bool(node.online) != bool(self.conf.hard_filter.online))):
logging.info("hard filter by online: excluding node-%s" % node.node_id)
(bool(node.online) != self.conf.hard_filter.online)):
logging.info("hard filter by online: excluding node-%s" %
node.node_id)
return False
if (self.conf.hard_filter.node_ids and
((int(node.node_id) not in self.conf.hard_filter.node_ids) and
(str(node.node_id) not in self.conf.hard_filter.node_ids))):
logging.info("hard filter by ids: excluding node-%s" % node.node_id)
(int(node.node_id) not in self.conf.hard_filter.node_ids)):
logging.info("hard filter by ids: excluding node-%s" %
node.node_id)
return False
if self.conf.hard_filter.roles:
ok_roles = []
@@ -428,7 +371,8 @@ class Nodes(object):
if role in self.conf.hard_filter.roles:
ok_roles.append(role)
if not ok_roles:
logging.info("hard filter by roles: excluding node-%s" % node.node_id)
logging.info("hard filter by roles: excluding node-%s" %
node.node_id)
return False
return True
@@ -468,14 +412,13 @@ class Nodes(object):
def get_version(self):
cmd = "awk -F ':' '/release/ {print \$2}' /etc/nailgun/version.yaml"
logging.info('get_version:%s' %self.conf.ssh_opts)
fuelnode = self.nodes[self.fuelip]
release, err, code = ssh_node(ip=fuelnode.ip,
command=cmd,
ssh_opts=fuelnode.ssh_opts,
env_vars="",
timeout=fuelnode.timeout,
filename=None)
release, err, code = tools.ssh_node(ip=fuelnode.ip,
command=cmd,
ssh_opts=fuelnode.ssh_opts,
env_vars="",
timeout=fuelnode.timeout,
filename=None)
if code != 0:
logging.error("Can't get fuel version %s" % err)
sys.exit(3)
@@ -485,23 +428,23 @@ class Nodes(object):
def get_release(self):
cmd = "awk -F ':' '/fuel_version/ {print \$2}' /etc/astute.yaml"
for node in self.nodes.values():
# skip master
if node.node_id == 0:
# skip master
node.release = self.version
if (node.node_id != 0) and (node.status == 'ready'):
release, err, code = ssh_node(ip=node.ip,
command=cmd,
sshopts=self.sshopts,
sshvars='',
timeout=self.timeout,
filename=None)
release, err, code = tools.ssh_node(ip=node.ip,
command=cmd,
ssh_opts=node.sshopts,
timeout=node.timeout)
if code != 0:
logging.warning("get_release: node: %s: Can't get node release" %
(node.node_id))
node.release = self.version
logging.warning("get_release: node: %s: %s" %
(node.node_id, "Can't get node release"))
node.release = None
continue
node.release = release.strip('\n "\'')
logging.info("get_release: node: %s, release: %s" % (node.node_id, node.release))
else:
node.release = release.strip('\n "\'')
logging.info("get_release: node: %s, release: %s" %
(node.node_id, node.release))
def get_node_file_list(self):
for key in self.files.keys():
@@ -531,71 +474,61 @@ class Nodes(object):
for node in self.nodes.values():
logging.debug('%s' % node.files[ckey])
def exec_filter(self, node):
f = self.conf.soft_filter
if f:
result = (((not f.status) or (node.status in f.status)) and
((not f.roles) or (node.role in f.roles)) and
((not f.node_ids) or (node.node_id in f.node_ids)))
else:
result = True
return result and (((self.cluster and node.cluster != 0 and
str(self.cluster) == str(node.cluster)) or not
self.cluster) and node.online)
def launch_ssh(self, odir='info', timeout=15, fake=False):
lock = flock.FLock('/tmp/timmy-cmds.lock')
if not lock.lock():
logging.warning('Unable to obtain lock, skipping "cmds"-part')
return ''
label = ckey
threads = []
sem = threading.BoundedSemaphore(value=100)
for node in self.nodes.values():
if (self.cluster and str(self.cluster) != str(node.cluster) and
node.cluster != 0):
continue
if node.status in self.conf.soft_filter.status and node.online:
sem.acquire(True)
t = threading.Thread(target=semaphore_release,
args=(sem,
node.exec_cmd,
node.node_id,
[label,
odir,
fake]))
threads.append(t)
t.start()
for t in threads:
t.join()
lock.unlock()
def filter_logs(self):
for node in self.nodes.values():
if (self.cluster and str(self.cluster) != str(node.cluster) and
node.cluster != 0):
continue
if node.status in self.conf.soft_filter.status and node.online:
node.logs_filter(self.conf.log_files['filter'])
logging.debug('filter logs: node-%s: filtered logs: %s' %
(node.node_id, node.flogs))
try:
label = ckey
run_items = []
for n in [n for n in self.nodes.values() if self.exec_filter(n)]:
run_items.append(tools.RunItem(target=n.exec_cmd,
args={'label': label,
'odir': odir,
'fake': fake}))
tools.run_batch(run_items, 100)
finally:
lock.unlock()
def calculate_log_size(self, timeout=15):
lsize = 0
for node in self.nodes.values():
if (self.cluster and str(self.cluster) != str(node.cluster) and
node.cluster != 0):
continue
if node.status in self.conf.soft_filter.status and node.online:
if not node.log_size_from_find(self.conf.log_files['path'],5):
logging.warning("can't get log file list from node %s" % node.node_id)
self.filter_logs()
for node in self.nodes.values():
for f in node.flogs:
lsize += node.flogs[f]
for fl in sorted(node.flogs.items(), key=lambda x: x[1]):
logging.debug(fl)
logging.info('Full log size on nodes(with fuel): %s bytes' % lsize)
self.alogsize = lsize / 1024
total_size = 0
for node in [n for n in self.nodes.values() if self.exec_filter(n)]:
if not node.logs_populate(5):
logging.warning("can't get log file list from node %s" %
node.node_id)
else:
node.logs_filter()
logging.debug('filter logs: node-%s: filtered logs: %s' %
(node.node_id, node.logs))
total_size += sum(node.logs.values())
logging.info('Full log size on nodes(with fuel): %s bytes' %
total_size)
self.alogsize = total_size / 1024
def is_enough_space(self, directory, coefficient=1.2):
mdir(directory)
outs, errs, code = free_space(directory, timeout=1)
tools.mdir(directory)
outs, errs, code = tools.free_space(directory, timeout=1)
if code != 0:
logging.error("Can't get free space: %s" % errs)
return False
try:
fs = int(outs.rstrip('\n'))
except:
logging.error("is_enough_space: can't get free space\nouts: %s" % outs)
logging.error("is_enough_space: can't get free space\nouts: %s" %
outs)
return False
logging.info('logsize: %s Kb, free space: %s Kb' % (self.alogsize, fs))
if (self.alogsize*coefficient > fs):
@@ -606,10 +539,10 @@ class Nodes(object):
def create_archive_general(self, directory, outfile, timeout):
cmd = "tar jcf '%s' -C %s %s" % (outfile, directory, ".")
mdir(self.conf.archives)
tools.mdir(self.conf.archives)
logging.debug("create_archive_general: cmd: %s" % cmd)
outs, errs, code = launch_cmd(command=cmd,
timeout=timeout)
outs, errs, code = tools.launch_cmd(command=cmd,
timeout=timeout)
if code != 0:
logging.error("Can't create archive %s" % (errs))
@@ -617,117 +550,78 @@ class Nodes(object):
'''Returns interface speed through which logs will be downloaded'''
for node in self.nodes.values():
if not (node.ip == 'localhost' or node.ip.startswith('127.')):
cmd = "cat /sys/class/net/$(/sbin/ip -o route get %s | cut -d' ' -f3)/speed" % node.ip
out, err, code = launch_cmd(cmd, node.timeout)
cmd = ("%s$(/sbin/ip -o route get %s | cut -d' ' -f3)/speed" %
('cat /sys/class/net/', node.ip))
out, err, code = tools.launch_cmd(cmd, node.timeout)
if code != 0:
logging.error("can't get interface speed: error message: %s" % err)
return defspeed
logging.error("can't get interface speed: error: %s" % err)
return defspeed
try:
speed = int(out)
except:
speed = defspeed
return speed
def create_log_archives(self, outdir, timeout, fake=False, maxthreads=10, speed=100):
def archive_logs(self, outdir, timeout,
fake=False, maxthreads=10, speed=100):
if fake:
logging.info('create_log_archives: skip creating archives(fake:%s)' % fake)
logging.info('archive_logs:skip creating archives(fake:%s)' % fake)
return
threads = []
txtfl = []
speed = self.find_adm_interface_speed(speed)
if len(self.nodes) > maxthreads:
speed = int(speed * 0.9 / maxthreads)
else:
speed = int(speed * 0.9 / len(self.nodes))
pythonslowpipe = 'import sys\n'
pythonslowpipe += 'import time\n'
pythonslowpipe += 'while 1:\n'
pythonslowpipe += ' a = sys.stdin.read(int(1250*%s))\n' % speed
pythonslowpipe += ' if a:\n'
pythonslowpipe += ' sys.stdout.write(a)\n'
pythonslowpipe += ' time.sleep(0.01)\n'
pythonslowpipe += ' else:\n'
pythonslowpipe += ' break\n'
sem = threading.BoundedSemaphore(value=maxthreads)
for node in self.nodes.values():
if (self.cluster and str(self.cluster) != str(node.cluster) and
node.cluster != 0):
continue
if node.status in self.conf.soft_filter.status and node.online:
sem.acquire(True)
node.archivelogsfile = os.path.join(outdir,
'logs-node-'+str(node.node_id) + '.tar.gz')
mdir(outdir)
logslistfile = node.archivelogsfile + '.txt'
txtfl.append(logslistfile)
try:
with open(logslistfile, 'w') as llf:
for line in node.flogs:
llf.write(line+"\0")
except:
logging.error("create_archive_logs: Can't write to file %s" % logslistfile)
continue
if node.ip == 'localhost' or node.ip.startswith('127.'):
cmd = "tar --gzip --create --file - --null --files-from -"
else:
cmd = "tar --gzip --create --file - --null --files-from - | python -c '%s'" % pythonslowpipe
t = threading.Thread(target=semaphore_release,
args=(sem,
node.exec_simple_cmd,
node.node_id,
[cmd,
logslistfile,
node.archivelogsfile,
timeout]
)
)
threads.append(t)
t.start()
while True:
speed = int(speed * 0.9 / min(maxthreads, len(self.nodes)))
pythonslowpipe = tools.slowpipe % speed
run_items = []
for node in [n for n in self.nodes.values() if self.exec_filter(n)]:
node.archivelogsfile = os.path.join(outdir,
'logs-node-%s.tar.gz' %
str(node.node_id))
tools.mdir(outdir)
logslistfile = node.archivelogsfile + '.txt'
txtfl.append(logslistfile)
try:
tt = []
for t in threads:
if t is not None and t.isAlive():
t.join(1)
else:
tt.append(t)
if len(threads) == len(tt):
break
except KeyboardInterrupt:
#sys.exit(9)
killall_children(self.timeout)
raise KeyboardInterrupt()
with open(logslistfile, 'w') as llf:
for filename in node.logs:
llf.write(filename+"\0")
except:
logging.error("create_archive_logs: Can't write to file %s" %
logslistfile)
continue
cmd = "tar --gzip --create --file - --null --files-from -"
if not (node.ip == 'localhost' or node.ip.startswith('127.')):
cmd = ' '.join([cmd, "| python -c '%s'" % pythonslowpipe])
args = {'cmd': cmd,
'infile': logslistfile,
'outfile': node.archivelogsfile,
'timeout': timeout}
run_items.append(tools.RunItem(target=node.exec_simple_cmd,
args=args))
tools.run_batch(run_items, maxthreads)
for tfile in txtfl:
try:
os.remove(tfile)
except:
logging.error("create_log_archives: can't delete file %s" % tfile)
logging.error("archive_logs: can't delete file %s" % tfile)
def get_conf_files(self, odir=fkey, timeout=15):
if fkey not in self.files:
logging.warning("get_conf_files: %s directory does not exist" % fkey)
logging.warning("get_conf_files: %s directory doesn't exist" %
fkey)
return
lock = flock.FLock('/tmp/timmy-files.lock')
if not lock.lock():
logging.warning('Unable to obtain lock, skipping "files"-part')
return ''
label = fkey
threads = []
for node in self.nodes.values():
if (self.cluster and str(self.cluster) != str(node.cluster) and
node.cluster != 0):
continue
if node.status in self.conf.soft_filter.status and node.online:
t = threading.Thread(target=node.get_files,
args=(label,
odir,))
threads.append(t)
t.start()
for t in threads:
t.join()
lock.unlock()
try:
label = fkey
run_items = []
for n in [n for n in self.nodes.values() if self.exec_filter(n)]:
run_items.append(tools.RunItem(target=n.get_files,
args={'label': label,
'odir': odir}))
tools.run_batch(run_items, 10)
finally:
lock.unlock()
def main(argv=None):
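
The heart of this rewrite is the replacement of the old filter plumbing (apply_include_filter, apply_exclude_filter, log_size_from_find and the old logs_filter) with two small methods: logs_populate() runs find ... -exec du -b over every log_path and parses the tab-separated size/name pairs into node.logs, and the new logs_filter() keeps an entry only if it matches 'include' and does not match 'exclude'. A self-contained sketch of that parse-then-filter flow, using sample du output in place of what ssh_node() returns:

    import re

    # Sample `du -b` output standing in for the remote command's stdout.
    du_output = '1024\t/var/log/messages\n2048\t/var/log/atop_20160503\n'

    logs = {}
    for line in du_output.split('\n'):
        if '\t' in line:
            size, filename = line.split('\t')
            logs[filename] = int(size)

    # One include/exclude pair, as stored in Node.log_filter.
    re_pair = {'include': '(.)*', 'exclude': r'[-_]\d{8}$|atop[-_]|\.gz$'}
    logs = {f: s for f, s in logs.items()
            if re.search(re_pair['include'], f) and
            not re.search(re_pair['exclude'], f)}
    print(logs)  # the dated atop file is dropped: {'/var/log/messages': 1024}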

View File: timmy/tools.py

@@ -22,37 +22,73 @@ tools module
import os
import logging
import sys
import threading
import multiprocessing
import subprocess
def import_subprocess():
if 'subprocess' not in globals():
global subprocess
global ok_python
try:
import subprocess32 as subprocess
logging.info("using improved subprocess32 module\n")
ok_python = True
except:
import subprocess
logging.warning(("Please upgrade the module 'subprocess' to the latest version: "
"https://pypi.python.org/pypi/subprocess32/"))
ok_python = True
if sys.version_info > (2, 7, 0):
ok_python = False
logging.warning('this subprocess module does not support timeouts')
slowpipe = '''
import sys
import time
while 1:
a = sys.stdin.read(int(1250*%s))
if a:
sys.stdout.write(a)
time.sleep(0.01)
else:
logging.info('subprocess is already loaded')
break
'''
def semaphore_release(sema, func, node_id, params):
logging.info('start ssh node: %s' % node_id)
def interrupt_wrapper(f):
def wrapper(*args, **kwargs):
try:
f(*args, **kwargs)
except KeyboardInterrupt:
logging.warning('Interrupted, exiting.')
return wrapper
class RunItem():
def __init__(self, target, args):
self.target = target
self.args = args
self.process = None
class SemaphoreProcess(multiprocessing.Process):
def __init__(self, semaphore, target, args):
multiprocessing.Process.__init__(self)
self.semaphore = semaphore
self.target = target
self.args = args
def run(self):
try:
self.target(**self.args)
finally:
logging.debug('finished call: %s' % self.target)
self.semaphore.release()
def run_batch(item_list, maxthreads):
semaphore = multiprocessing.BoundedSemaphore(maxthreads)
try:
result = func(*params)
except:
logging.error("failed to launch: %s on node %s" % node_id)
finally:
sema.release()
logging.info('finish ssh node: %s' % node_id)
return result
for run_item in item_list:
semaphore.acquire(True)
p = SemaphoreProcess(target=run_item.target,
semaphore=semaphore,
args=run_item.args)
run_item.process = p
p.start()
for run_item in item_list:
run_item.process.join()
run_item.process = None
except KeyboardInterrupt:
for run_item in item_list:
if run_item.process:
run_item.process.terminate()
raise KeyboardInterrupt()
def get_dir_structure(rootdir):
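
This hunk swaps the old threading plus semaphore_release() helper for a small process pool: SemaphoreProcess releases its BoundedSemaphore in a finally block, run_batch() acquires it before each start so at most maxthreads workers exist at once, and a KeyboardInterrupt terminates whatever is still running. A usage sketch, assuming the package is installed so timmy.tools is importable:

    import time
    from timmy.tools import RunItem, run_batch

    def sleep_a_bit(seconds):
        # Stand-in workload; in timmy the targets are Node methods
        # such as exec_cmd or get_files.
        time.sleep(seconds)

    if __name__ == '__main__':
        items = [RunItem(target=sleep_a_bit, args={'seconds': 0.1})
                 for _ in range(8)]
        run_batch(items, 2)  # never more than two worker processes alive
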
@@ -85,27 +121,34 @@ def mdir(directory):
def launch_cmd(command, timeout):
def _timeout_terminate(pid):
try:
os.kill(pid, 15)
logging.error("launch_cmd: pid %d killed by timeout" % pid)
except:
pass
logging.info('launch_cmd: command %s' % command)
p = subprocess.Popen(command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if ok_python:
timeout_killer = None
try:
timeout_killer = threading.Timer(timeout, _timeout_terminate, [p.pid])
timeout_killer.start()
outs, errs = p.communicate()
except:
try:
outs, errs = p.communicate(timeout=timeout+1)
except subprocess.TimeoutExpired:
p.kill()
outs, errs = p.communicate()
logging.error("command: %s err: %s, returned: %s" %
(command, errs, p.returncode))
else:
try:
outs, errs = p.communicate()
except:
p.kill()
outs, errs = p.communicate()
logging.error("command: %s err: %s, returned: %s" %
(command, errs, p.returncode))
pass
outs, errs = p.communicate()
logging.error("command: %s err: %s, returned: %s" %
(command, errs, p.returncode))
finally:
if timeout_killer:
timeout_killer.cancel()
logging.debug("ssh return: err:%s\nouts:%s\ncode:%s" %
(errs, outs, p.returncode))
logging.info("ssh return: err:%s\ncode:%s" %
@@ -115,8 +158,10 @@ def launch_cmd(command, timeout):
def ssh_node(ip, command, ssh_opts=[], env_vars=[], timeout=15, filename=None,
inputfile=None, outputfile=None, prefix='nice -n 19 ionice -c 3'):
#ssh_opts = " ".join(ssh_opts)
#env_vars = " ".join(env_vars)
if type(ssh_opts) is list:
ssh_opts = ' '.join(ssh_opts)
if type(env_vars) is list:
env_vars = ' '.join(env_vars)
if (ip in ['localhost', '127.0.0.1']) or ip.startswith('127.'):
logging.info("skip ssh")
bstr = "%s timeout '%s' bash -c " % (
@@ -135,9 +180,12 @@ def ssh_node(ip, command, ssh_opts=[], env_vars=[], timeout=15, filename=None,
logging.info("ssh_node: inputfile selected, cmd: %s" % cmd)
if outputfile is not None:
cmd += ' > "' + outputfile + '"'
cmd = ("trap 'kill $pid' 15; " +
"trap 'kill $pid' 2; " + cmd + '&:; pid=$!; wait $!')
outs, errs, code = launch_cmd(cmd, timeout)
return outs, errs, code
def killall_children(timeout):
cmd = 'ps -o pid --ppid %d --noheaders' % os.getpid()
out, errs, code = launch_cmd(cmd, timeout)
@@ -166,7 +214,10 @@ def killall_children(timeout):
except:
logging.warning('could not kill %s' % p)
def get_files_rsync(ip, data, ssh_opts, dpath, timeout=15):
if type(ssh_opts) is list:
ssh_opts = ' '.join(ssh_opts)
if (ip in ['localhost', '127.0.0.1']) or ip.startswith('127.'):
logging.info("skip ssh rsync")
cmd = ("timeout '%s' rsync -avzr --files-from=- / '%s'"
@@ -185,22 +236,13 @@ def get_files_rsync(ip, data, ssh_opts, dpath, timeout=15):
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if ok_python:
try:
outs, errs = p.communicate(input=data, timeout=timeout+1)
except subprocess.TimeoutExpired:
p.kill()
outs, errs = p.communicate()
logging.error("ip: %s, command: %s err: %s, returned: %s" %
(ip, cmd, errs, p.returncode))
else:
try:
outs, errs = p.communicate(input=data)
except:
p.kill()
outs, errs = p.communicate()
logging.error("ip: %s, command: %s err: %s, returned: %s" %
(ip, cmd, errs, p.returncode))
try:
outs, errs = p.communicate(input=data)
except:
p.kill()
outs, errs = p.communicate()
logging.error("ip: %s, command: %s err: %s, returned: %s" %
(ip, cmd, errs, p.returncode))
logging.debug("ip: %s, ssh return: err:%s\nouts:%s\ncode:%s" %
(ip, errs, outs, p.returncode))
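
A closing note on the slowpipe template that archive_logs() fills in with a per-node speed (the measured interface speed scaled by 0.9 and divided across min(maxthreads, len(nodes)) workers): each loop iteration reads 1250 * speed bytes from stdin and then sleeps 10 ms, so throughput is capped near 125000 * speed bytes per second, which is speed Mbit/s since 1 Mbit/s equals 125000 bytes/s, ignoring the time the reads and writes themselves take. A quick arithmetic check:

    # Ceiling of the slowpipe throttle for a given speed value.
    speed = 100                # Mbit/s, after archive_logs() splits the
                               # interface speed across its workers
    chunk = 1250 * speed       # bytes read per iteration
    ceiling = chunk / 0.01     # bytes per second
    print(ceiling / 125000.0)  # 100.0, i.e. back to `speed` in Mbit/s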