diff --git a/conductor/conductor/app.py b/conductor/conductor/app.py
index da17e9b..fe44f2d 100644
--- a/conductor/conductor/app.py
+++ b/conductor/conductor/app.py
@@ -9,6 +9,9 @@ from config import Config
import reporting
import rabbitmq
+import windows_agent
+import cloud_formation
+
config = Config(sys.argv[1] if len(sys.argv) > 1 else None)
@@ -18,7 +21,7 @@ def task_received(task, message_id):
reporter = reporting.Reporter(rmqclient, message_id, task['id'])
command_dispatcher = CommandDispatcher(
- task['name'], rmqclient, task['token'])
+ task['id'], rmqclient, task['token'])
workflows = []
for path in glob.glob("data/workflows/*.xml"):
print "loading", path
@@ -28,14 +31,21 @@ def task_received(task, message_id):
while True:
try:
- for workflow in workflows:
- workflow.execute()
+ while True:
+ result = False
+ for workflow in workflows:
+ if workflow.execute():
+ result = True
+ if not result:
+ break
if not command_dispatcher.execute_pending():
break
except Exception:
break
command_dispatcher.close()
+
+ del task['token']
result_msg = rabbitmq.Message()
result_msg.body = task
result_msg.id = message_id
@@ -59,9 +69,9 @@ class ConductorWorkflowService(service.Service):
while True:
try:
with rabbitmq.RmqClient() as rmq:
- rmq.declare('tasks', 'tasks')
- rmq.declare('task-results', 'tasks')
- with rmq.open('tasks') as subscription:
+ rmq.declare('tasks2', 'tasks2')
+ rmq.declare('task-results', 'tasks2')
+ with rmq.open('tasks2') as subscription:
while True:
msg = subscription.get_message()
self.tg.add_thread(
diff --git a/conductor/conductor/cloud_formation.py b/conductor/conductor/cloud_formation.py
index 7303801..9798832 100644
--- a/conductor/conductor/cloud_formation.py
+++ b/conductor/conductor/cloud_formation.py
@@ -2,6 +2,8 @@ import base64
import xml_code_engine
import config
+from random import choice
+import time
def update_cf_stack(engine, context, body, template,
mappings, arguments, **kwargs):
@@ -16,7 +18,7 @@ def update_cf_stack(engine, context, body, template,
arguments=arguments, callback=callback)
-def prepare_user_data(context, template='Default', **kwargs):
+def prepare_user_data(context, hostname, service, unit, template='Default', **kwargs):
settings = config.CONF.rabbitmq
with open('data/init.ps1') as init_script_file:
@@ -26,17 +28,36 @@ def prepare_user_data(context, template='Default', **kwargs):
template_data = template_file.read()
template_data = template_data.replace(
'%RABBITMQ_HOST%', settings.host)
+ template_data = template_data.replace(
+ '%RABBITMQ_INPUT_QUEUE%',
+ '-'.join([str(context['/dataSource']['id']),
+ str(service), str(unit)])
+ )
template_data = template_data.replace(
'%RESULT_QUEUE%',
- '-execution-results-%s' % str(context['/dataSource']['name']))
+ '-execution-results-%s' % str(context['/dataSource']['id']))
- return init_script.replace(
+ init_script = init_script.replace(
'%WINDOWS_AGENT_CONFIG_BASE64%',
base64.b64encode(template_data))
+ init_script = init_script.replace('%INTERNAL_HOSTNAME%', hostname)
+
+ return init_script
+
+def generate_hostname(**kwargs):
+ chars = 'abcdefghijklmnopqrstuvwxyz'
+ prefix = ''.join(choice(chars) for _ in range(4))
+ return prefix + str(int(time.time() * 10))
+
+
+
xml_code_engine.XmlCodeEngine.register_function(
update_cf_stack, "update-cf-stack")
xml_code_engine.XmlCodeEngine.register_function(
- prepare_user_data, "prepare_user_data")
+ prepare_user_data, "prepare-user-data")
+
+xml_code_engine.XmlCodeEngine.register_function(
+ generate_hostname, "generate-hostname")
diff --git a/conductor/conductor/commands/cloud_formation.py b/conductor/conductor/commands/cloud_formation.py
index ab62ac5..44f4e08 100644
--- a/conductor/conductor/commands/cloud_formation.py
+++ b/conductor/conductor/commands/cloud_formation.py
@@ -67,18 +67,24 @@ class HeatExecutor(CommandBase):
# ])
try:
+ print 'try update'
self._heat_client.stacks.update(
stack_id=self._stack,
parameters=arguments,
template=template)
+ print 'wait update'
self._wait_state('UPDATE_COMPLETE')
except heatclient.exc.HTTPNotFound:
+ print 'try create'
+
self._heat_client.stacks.create(
stack_name=self._stack,
parameters=arguments,
template=template)
+ print 'wait create'
self._wait_state('CREATE_COMPLETE')
+
pending_list = self._pending_list
self._pending_list = []
diff --git a/conductor/conductor/commands/dispatcher.py b/conductor/conductor/commands/dispatcher.py
index 8208856..d44f224 100644
--- a/conductor/conductor/commands/dispatcher.py
+++ b/conductor/conductor/commands/dispatcher.py
@@ -8,7 +8,7 @@ class CommandDispatcher(command.CommandBase):
self._command_map = {
'cf': cloud_formation.HeatExecutor(environment_id, token),
'agent': windows_agent.WindowsAgentExecutor(
- environment_id, rmqclient, environment_id)
+ environment_id, rmqclient)
}
def execute(self, name, **kwargs):
diff --git a/conductor/conductor/commands/windows_agent.py b/conductor/conductor/commands/windows_agent.py
index 978ddd2..8573489 100644
--- a/conductor/conductor/commands/windows_agent.py
+++ b/conductor/conductor/commands/windows_agent.py
@@ -7,14 +7,14 @@ from command import CommandBase
class WindowsAgentExecutor(CommandBase):
- def __init__(self, stack, rmqclient, environment):
+ def __init__(self, stack, rmqclient):
self._stack = stack
self._rmqclient = rmqclient
self._pending_list = []
- self._results_queue = '-execution-results-%s' % str(environment)
+ self._results_queue = '-execution-results-%s' % str(stack)
rmqclient.declare(self._results_queue)
- def execute(self, template, mappings, host, callback):
+ def execute(self, template, mappings, host, service, callback):
with open('data/templates/agent/%s.template' %
template) as template_file:
template_data = template_file.read()
@@ -23,7 +23,7 @@ class WindowsAgentExecutor(CommandBase):
json.loads(template_data), mappings)
id = str(uuid.uuid4()).lower()
- host = ('%s-%s' % (self._stack, host)).lower().replace(' ', '-')
+ host = ('%s-%s-%s' % (self._stack, service, host)).lower()
self._pending_list.append({
'id': id,
'callback': callback
diff --git a/conductor/conductor/helpers.py b/conductor/conductor/helpers.py
index 435a35b..b133836 100644
--- a/conductor/conductor/helpers.py
+++ b/conductor/conductor/helpers.py
@@ -1,6 +1,5 @@
import types
-
def transform_json(json, mappings):
if isinstance(json, types.ListType):
return [transform_json(t, mappings) for t in json]
@@ -47,3 +46,7 @@ def find(f, seq):
return item, index
index += 1
return None, -1
+
+
+def generate(length):
+ return ''.join(choice(chars) for _ in range(length))
diff --git a/conductor/conductor/windows_agent.py b/conductor/conductor/windows_agent.py
index 2e7761c..3a4fda7 100644
--- a/conductor/conductor/windows_agent.py
+++ b/conductor/conductor/windows_agent.py
@@ -1,7 +1,7 @@
import xml_code_engine
-def send_command(engine, context, body, template, host, mappings=None,
+def send_command(engine, context, body, template, service, host, mappings=None,
result=None, **kwargs):
if not mappings: mappings = {}
command_dispatcher = context['/commandDispatcher']
@@ -18,7 +18,7 @@ def send_command(engine, context, body, template, host, mappings=None,
command_dispatcher.execute(name='agent',
template=template, mappings=mappings,
- host=host, callback=callback)
+ host=host, service=service, callback=callback)
xml_code_engine.XmlCodeEngine.register_function(send_command, "send-command")
\ No newline at end of file
diff --git a/conductor/conductor/workflow.py b/conductor/conductor/workflow.py
index 46d1d3d..94d357e 100644
--- a/conductor/conductor/workflow.py
+++ b/conductor/conductor/workflow.py
@@ -17,14 +17,12 @@ class Workflow(object):
self._reporter = reporter
def execute(self):
- while True:
- context = function_context.Context()
- context['/dataSource'] = self._data
- context['/commandDispatcher'] = self._command_dispatcher
- context['/config'] = self._config
- context['/reporter'] = self._reporter
- if not self._engine.execute(context):
- break
+ context = function_context.Context()
+ context['/dataSource'] = self._data
+ context['/commandDispatcher'] = self._command_dispatcher
+ context['/config'] = self._config
+ context['/reporter'] = self._reporter
+ return self._engine.execute(context)
@staticmethod
def _get_path(obj, path, create_non_existing=False):
diff --git a/conductor/data/templates/agent-config/Default.template b/conductor/data/templates/agent-config/Default.template
index 179bfb8..ff5c3c4 100644
--- a/conductor/data/templates/agent-config/Default.template
+++ b/conductor/data/templates/agent-config/Default.template
@@ -22,6 +22,7 @@
+
diff --git a/conductor/data/workflows/AD.xml b/conductor/data/workflows/AD.xml
index 929069f..c394fbd 100644
--- a/conductor/data/workflows/AD.xml
+++ b/conductor/data/workflows/AD.xml
@@ -5,7 +5,7 @@
-
+
Creating instance
@@ -13,11 +13,13 @@
@@ -44,6 +46,9 @@
+
+
+
-
+
Creating instance
@@ -13,11 +13,13 @@
-
-
-
+ WS--
-
+
+
+
+
+
@@ -48,6 +50,9 @@
+
+
+