Removed unneeded binding
This commit is contained in:
parent
5904dd35d3
commit
c313c0959a
@ -1,89 +1,89 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import glob
|
import glob
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
import anyjson
|
import anyjson
|
||||||
from conductor.openstack.common import service
|
from conductor.openstack.common import service
|
||||||
from workflow import Workflow
|
from workflow import Workflow
|
||||||
from commands.dispatcher import CommandDispatcher
|
from commands.dispatcher import CommandDispatcher
|
||||||
from openstack.common import log as logging
|
from openstack.common import log as logging
|
||||||
from config import Config
|
from config import Config
|
||||||
import reporting
|
import reporting
|
||||||
import rabbitmq
|
import rabbitmq
|
||||||
|
|
||||||
import windows_agent
|
import windows_agent
|
||||||
import cloud_formation
|
import cloud_formation
|
||||||
|
|
||||||
config = Config(sys.argv[1] if len(sys.argv) > 1 else None)
|
config = Config(sys.argv[1] if len(sys.argv) > 1 else None)
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def task_received(task, message_id):
|
def task_received(task, message_id):
|
||||||
with rabbitmq.RmqClient() as rmqclient:
|
with rabbitmq.RmqClient() as rmqclient:
|
||||||
log.info('Starting processing task {0}: {1}'.format(
|
log.info('Starting processing task {0}: {1}'.format(
|
||||||
message_id, anyjson.dumps(task)))
|
message_id, anyjson.dumps(task)))
|
||||||
reporter = reporting.Reporter(rmqclient, message_id, task['id'])
|
reporter = reporting.Reporter(rmqclient, message_id, task['id'])
|
||||||
|
|
||||||
command_dispatcher = CommandDispatcher(
|
command_dispatcher = CommandDispatcher(
|
||||||
task['id'], rmqclient, task['token'])
|
task['id'], rmqclient, task['token'])
|
||||||
workflows = []
|
workflows = []
|
||||||
for path in glob.glob("data/workflows/*.xml"):
|
for path in glob.glob("data/workflows/*.xml"):
|
||||||
log.debug('Loading XML {0}'.format(path))
|
log.debug('Loading XML {0}'.format(path))
|
||||||
workflow = Workflow(path, task, command_dispatcher, config,
|
workflow = Workflow(path, task, command_dispatcher, config,
|
||||||
reporter)
|
reporter)
|
||||||
workflows.append(workflow)
|
workflows.append(workflow)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
result = False
|
result = False
|
||||||
for workflow in workflows:
|
for workflow in workflows:
|
||||||
if workflow.execute():
|
if workflow.execute():
|
||||||
result = True
|
result = True
|
||||||
if not result:
|
if not result:
|
||||||
break
|
break
|
||||||
if not command_dispatcher.execute_pending():
|
if not command_dispatcher.execute_pending():
|
||||||
break
|
break
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
log.exception(ex)
|
log.exception(ex)
|
||||||
break
|
break
|
||||||
|
|
||||||
command_dispatcher.close()
|
command_dispatcher.close()
|
||||||
|
|
||||||
del task['token']
|
del task['token']
|
||||||
result_msg = rabbitmq.Message()
|
result_msg = rabbitmq.Message()
|
||||||
result_msg.body = task
|
result_msg.body = task
|
||||||
result_msg.id = message_id
|
result_msg.id = message_id
|
||||||
|
|
||||||
rmqclient.send(message=result_msg, key='task-results')
|
rmqclient.send(message=result_msg, key='task-results')
|
||||||
log.info('Finished processing task {0}. Result = {1}'.format(
|
log.info('Finished processing task {0}. Result = {1}'.format(
|
||||||
message_id, anyjson.dumps(task)))
|
message_id, anyjson.dumps(task)))
|
||||||
|
|
||||||
|
|
||||||
class ConductorWorkflowService(service.Service):
|
class ConductorWorkflowService(service.Service):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(ConductorWorkflowService, self).__init__()
|
super(ConductorWorkflowService, self).__init__()
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
super(ConductorWorkflowService, self).start()
|
super(ConductorWorkflowService, self).start()
|
||||||
self.tg.add_thread(self._start_rabbitmq)
|
self.tg.add_thread(self._start_rabbitmq)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
super(ConductorWorkflowService, self).stop()
|
super(ConductorWorkflowService, self).stop()
|
||||||
|
|
||||||
def _start_rabbitmq(self):
|
def _start_rabbitmq(self):
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
with rabbitmq.RmqClient() as rmq:
|
with rabbitmq.RmqClient() as rmq:
|
||||||
rmq.declare('tasks', 'tasks')
|
rmq.declare('tasks', 'tasks')
|
||||||
rmq.declare('task-results', 'tasks')
|
rmq.declare('task-results')
|
||||||
with rmq.open('tasks') as subscription:
|
with rmq.open('tasks') as subscription:
|
||||||
while True:
|
while True:
|
||||||
msg = subscription.get_message()
|
msg = subscription.get_message()
|
||||||
self.tg.add_thread(
|
self.tg.add_thread(
|
||||||
task_received, msg.body, msg.id)
|
task_received, msg.body, msg.id)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
log.exception(ex)
|
log.exception(ex)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user