Move locks into a decorator, add more locks, fix flake8
This commit is contained in:
parent
35df4446d3
commit
284b6d7cf2
53
timmy/cli.py
53
timmy/cli.py
@ -21,9 +21,7 @@ import logging
|
|||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
from timmy.conf import load_conf
|
from timmy.conf import load_conf
|
||||||
from timmy import flock
|
|
||||||
from timmy.tools import interrupt_wrapper
|
from timmy.tools import interrupt_wrapper
|
||||||
from tempfile import gettempdir
|
|
||||||
|
|
||||||
|
|
||||||
def pretty_run(msg, f, args=[], kwargs={}):
|
def pretty_run(msg, f, args=[], kwargs={}):
|
||||||
@ -131,46 +129,33 @@ def main(argv=None):
|
|||||||
main_arc = args.dest_file
|
main_arc = args.dest_file
|
||||||
nm = pretty_run('Initializing node data',
|
nm = pretty_run('Initializing node data',
|
||||||
NodeManager,
|
NodeManager,
|
||||||
kwargs={'conf': conf,
|
kwargs={'conf': conf, 'extended': args.extended})
|
||||||
'extended': args.extended})
|
|
||||||
if not args.only_logs:
|
if not args.only_logs:
|
||||||
if not (conf['shell_mode'] and not args.command):
|
if not (conf['shell_mode'] and not args.command):
|
||||||
pretty_run('Executing commands and scripts',
|
pretty_run('Executing commands and scripts',
|
||||||
nm.run_commands,
|
nm.run_commands,
|
||||||
args=(conf['outdir'], args.maxthreads))
|
args=(conf['outdir'], args.maxthreads))
|
||||||
if not (conf['shell_mode'] and not args.file):
|
if not (conf['shell_mode'] and not args.file):
|
||||||
pretty_run('Collecting files and filelists',
|
pretty_run('Collecting files and filelists',
|
||||||
nm.get_files,
|
nm.get_files,
|
||||||
args=(conf['outdir'], args.maxthreads))
|
args=(conf['outdir'], args.maxthreads))
|
||||||
if not args.no_archive:
|
if not args.no_archive:
|
||||||
pretty_run('Creating outputs and files archive',
|
pretty_run('Creating outputs and files archive',
|
||||||
nm.create_archive_general,
|
nm.create_archive_general,
|
||||||
args=(conf['outdir'], main_arc, 60))
|
args=(conf['outdir'], main_arc, 60))
|
||||||
if args.only_logs or args.getlogs:
|
if args.only_logs or args.getlogs:
|
||||||
lf = os.path.join(gettempdir(), 'timmy-logs.lock')
|
size = pretty_run('Calculating logs size', nm.calculate_log_size,
|
||||||
lock = flock.FLock(lf)
|
args=(args.maxthreads,))
|
||||||
if lock.lock():
|
if size == 0:
|
||||||
size = pretty_run('Calculating logs size',
|
logging.warning('Size zero - no logs to collect.')
|
||||||
nm.calculate_log_size,
|
return
|
||||||
args=(args.maxthreads,))
|
enough = pretty_run('Checking free space', nm.is_enough_space,
|
||||||
if size == 0:
|
args=(conf['archives'],))
|
||||||
logging.warning('Size zero - no logs to collect.')
|
if enough:
|
||||||
print('Size zero - no logs to collect.')
|
pretty_run('Collecting and packing logs', nm.get_logs,
|
||||||
return
|
args=(conf['archives'], conf['compress_timeout']),
|
||||||
enough = pretty_run('Checking free space',
|
kwargs={'maxthreads': args.logs_maxthreads,
|
||||||
nm.is_enough_space,
|
'fake': args.fake_logs})
|
||||||
args=(conf['archives'],))
|
|
||||||
if enough:
|
|
||||||
pretty_run('Collecting and packing logs',
|
|
||||||
nm.get_logs,
|
|
||||||
args=(conf['archives'],
|
|
||||||
conf['compress_timeout']),
|
|
||||||
kwargs={'maxthreads': args.logs_maxthreads,
|
|
||||||
'fake': args.fake_logs})
|
|
||||||
lock.unlock()
|
|
||||||
else:
|
|
||||||
logging.warning('Unable to obtain lock %s, skipping "logs"-part' %
|
|
||||||
lf)
|
|
||||||
logging.info("Nodes:\n%s" % nm)
|
logging.info("Nodes:\n%s" % nm)
|
||||||
print('Run complete. Node information:')
|
print('Run complete. Node information:')
|
||||||
print(nm)
|
print(nm)
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
main module
|
main module
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import flock
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
@ -27,8 +26,7 @@ import logging
|
|||||||
import sys
|
import sys
|
||||||
import re
|
import re
|
||||||
import tools
|
import tools
|
||||||
from tempfile import gettempdir
|
from tools import w_list, run_with_lock
|
||||||
from tools import w_list
|
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
|
|
||||||
|
|
||||||
@ -527,12 +525,9 @@ class NodeManager(object):
|
|||||||
checks.append(not set(node_v).isdisjoint(filter_v))
|
checks.append(not set(node_v).isdisjoint(filter_v))
|
||||||
return all(checks)
|
return all(checks)
|
||||||
|
|
||||||
|
@run_with_lock
|
||||||
def run_commands(self, odir='info', timeout=15, fake=False,
|
def run_commands(self, odir='info', timeout=15, fake=False,
|
||||||
maxthreads=100):
|
maxthreads=100):
|
||||||
lock = flock.FLock(os.path.join(gettempdir(), 'timmy-cmds.lock'))
|
|
||||||
if not lock.lock():
|
|
||||||
logging.warning('Unable to obtain lock, skipping "cmds"-part')
|
|
||||||
return ''
|
|
||||||
run_items = []
|
run_items = []
|
||||||
for key, node in self.nodes.items():
|
for key, node in self.nodes.items():
|
||||||
if not node.filtered_out:
|
if not node.filtered_out:
|
||||||
@ -543,7 +538,6 @@ class NodeManager(object):
|
|||||||
result = tools.run_batch(run_items, maxthreads, dict_result=True)
|
result = tools.run_batch(run_items, maxthreads, dict_result=True)
|
||||||
for key in result:
|
for key in result:
|
||||||
self.nodes[key] = result[key]
|
self.nodes[key] = result[key]
|
||||||
lock.unlock()
|
|
||||||
|
|
||||||
def calculate_log_size(self, timeout=15, maxthreads=100):
|
def calculate_log_size(self, timeout=15, maxthreads=100):
|
||||||
total_size = 0
|
total_size = 0
|
||||||
@ -582,6 +576,7 @@ class NodeManager(object):
|
|||||||
else:
|
else:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@run_with_lock
|
||||||
def create_archive_general(self, directory, outfile, timeout):
|
def create_archive_general(self, directory, outfile, timeout):
|
||||||
cmd = "tar zcf '%s' -C %s %s" % (outfile, directory, ".")
|
cmd = "tar zcf '%s' -C %s %s" % (outfile, directory, ".")
|
||||||
tools.mdir(self.conf['archives'])
|
tools.mdir(self.conf['archives'])
|
||||||
@ -607,6 +602,7 @@ class NodeManager(object):
|
|||||||
speed = defspeed
|
speed = defspeed
|
||||||
return speed
|
return speed
|
||||||
|
|
||||||
|
@run_with_lock
|
||||||
def get_logs(self, outdir, timeout, fake=False, maxthreads=10, speed=100):
|
def get_logs(self, outdir, timeout, fake=False, maxthreads=10, speed=100):
|
||||||
if fake:
|
if fake:
|
||||||
logging.info('archive_logs:skip creating archives(fake:%s)' % fake)
|
logging.info('archive_logs:skip creating archives(fake:%s)' % fake)
|
||||||
@ -653,17 +649,13 @@ class NodeManager(object):
|
|||||||
except:
|
except:
|
||||||
logging.error("archive_logs: can't delete file %s" % tfile)
|
logging.error("archive_logs: can't delete file %s" % tfile)
|
||||||
|
|
||||||
|
@run_with_lock
|
||||||
def get_files(self, odir=Node.fkey, timeout=15):
|
def get_files(self, odir=Node.fkey, timeout=15):
|
||||||
lock = flock.FLock(os.path.join(gettempdir(), 'timmy-files.lock'))
|
|
||||||
if not lock.lock():
|
|
||||||
logging.warning('Unable to obtain lock, skipping "files"-part')
|
|
||||||
return ''
|
|
||||||
run_items = []
|
run_items = []
|
||||||
for n in [n for n in self.nodes.values() if not n.filtered_out]:
|
for n in [n for n in self.nodes.values() if not n.filtered_out]:
|
||||||
run_items.append(tools.RunItem(target=n.get_files,
|
run_items.append(tools.RunItem(target=n.get_files,
|
||||||
args={'odir': odir}))
|
args={'odir': odir}))
|
||||||
tools.run_batch(run_items, 10)
|
tools.run_batch(run_items, 10)
|
||||||
lock.unlock()
|
|
||||||
|
|
||||||
|
|
||||||
def main(argv=None):
|
def main(argv=None):
|
||||||
|
@ -26,6 +26,8 @@ import threading
|
|||||||
from multiprocessing import Process, Queue, BoundedSemaphore
|
from multiprocessing import Process, Queue, BoundedSemaphore
|
||||||
import subprocess
|
import subprocess
|
||||||
import yaml
|
import yaml
|
||||||
|
from flock import FLock
|
||||||
|
from tempfile import gettempdir
|
||||||
|
|
||||||
|
|
||||||
slowpipe = '''
|
slowpipe = '''
|
||||||
@ -58,6 +60,18 @@ def interrupt_wrapper(f):
|
|||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def run_with_lock(f):
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
lock = FLock(os.path.join(gettempdir(), 'timmy_%s.lock' % f.__name__))
|
||||||
|
if not lock.lock():
|
||||||
|
logging.warning('Unable to obtain lock, skipping "%s"' %
|
||||||
|
f.__name__)
|
||||||
|
return ''
|
||||||
|
f(*args, **kwargs)
|
||||||
|
lock.unlock()
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
class RunItem():
|
class RunItem():
|
||||||
def __init__(self, target, args, key=None):
|
def __init__(self, target, args, key=None):
|
||||||
self.target = target
|
self.target = target
|
||||||
|
Loading…
x
Reference in New Issue
Block a user