Register worker functions once we are connected

This also re-registers once a server appears ensuring reconnection is
properly set up.

Change-Id: I9b915482494d1b273991cb8bdd1b6ad963cf25f2
This commit is contained in:
Joshua Hesketh 2014-03-05 16:49:22 +11:00
parent 38a1718c50
commit 9cd2f933a4
3 changed files with 10 additions and 2 deletions

View File

@ -23,7 +23,7 @@ from turbo_hipster.lib import utils
class Task(object):
""" A base object for running a job (aka Task) """
log = logging.getLogger("lib.models.Task")
def __init__(self, global_config, plugin_config, job_name):

View File

@ -47,6 +47,9 @@ class ZuulManager(threading.Thread):
self.config['zuul_server']['gearman_host'],
self.config['zuul_server']['gearman_port']
)
def register_functions(self):
hostname = os.uname()[1]
self.gearman_worker.registerFunction(
'stop:turbo-hipster-manager-%s' % hostname)
@ -67,6 +70,8 @@ class ZuulManager(threading.Thread):
self.gearman_worker.waitForServer()
if (not self.stopped() and self.gearman_worker.running and
self.gearman_worker.active_connections):
self.register_functions()
self.gearman_worker.waitForServer()
logging.debug("Waiting for job")
self.current_step = 0
job = self.gearman_worker.getJob()
@ -145,6 +150,8 @@ class ZuulClient(threading.Thread):
self.gearman_worker.waitForServer()
if (not self.stopped() and self.gearman_worker.running and
self.gearman_worker.active_connections):
self.register_functions()
self.gearman_worker.waitForServer()
self.log.debug("Waiting for job")
self.job = self.gearman_worker.getJob()
self._handle_job()

View File

@ -42,6 +42,7 @@ class Server(threading.Thread):
self.zuul_manager = None
self.zuul_client = None
self.plugins = []
self.services_started = False
# TODO: Make me unique (random?) and we should be able to run multiple
# instances of turbo-hipster on the one host
@ -94,7 +95,6 @@ class Server(threading.Thread):
self.zuul_client.add_function(plugin['plugin_config']['function'],
self.tasks[job_name])
self.zuul_client.register_functions()
self.zuul_client.start()
def start_zuul_manager(self):
@ -113,5 +113,6 @@ class Server(threading.Thread):
def run(self):
self.start_zuul_client()
self.start_zuul_manager()
self.services_started = True
while not self.stopped():
self._stop.wait()