fix: Too many open files

+ better logging info
+ better KeyboardInterrupt handling
+ sort imports :)

Change-Id: If044a363bd84e03a1e5f8b18413225ae1f15b4e9
Closes-Bug: #1627477
This commit is contained in:
Aleksandr Dobdin 2016-09-27 12:55:48 +00:00 committed by Dmitry Sutyagin
parent f4f70a2edb
commit 0ef46e49a1

View File

@@ -19,18 +19,19 @@
tools module tools module
""" """
import os from flock import FLock
from multiprocessing import Process, Queue, BoundedSemaphore
from pipes import quote
from tempfile import gettempdir
import json
import logging import logging
import os
import signal
import subprocess
import sys import sys
import threading import threading
from multiprocessing import Process, Queue, BoundedSemaphore import traceback
import subprocess
import yaml import yaml
import json
from flock import FLock
from tempfile import gettempdir
from pipes import quote
import signal
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
slowpipe = ''' slowpipe = '''
@@ -51,7 +52,7 @@ def interrupt_wrapper(f):
try: try:
f(*args, **kwargs) f(*args, **kwargs)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.warning('Interrupted, exiting.') logger.warning('received keyboard interrupt, exiting')
sys.exit(signal.SIGINT) sys.exit(signal.SIGINT)
except Exception as e: except Exception as e:
logger.error('Error: %s' % e, exc_info=True) logger.error('Error: %s' % e, exc_info=True)
@@ -98,54 +99,88 @@ class SemaphoreProcess(Process):
self.queue = queue self.queue = queue
def run(self): def run(self):
signal.signal(signal.SIGINT, signal.SIG_IGN)
fin_msg = 'finished subprocess, pid: %s'
sem_msg = 'semaphore released by subprocess, pid: %s'
try: try:
result = self.target(**self.args) result = self.target(**self.args)
if self.queue: if self.queue:
self.queue.put_nowait(result) self.queue.put_nowait(result)
except Exception as error: except Exception as error:
self.logger.exception(error)
if self.queue: if self.queue:
self.queue.put_nowait(error) self.queue.put_nowait(error)
self.queue.put_nowait(traceback.format_exc())
finally: finally:
self.logger.debug('finished call: %s' % self.target) self.logger.debug(fin_msg % self.pid)
self.semaphore.release() self.semaphore.release()
self.logger.debug('semaphore released') self.logger.debug(sem_msg % self.pid)
def run_batch(item_list, maxthreads, dict_result=False): def run_batch(item_list, maxthreads, dict_result=False):
def cleanup(): exc_msg = 'exception in subprocess, pid: %s, details:'
logger.debug('cleanup processes') rem_msg = 'removing reference to finished subprocess, pid: %s'
for run_item in item_list: int_msg = 'received keyboard interrupt during batch execution, cleaning up'
if run_item.process:
run_item.process.terminate() def cleanup(launched):
logger.info('cleaning up running subprocesses')
for proc in launched.values():
logger.debug('terminating subprocess, pid: %s' % proc.pid)
proc.terminate()
proc.join()
def collect_results(l, join=False):
results = {}
remove_procs = []
for key, proc in l.items():
if not proc.is_alive() or join:
results[key] = proc.queue.get()
if isinstance(results[key], Exception):
exc_text = proc.queue.get()
logger.critical(exc_msg % proc.pid)
for line in exc_text.splitlines():
logger.critical('____%s' % line)
cleanup(l)
sys.exit(109)
logger.debug('joining subprocess, pid: %s' % proc.pid)
proc.join()
remove_procs.append(key)
for key in remove_procs:
logger.debug(rem_msg % key)
l.pop(key)
return results
semaphore = BoundedSemaphore(maxthreads) semaphore = BoundedSemaphore(maxthreads)
try: try:
launched = {}
results = {}
if not dict_result:
key = 0
for run_item in item_list: for run_item in item_list:
semaphore.acquire(True) results.update(collect_results(launched))
run_item.queue = Queue() semaphore.acquire(block=True)
p = SemaphoreProcess(target=run_item.target, p = SemaphoreProcess(target=run_item.target,
semaphore=semaphore, semaphore=semaphore,
args=run_item.args, args=run_item.args,
queue=run_item.queue) queue=Queue())
run_item.process = p
p.start() p.start()
for run_item in item_list: if dict_result:
run_item.result = run_item.queue.get() launched[run_item.key] = p
if isinstance(run_item.result, Exception): logger.debug('started subprocess, pid: %s, func: %s, key: %s' %
logger.critical('%s, exiting' % run_item.result) (p.pid, run_item.target, run_item.key))
cleanup() else:
sys.exit(109) launched[key] = p
run_item.process.join() key += 1
run_item.process = None logger.debug('started subprocess, pid:%s, func:%s, key:%s' %
(p.pid, run_item.target, key))
results.update(collect_results(launched, True))
if dict_result: if dict_result:
result = {} return results
for run_item in item_list:
result[run_item.key] = run_item.result
return result
else: else:
return [run_item.result for run_item in item_list] return results.values()
except KeyboardInterrupt: except KeyboardInterrupt:
cleanup() logger.warning(int_msg)
cleanup(launched)
raise KeyboardInterrupt() raise KeyboardInterrupt()
@@ -211,7 +246,7 @@ def launch_cmd(cmd, timeout, input=None, ok_codes=None, decode=True):
shell=True, shell=True,
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) stderr=subprocess.PIPE, close_fds=True)
timeout_killer = None timeout_killer = None
outs = None outs = None
errs = None errs = None