Implement communication between subprocs and main proc
Fix node object not updated in main proc
This commit is contained in:
parent
05d8cce13b
commit
28ca313268
@ -152,12 +152,14 @@ class Node(object):
|
|||||||
(self.node_id, self.ip, os.path.basename(f)))
|
(self.node_id, self.ip, os.path.basename(f)))
|
||||||
logging.info('outfile: %s' % dfile)
|
logging.info('outfile: %s' % dfile)
|
||||||
self.mapcmds[os.path.basename(f)] = dfile
|
self.mapcmds[os.path.basename(f)] = dfile
|
||||||
|
print(self.mapcmds)
|
||||||
if not fake:
|
if not fake:
|
||||||
try:
|
try:
|
||||||
with open(dfile, 'w') as df:
|
with open(dfile, 'w') as df:
|
||||||
df.write(outs)
|
df.write(outs)
|
||||||
except:
|
except:
|
||||||
logging.error("exec_cmd: can't write to file %s" % dfile)
|
logging.error("exec_cmd: can't write to file %s" % dfile)
|
||||||
|
return self
|
||||||
|
|
||||||
def exec_simple_cmd(self, cmd, infile, outfile, timeout=15, fake=False):
|
def exec_simple_cmd(self, cmd, infile, outfile, timeout=15, fake=False):
|
||||||
logging.info('node:%s(%s), exec: %s' % (self.node_id, self.ip, cmd))
|
logging.info('node:%s(%s), exec: %s' % (self.node_id, self.ip, cmd))
|
||||||
@ -491,17 +493,17 @@ class Nodes(object):
|
|||||||
if not lock.lock():
|
if not lock.lock():
|
||||||
logging.warning('Unable to obtain lock, skipping "cmds"-part')
|
logging.warning('Unable to obtain lock, skipping "cmds"-part')
|
||||||
return ''
|
return ''
|
||||||
try:
|
label = ckey
|
||||||
label = ckey
|
run_items = []
|
||||||
run_items = []
|
for key, node in self.nodes.items():
|
||||||
for n in [n for n in self.nodes.values() if self.exec_filter(n)]:
|
if self.exec_filter(node):
|
||||||
run_items.append(tools.RunItem(target=n.exec_cmd,
|
run_items.append(tools.RunItem(target=node.exec_cmd,
|
||||||
args={'label': label,
|
args={'label': label,
|
||||||
'odir': odir,
|
'odir': odir,
|
||||||
'fake': fake}))
|
'fake': fake},
|
||||||
tools.run_batch(run_items, 100)
|
key=key))
|
||||||
finally:
|
self.nodes = tools.run_batch(run_items, 100, dict_result=True)
|
||||||
lock.unlock()
|
lock.unlock()
|
||||||
|
|
||||||
def calculate_log_size(self, timeout=15):
|
def calculate_log_size(self, timeout=15):
|
||||||
total_size = 0
|
total_size = 0
|
||||||
|
@ -23,7 +23,7 @@ import os
|
|||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import multiprocessing
|
from multiprocessing import Process, Queue, BoundedSemaphore
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
|
|
||||||
@ -50,40 +50,52 @@ def interrupt_wrapper(f):
|
|||||||
|
|
||||||
|
|
||||||
class RunItem():
|
class RunItem():
|
||||||
def __init__(self, target, args):
|
def __init__(self, target, args, key=None):
|
||||||
self.target = target
|
self.target = target
|
||||||
self.args = args
|
self.args = args
|
||||||
self.process = None
|
self.process = None
|
||||||
|
self.queue = None
|
||||||
|
self.key = key
|
||||||
|
|
||||||
|
class SemaphoreProcess(Process):
|
||||||
class SemaphoreProcess(multiprocessing.Process):
|
def __init__(self, semaphore, target, args, queue=None):
|
||||||
def __init__(self, semaphore, target, args):
|
Process.__init__(self)
|
||||||
multiprocessing.Process.__init__(self)
|
|
||||||
self.semaphore = semaphore
|
self.semaphore = semaphore
|
||||||
self.target = target
|
self.target = target
|
||||||
self.args = args
|
self.args = args
|
||||||
|
self.queue = queue
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
try:
|
try:
|
||||||
self.target(**self.args)
|
self.queue.put_nowait(self.target(**self.args))
|
||||||
finally:
|
finally:
|
||||||
logging.debug('finished call: %s' % self.target)
|
logging.debug('finished call: %s' % self.target)
|
||||||
self.semaphore.release()
|
self.semaphore.release()
|
||||||
|
|
||||||
|
|
||||||
def run_batch(item_list, maxthreads):
|
def run_batch(item_list, maxthreads, dict_result=False):
|
||||||
semaphore = multiprocessing.BoundedSemaphore(maxthreads)
|
semaphore = BoundedSemaphore(maxthreads)
|
||||||
try:
|
try:
|
||||||
for run_item in item_list:
|
for run_item in item_list:
|
||||||
semaphore.acquire(True)
|
semaphore.acquire(True)
|
||||||
|
run_item.queue = Queue()
|
||||||
p = SemaphoreProcess(target=run_item.target,
|
p = SemaphoreProcess(target=run_item.target,
|
||||||
semaphore=semaphore,
|
semaphore=semaphore,
|
||||||
args=run_item.args)
|
args=run_item.args,
|
||||||
|
queue=run_item.queue)
|
||||||
run_item.process = p
|
run_item.process = p
|
||||||
p.start()
|
p.start()
|
||||||
for run_item in item_list:
|
for run_item in item_list:
|
||||||
|
run_item.result = run_item.queue.get()
|
||||||
run_item.process.join()
|
run_item.process.join()
|
||||||
run_item.process = None
|
run_item.process = None
|
||||||
|
if dict_result:
|
||||||
|
result = {}
|
||||||
|
for run_item in item_list:
|
||||||
|
result[run_item.key] = run_item.result
|
||||||
|
return result
|
||||||
|
else:
|
||||||
|
return [run_item.result for run_item in item_list]
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
for run_item in item_list:
|
for run_item in item_list:
|
||||||
if run_item.process:
|
if run_item.process:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user