This commit is contained in:
f3flight 2016-04-28 03:01:06 +00:00
parent 965d4cbdec
commit 8ada5fb7a6
4 changed files with 103 additions and 127 deletions

View File

@ -1,21 +1,21 @@
ssh_opts:
- -oConnectTimeout=2
- -oStrictHostKeyChecking=no
- -oUserKnownHostsFile=/dev/null
- -oLogLevel=error
- -lroot
- -oBatchMode=yes
- '-oConnectTimeout=2'
- '-oStrictHostKeyChecking=no'
- '-oUserKnownHostsFile=/dev/null'
- '-oLogLevel=error'
- '-lroot'
- '-oBatchMode=yes'
env_vars:
OPENRC: '/root/openrc'
IPTABLES_STR: 'iptables -nvL'
fuelip: 127.0.0.1
rqdir: ./rq
- 'OPENRC=/root/openrc'
- 'IPTABLES_STR="iptables -nvL"'
fuelip: '127.0.0.1'
rqdir: './rq'
soft_filter:
status: ['ready']
timeout: 15
compress_timeout: 3600
log_files:
path: /var/log
path: '/var/log'
filter:
default:
include: '(.)*'

View File

@ -23,8 +23,10 @@ import sys
import os
from timmy.conf import Conf
from timmy import flock
from timmy.tools import interrupt_wrapper
@interrupt_wrapper
def main(argv=None):
if argv is None:
argv = sys.argv

View File

@ -24,7 +24,7 @@ import json
import os
import logging
import sys
import threading
import multiprocessing
import re
from tools import *
@ -198,7 +198,7 @@ class Node(object):
logging.info("node: %s, ip: %s, size: %s" %
(self.node_id, self.ip, self.logsize))
def get_files(self, label, sshopts, odir='info', timeout=15):
def get_files(self, label, odir='info', timeout=15):
logging.info('node:%s(%s), filelist: %s' %
(self.node_id, self.ip, label))
sn = 'node-%s' % self.node_id
@ -353,7 +353,6 @@ class Nodes(object):
"""Class nodes """
def __init__(self, cluster, extended, conf, filename=None):
import_subprocess()
self.dirname = conf.rqdir.rstrip('/')
if (not os.path.exists(self.dirname)):
logging.error("directory %s doesn't exist" % (self.dirname))
@ -541,25 +540,23 @@ class Nodes(object):
logging.warning('Unable to obtain lock, skipping "cmds"-part')
return ''
label = ckey
threads = []
sem = threading.BoundedSemaphore(value=100)
procs = []
semaphore = multiprocessing.BoundedSemaphore(100)
for node in self.nodes.values():
if (self.cluster and str(self.cluster) != str(node.cluster) and
node.cluster != 0):
continue
if node.status in self.conf.soft_filter.status and node.online:
sem.acquire(True)
t = threading.Thread(target=semaphore_release,
args=(sem,
node.exec_cmd,
node.node_id,
[label,
odir,
fake]))
threads.append(t)
t.start()
for t in threads:
t.join()
semaphore.acquire(True)
p = SemaphoreProcess(target=node.exec_cmd,
semaphore=semaphore,
args={'label': label,
'odir': odir,
'fake': fake})
procs.append(p)
p.start()
for p in procs:
p.join()
lock.unlock()
def filter_logs(self):
@ -636,7 +633,7 @@ class Nodes(object):
if fake:
logging.info('create_log_archives: skip creating archives(fake:%s)' % fake)
return
threads = []
procs = []
txtfl = []
speed = self.find_adm_interface_speed(speed)
if len(self.nodes) > maxthreads:
@ -652,14 +649,12 @@ class Nodes(object):
pythonslowpipe += ' time.sleep(0.01)\n'
pythonslowpipe += ' else:\n'
pythonslowpipe += ' break\n'
sem = threading.BoundedSemaphore(value=maxthreads)
semaphore = threading.BoundedSemaphore(maxthreads)
for node in self.nodes.values():
if (self.cluster and str(self.cluster) != str(node.cluster) and
node.cluster != 0):
continue
if node.status in self.conf.soft_filter.status and node.online:
sem.acquire(True)
node.archivelogsfile = os.path.join(outdir,
'logs-node-'+str(node.node_id) + '.tar.gz')
mdir(outdir)
@ -676,33 +671,17 @@ class Nodes(object):
cmd = "tar --gzip --create --file - --null --files-from -"
else:
cmd = "tar --gzip --create --file - --null --files-from - | python -c '%s'" % pythonslowpipe
t = threading.Thread(target=semaphore_release,
args=(sem,
node.exec_simple_cmd,
node.node_id,
[cmd,
logslistfile,
node.archivelogsfile,
timeout]
)
)
threads.append(t)
t.start()
while True:
try:
tt = []
for t in threads:
if t is not None and t.isAlive():
t.join(1)
else:
tt.append(t)
if len(threads) == len(tt):
break
except KeyboardInterrupt:
#sys.exit(9)
killall_children(self.timeout)
raise KeyboardInterrupt()
semaphore.acquire(True)
p = SemaphoreProcess(target=node.exec_simple_cmd,
semaphore=semaphore,
args={'cmd': cmd,
'infile': logslistfile,
'outfile': node.archivelogsfile,
'timeout': timeout})
procs.append(p)
p.start()
for p in procs:
p.join()
for tfile in txtfl:
try:
os.remove(tfile)
@ -718,19 +697,22 @@ class Nodes(object):
logging.warning('Unable to obtain lock, skipping "files"-part')
return ''
label = fkey
threads = []
procs = []
semaphore = multiprocessing.BoundedSemaphore(10)
for node in self.nodes.values():
if (self.cluster and str(self.cluster) != str(node.cluster) and
node.cluster != 0):
continue
if node.status in self.conf.soft_filter.status and node.online:
t = threading.Thread(target=node.get_files,
args=(label,
odir,))
threads.append(t)
t.start()
for t in threads:
t.join()
semaphore.acquire(True)
p = SemaphoreProcess(target=node.get_files,
semaphore=semaphore,
args={'label': label,
'odir': odir})
procs.append(p)
p.start()
for p in procs:
p.join()
lock.unlock()

View File

@ -22,37 +22,32 @@ tools module
import os
import logging
import sys
import threading
import multiprocessing
import subprocess
def import_subprocess():
if 'subprocess' not in globals():
global subprocess
global ok_python
def interrupt_wrapper(f):
def wrapper(*args, **kwargs):
try:
import subprocess32 as subprocess
logging.info("using improved subprocess32 module\n")
ok_python = True
except:
import subprocess
logging.warning(("Please upgrade the module 'subprocess' to the latest version: "
"https://pypi.python.org/pypi/subprocess32/"))
ok_python = True
if sys.version_info > (2, 7, 0):
ok_python = False
logging.warning('this subprocess module does not support timeouts')
else:
logging.info('subprocess is already loaded')
f(*args, **kwargs)
except KeyboardInterrupt:
logging.warning('Interrupted, exiting.')
def semaphore_release(sema, func, node_id, params):
logging.info('start ssh node: %s' % node_id)
try:
result = func(*params)
except:
logging.error("failed to launch: %s on node %s" % node_id)
finally:
sema.release()
logging.info('finish ssh node: %s' % node_id)
return result
return wrapper
class SemaphoreProcess(multiprocessing.Process):
def __init__(self, semaphore, target, args):
multiprocessing.Process.__init__(self)
self.semaphore = semaphore
self.target = target
self.args = args
def run(self):
try:
self.target(**self.args)
finally:
self.semaphore.release()
def get_dir_structure(rootdir):
@ -85,27 +80,32 @@ def mdir(directory):
def launch_cmd(command, timeout):
def _timeout_terminate(pid):
try:
os.kill(pid, 15)
logging.error("launch_cmd: pid %d killed by timeout" % pid)
except:
pass
logging.info('launch_cmd: command %s' % command)
p = subprocess.Popen(command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if ok_python:
try:
outs, errs = p.communicate(timeout=timeout+1)
except subprocess.TimeoutExpired:
timeout_killer = None
try:
timeout_killer = threading.Timer(timeout, _timeout_terminate, [p.pid])
timeout_killer.start()
outs, errs = p.communicate()
except:
if p and not p.poll():
p.kill()
outs, errs = p.communicate()
logging.error("command: %s err: %s, returned: %s" %
(command, errs, p.returncode))
else:
try:
outs, errs = p.communicate()
except:
p.kill()
outs, errs = p.communicate()
logging.error("command: %s err: %s, returned: %s" %
(command, errs, p.returncode))
outs, errs = p.communicate()
logging.error("command: %s err: %s, returned: %s" %
(command, errs, p.returncode))
finally:
if timeout_killer:
timeout_killer.cancel()
logging.debug("ssh return: err:%s\nouts:%s\ncode:%s" %
(errs, outs, p.returncode))
logging.info("ssh return: err:%s\ncode:%s" %
@ -117,8 +117,8 @@ def ssh_node(ip, command, ssh_opts=[], env_vars=[], timeout=15, filename=None,
inputfile=None, outputfile=None, prefix='nice -n 19 ionice -c 3'):
if type(ssh_opts) is list:
ssh_opts = ' '.join(ssh_opts)
if type(env_vars) is dict:
env_vars = ' '.join(['%s="%s"' % (x, y) for x, y in env_vars.items()])
if type(env_vars) is list:
env_vars = ' '.join(env_vars)
if (ip in ['localhost', '127.0.0.1']) or ip.startswith('127.'):
logging.info("skip ssh")
bstr = "%s timeout '%s' bash -c " % (
@ -137,6 +137,7 @@ def ssh_node(ip, command, ssh_opts=[], env_vars=[], timeout=15, filename=None,
logging.info("ssh_node: inputfile selected, cmd: %s" % cmd)
if outputfile is not None:
cmd += ' > "' + outputfile + '"'
cmd = "trap 'kill $pid' 15;" + cmd + '&:; pid=$!; wait $!'
outs, errs, code = launch_cmd(cmd, timeout)
return outs, errs, code
@ -189,22 +190,13 @@ def get_files_rsync(ip, data, ssh_opts, dpath, timeout=15):
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if ok_python:
try:
outs, errs = p.communicate(input=data, timeout=timeout+1)
except subprocess.TimeoutExpired:
p.kill()
outs, errs = p.communicate()
logging.error("ip: %s, command: %s err: %s, returned: %s" %
(ip, cmd, errs, p.returncode))
else:
try:
outs, errs = p.communicate(input=data)
except:
p.kill()
outs, errs = p.communicate()
logging.error("ip: %s, command: %s err: %s, returned: %s" %
(ip, cmd, errs, p.returncode))
try:
outs, errs = p.communicate(input=data)
except:
p.kill()
outs, errs = p.communicate()
logging.error("ip: %s, command: %s err: %s, returned: %s" %
(ip, cmd, errs, p.returncode))
logging.debug("ip: %s, ssh return: err:%s\nouts:%s\ncode:%s" %
(ip, errs, outs, p.returncode))