Fix: main process stuck if subprocess killed

Fix for one of the two issues reported here:
https://github.com/adobdin/timmy/issues/64

Change-Id: I2fb45320783f93a558dda1fb33b9f2d2adbe9656
This commit is contained in:
Dmitry Sutyagin 2016-10-24 17:14:11 -07:00
parent 33e4464e1a
commit cdfa6acf97

View File

@ -20,12 +20,13 @@ tools module
""" """
from flock import FLock from flock import FLock
from multiprocessing import Process, Queue, BoundedSemaphore
from pipes import quote from pipes import quote
from tempfile import gettempdir from tempfile import gettempdir
from timmy.env import project_name from timmy.env import project_name
import Queue
import json import json
import logging import logging
import multiprocessing as mp
import os import os
import signal import signal
import subprocess import subprocess
@ -88,9 +89,9 @@ class RunItem():
self.logger = logger or logging.getLogger(project_name) self.logger = logger or logging.getLogger(project_name)
class SemaphoreProcess(Process): class SemaphoreProcess(mp.Process):
def __init__(self, semaphore, target, args=None, queue=None, logger=None): def __init__(self, semaphore, target, args=None, queue=None, logger=None):
Process.__init__(self) mp.Process.__init__(self)
self.logger = logger or logging.getLogger(project_name) self.logger = logger or logging.getLogger(project_name)
self.semaphore = semaphore self.semaphore = semaphore
self.target = target self.target = target
@ -121,6 +122,7 @@ def run_batch(item_list, maxthreads, dict_result=False):
exc_msg = 'exception in subprocess, pid: %s, details:' exc_msg = 'exception in subprocess, pid: %s, details:'
rem_msg = 'removing reference to finished subprocess, pid: %s' rem_msg = 'removing reference to finished subprocess, pid: %s'
int_msg = 'received keyboard interrupt during batch execution, cleaning up' int_msg = 'received keyboard interrupt during batch execution, cleaning up'
emp_msg = 'subprocess did not return results, pid: %s'
def cleanup(launched): def cleanup(launched):
logger.info('cleaning up running subprocesses') logger.info('cleaning up running subprocesses')
@ -134,23 +136,26 @@ def run_batch(item_list, maxthreads, dict_result=False):
remove_procs = [] remove_procs = []
for key, proc in l.items(): for key, proc in l.items():
if not proc.is_alive() or join: if not proc.is_alive() or join:
results[key] = proc.queue.get() logger.debug('joining subprocess, pid: %s' % proc.pid)
if isinstance(results[key], Exception): proc.join()
try:
results[key] = proc.queue.get(block=False)
except Queue.Empty:
logger.warning(emp_msg % proc.pid)
if key in results and isinstance(results[key], Exception):
exc_text = proc.queue.get() exc_text = proc.queue.get()
logger.critical(exc_msg % proc.pid) logger.critical(exc_msg % proc.pid)
for line in exc_text.splitlines(): for line in exc_text.splitlines():
logger.critical('____%s' % line) logger.critical('____%s' % line)
cleanup(l) cleanup(l)
sys.exit(109) sys.exit(109)
logger.debug('joining subprocess, pid: %s' % proc.pid)
proc.join()
remove_procs.append(key) remove_procs.append(key)
for key in remove_procs: for key in remove_procs:
logger.debug(rem_msg % key) logger.debug(rem_msg % key)
l.pop(key) l.pop(key)
return results return results
semaphore = BoundedSemaphore(maxthreads) semaphore = mp.BoundedSemaphore(maxthreads)
try: try:
launched = {} launched = {}
results = {} results = {}
@ -162,7 +167,7 @@ def run_batch(item_list, maxthreads, dict_result=False):
p = SemaphoreProcess(target=run_item.target, p = SemaphoreProcess(target=run_item.target,
semaphore=semaphore, semaphore=semaphore,
args=run_item.args, args=run_item.args,
queue=Queue()) queue=mp.Queue())
p.start() p.start()
if dict_result: if dict_result:
launched[run_item.key] = p launched[run_item.key] = p