
Iec6b3c56d464d26e4f1fc143e6a7804add67a35d I3f2d3a12fcb53759a906fcbae6fae768833d325e I566811521da16055a73c73052ffcd497aaa8e475 I2ee04b6d5aaa26d49243cf7e0b6045026f052625 I329620f3c8aa7e7f1bdd658cbaa8ea20d9aa4ba5 I5ff3d9146b4fbec74d8d65de84d7ab61d869725c Ib38fd52811812170bdd9bf9df90a66f1a2e6c8d9 I64ce3efaec6df2e402ca2acf6a3cf1a6f2bb1909 I66c3659ab0f33772d7a51c8961a37e32c65354c2 I29ce4a6ef165daa0fe60003301a0d807fd1cce42 Ibd2a4f55e2a64d9a992833200a791dbb20c41eca I16133a213ef25a1b374f10fa80cd5a03d1f40753 Ie09f32fcacfe70f436cad71e5749edf94be038ed Iaba6a6bf07ff223e41f705f0f1db5688a5290f5c I64a0474ecfe5ea38393fe681d520a7b6ce00d959 I270b3ce5ef776522a62d9622b36f0d6b50b9cc57 Ic5f6849ea166bb0295f84685b0a2b5c4701f972a I51190cb02255254a888f66404ecdc3dfc5be0386 I0c2180c603cd09e29d4e6c5e592b987e2b447972 Iae1cdbeb7fa3e49c2cb5cac7c92eceffef477e7e I6c643f58aada0a8525711bc452d0c581625f3d26 I9c4f999b1b3006b8ae5f18a030d5b30c7e85e03b I32eaad36edcb889b448c45ba36f4e97f7c87d1e5 I8c91c40a922690b475aac1c0a3b2c0c28274b130 I574fd1dbeea58dbf41f77d295dc03c23d2feaf96 Change-Id: I0ffb3c38c0c1b3aafa8617364e22036c47aaef76
174 lines
6.4 KiB
Python
174 lines
6.4 KiB
Python
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
|
import glob
|
|
import sys
|
|
import traceback
|
|
|
|
import anyjson
|
|
import eventlet
|
|
from muranoconductor.openstack.common import service
|
|
from workflow import Workflow
|
|
from commands.dispatcher import CommandDispatcher
|
|
from openstack.common import log as logging
|
|
from config import Config
|
|
import reporting
|
|
from muranocommon.messaging import MqClient, Message
|
|
from muranocommon.helpers.token_sanitizer import TokenSanitizer
|
|
from muranoconductor import config as cfg
|
|
|
|
import windows_agent
|
|
import cloud_formation
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def secure_task(task):
    """Return a copy of *task* with sensitive values (tokens) masked.

    Used so that task payloads can be logged without leaking credentials.
    """
    return TokenSanitizer().sanitize(task)
|
|
|
|
|
|
class ConductorWorkflowService(service.Service):
    """OpenStack service that consumes Murano tasks from RabbitMQ.

    For every message on the 'tasks' queue it runs the XML workflows found
    under data/workflows/ against the task's object model, then publishes
    the (sanitized) resulting model to the 'task-results' queue.
    """

    def __init__(self):
        super(ConductorWorkflowService, self).__init__()

    def start(self):
        """Start the service and spawn the RabbitMQ consumer loop."""
        super(ConductorWorkflowService, self).start()
        # Run the blocking consume loop on the service's thread group.
        self.tg.add_thread(self._start_rabbitmq)

    def stop(self):
        super(ConductorWorkflowService, self).stop()

    def create_rmq_client(self):
        """Build an MqClient from the [rabbitmq] section of the config.

        :returns: an unconnected MqClient (usable as a context manager)
        """
        rabbitmq = cfg.CONF.rabbitmq
        connection_params = {
            'login': rabbitmq.login,
            'password': rabbitmq.password,
            'host': rabbitmq.host,
            'port': rabbitmq.port,
            'virtual_host': rabbitmq.virtual_host,
            'ssl': rabbitmq.ssl,
            # Empty/whitespace ca_certs means "no CA bundle".
            'ca_certs': rabbitmq.ca_certs.strip() or None
        }
        return MqClient(**connection_params)

    def _start_rabbitmq(self):
        """Consume 'tasks' forever, reconnecting on any failure.

        Each received message is handled on its own eventlet green thread
        so that slow workflows do not block message consumption.
        """
        while True:
            try:
                with self.create_rmq_client() as mq:
                    mq.declare('tasks', 'tasks')
                    mq.declare('task-results')
                    # prefetch_count caps how many unacked tasks (and hence
                    # concurrently processed environments) we hold at once.
                    with mq.open('tasks',
                                 prefetch_count=
                                 cfg.CONF.max_environments) as subscription:
                        while True:
                            msg = subscription.get_message(timeout=2)
                            if msg is not None:
                                eventlet.spawn(self._task_received, msg)
            except Exception as ex:
                # Any connection/channel error: log and reconnect from scratch.
                log.exception(ex)

    def _task_received(self, message):
        """Execute all workflows against one task and publish the result.

        :param message: the queue message; its body is the task's object
                        model (expected to carry 'id', 'token', 'tenant_id'
                        keys — TODO confirm against the API producer)

        The finally-block guarantees that, whatever happens, the model is
        cleaned of secrets and a result message is sent and the incoming
        message acked.
        """
        task = message.body or {}
        message_id = message.id
        with self.create_rmq_client() as mq:
            try:
                # Log a sanitized copy so tokens never reach the logs.
                log.info('Starting processing task {0}: {1}'.format(
                    message_id, anyjson.dumps(secure_task(task))))
                # NOTE(review): if task lacks 'id' this raises before
                # 'reporter' is bound, and the finally-block's cleanup()
                # call would then fail with NameError — confirm upstream
                # always supplies 'id'.
                reporter = reporting.Reporter(mq, message_id, task['id'])
                config = Config()

                command_dispatcher = CommandDispatcher('e' + task['id'], mq,
                                                       task['token'],
                                                       task['tenant_id'],
                                                       reporter)
                workflows = []
                for path in glob.glob("data/workflows/*.xml"):
                    log.debug('Loading XML {0}'.format(path))
                    workflow = Workflow(path, task, command_dispatcher, config,
                                        reporter)
                    workflows.append(workflow)

                stop = False
                while not stop:
                    try:
                        for workflow in workflows:
                            workflow.prepare()
                        # Keep re-running every workflow until a full pass
                        # makes no progress (no rule matched).
                        while True:
                            result = False
                            for workflow in workflows:
                                if workflow.execute():
                                    result = True
                            if not result:
                                log.debug(
                                    "No rules matched, "
                                    "will now execute pending commands")
                                break
                        # Flush queued commands; when none remain the task
                        # has converged and we are done.
                        if not command_dispatcher.execute_pending():
                            log.debug("No pending commands found, "
                                      "seems like we are done")
                            break
                        # The model itself may request a stop between passes.
                        if self.check_stop_requested(task):
                            log.info("Workflow stop requested")
                            stop = True
                    except Exception as ex:
                        # NOTE(review): ex.message is Python 2 only; this
                        # code base appears to target Python 2.
                        reporter.report_generic(
                            "Unexpected error has occurred", ex.message,
                            'error')
                        log.exception(ex)
                        break

                command_dispatcher.close()
                if stop:
                    log.info("Workflow stopped by 'stop' command")
            finally:
                # Always strip secrets and report back, even on failure.
                self.cleanup(task, reporter)
                result_msg = Message()
                result_msg.body = task
                result_msg.id = message_id

                mq.send(message=result_msg, key='task-results')
                message.ack()
        log.info('Finished processing task {0}. Result = {1}'.format(
            message_id, anyjson.dumps(secure_task(task))))

    def cleanup(self, model, reporter):
        """Remove the auth token and all transient 'temp' data from *model*.

        Mutates *model* in place; errors are reported (via *reporter*, if
        given) and logged rather than raised.

        :param model: the task's object model dict
        :param reporter: optional reporting.Reporter for surfacing errors
        """
        try:
            if 'token' in model:
                del model['token']

            if 'temp' in model:
                del model['temp']

            services = model.get('services', [])
            # NOTE(review): loop variable shadows the imported 'service'
            # module within this method body (harmless here, but confusing).
            for service in services:
                if 'temp' in service:
                    del service['temp']

                units = service.get('units', [])
                for unit in units:
                    if 'temp' in unit:
                        del unit['temp']
        except Exception as e:
            log.exception("Unexpected exception has occurred")
            if reporter:
                # NOTE(review): e.message is Python 2 only.
                reporter.report_generic("Unexpected error has occurred",
                                        e.message, 'error')

    def check_stop_requested(self, model):
        """Return the model's temp '_stop_requested' flag, or False.

        :param model: the task's object model dict
        :returns: value of model['temp']['_stop_requested'] when present,
                  otherwise False
        """
        if 'temp' in model:
            if '_stop_requested' in model['temp']:
                return model['temp']['_stop_requested']
        return False
|