From e0e6e0eb73c1d89061c6fc0c28416febccf43ead Mon Sep 17 00:00:00 2001
From: Dmitry Shulyak <dshulyak@mirantis.com>
Date: Wed, 27 Jan 2016 17:37:29 +0200
Subject: [PATCH] Extensions mechanism for orchestration components

All current orchestator componentns loaded from -
  solar.orchestration.extensions

Each of them will be loaded according to prefered driver,
that will be specified in solar config, and added to appropriate
namespace, for example -
  solar.orchestration.drivers.tasks

related to blueprint stevedorize-orchestration

Change-Id: I1370f5a6cd8a7a0b23d58485a484a246717b7017
---
 setup.cfg                                     | 14 ++++
 solar/config.py                               |  3 +
 solar/orchestration/__init__.py               | 48 ++++++------
 solar/orchestration/extensions.py             | 76 +++++++++++++++++++
 solar/test/functional/conftest.py             | 18 +++++
 .../test_complete_solar_workflow.py           | 15 ++--
 6 files changed, 144 insertions(+), 30 deletions(-)
 create mode 100644 solar/orchestration/extensions.py

diff --git a/setup.cfg b/setup.cfg
index e0ebd438..801fc0aa 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -55,3 +55,17 @@ solar.handlers =
     naive_sync = solar.core.handlers.naive_sync:NaiveSync
 solar.orchestration.executors =
     zerorpc = solar.orchestration.executors.zerorpc_executor
+solar.orchestration.extensions =
+    tasks = solar.orchestration.extensions:tasks
+    system_log = solar.orchestration.extensions:system_log
+    scheduler = solar.orchestration.extensions:scheduler
+solar.orchestration.extensions_clients =
+    tasks = solar.orchestration.extensions:tasks_client
+    system_log = solar.orchestration.extensions:system_log_client
+    scheduler = solar.orchestration.extensions:scheduler_client
+solar.orchestration.drivers.tasks =
+    solar = solar.orchestration.workers.tasks:Tasks
+solar.orchestration.drivers.scheduler =
+    solar = solar.orchestration.workers.scheduler:Scheduler
+solar.orchestration.drivers.system_log =
+    solar = solar.orchestration.workers.system_log:SystemLog
diff --git a/solar/config.py b/solar/config.py
index 6637b50f..ecbf81ca 100644
--- a/solar/config.py
+++ b/solar/config.py
@@ -35,6 +35,9 @@ C.tasks_address = 'ipc:///tmp/solar_tasks'
 C.scheduler_address = 'ipc:///tmp/solar_scheduler'
 C.timewatcher_address = 'ipc:///tmp/solar_timewatcher'
 C.executor = 'zerorpc'
+C.tasks_driver = 'solar'
+C.scheduler_driver = 'solar'
+C.system_log_driver = 'solar'
 
 
 def _lookup_vals(setter, config, prefix=None):
diff --git a/solar/orchestration/__init__.py b/solar/orchestration/__init__.py
index 7841bc02..3aff3328 100644
--- a/solar/orchestration/__init__.py
+++ b/solar/orchestration/__init__.py
@@ -15,54 +15,56 @@
 from solar.config import C
 from solar.core.log import log
 from solar.dblayer import ModelMeta
-from solar.orchestration.executors import Client
+from solar.orchestration import extensions as loader
 from solar.orchestration.executors import Executor
-from solar.orchestration.workers import scheduler as wscheduler
-from solar.orchestration.workers.system_log import SystemLog
-from solar.orchestration.workers.tasks import Tasks
+from solar.orchestration.workers.scheduler import SchedulerCallbackClient
 
 
-SCHEDULER_CLIENT = Client(C.scheduler_address)
+SCHEDULER_CLIENT = loader.get_client('scheduler')
 
 
-def construct_scheduler(tasks_address, scheduler_address):
-    scheduler = wscheduler.Scheduler(Client(tasks_address))
-    scheduler_executor = Executor(scheduler, scheduler_address)
+def construct_scheduler(extensions, clients):
+    scheduler = extensions['scheduler']
+    scheduler_executor = Executor(
+        scheduler, clients['scheduler'].connect_to)
     scheduler.for_all.before(lambda ctxt: ModelMeta.session_start())
     scheduler.for_all.after(lambda ctxt: ModelMeta.session_end())
-    Executor(scheduler, scheduler_address).run()
+    scheduler_executor.run()
 
 
-def construct_system_log(system_log_address):
-    syslog = SystemLog()
+def construct_system_log(extensions, clients):
+    syslog = extensions['system_log']
     syslog.for_all.before(lambda ctxt: ModelMeta.session_start())
     syslog.for_all.after(lambda ctxt: ModelMeta.session_end())
-    Executor(syslog, system_log_address).run()
+    Executor(syslog, clients['system_log'].connect_to).run()
 
 
-def construct_tasks(system_log_address, tasks_address, scheduler_address):
-    syslog = Client(system_log_address)
-    scheduler = wscheduler.SchedulerCallbackClient(
-        Client(scheduler_address))
-    tasks = Tasks()
-    tasks_executor = Executor(tasks, tasks_address)
+def construct_tasks(extensions, clients):
+    syslog = clients['system_log']
+    # FIXME will be solved by hooks on certain events
+    # solar.orchestraion.extensions.tasks.before =
+    #   1 = solar.orchestration.workers.scheduler:subscribe
+    scheduler = SchedulerCallbackClient(clients['scheduler'])
+    tasks = extensions['tasks']
+    tasks_executor = Executor(tasks, clients['tasks'].connect_to)
     tasks.for_all.before(tasks_executor.register_task)
     tasks.for_all.on_success(syslog.commit)
     tasks.for_all.on_error(syslog.error)
     tasks.for_all.on_success(scheduler.update)
     tasks.for_all.on_error(scheduler.error)
-    Executor(tasks, tasks_address).run()
+    tasks_executor.run()
 
 
 def main():
     import sys
     from gevent import spawn
     from gevent import joinall
+    clients = loader.get_clients()
+    mgr = loader.get_extensions(clients)
     servers = [
-        spawn(construct_scheduler, C.tasks_address, C.scheduler_address),
-        spawn(construct_system_log, C.system_log_address),
-        spawn(construct_tasks, C.system_log_address, C.tasks_address,
-              C.scheduler_address)
+        spawn(construct_scheduler, mgr, clients),
+        spawn(construct_system_log, mgr, clients),
+        spawn(construct_tasks, mgr, clients)
         ]
     try:
         log.info('Spawning scheduler, system log and tasks workers.')
diff --git a/solar/orchestration/extensions.py b/solar/orchestration/extensions.py
new file mode 100644
index 00000000..b31bc1bf
--- /dev/null
+++ b/solar/orchestration/extensions.py
@@ -0,0 +1,76 @@
+#    Copyright 2015 Mirantis, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from functools import partial
+
+from stevedore import driver
+from stevedore import extension
+
+from solar.config import C
+from solar.orchestration.executors import Client
+
+
+def client(address):
+    return Client(address)
+
+
+tasks_client = partial(client, C.tasks_address)
+scheduler_client = partial(client, C.scheduler_address)
+system_log_client = partial(client, C.system_log_address)
+
+
+def get_driver(extension, implementation):
+    mgr = driver.DriverManager(
+        namespace='solar.orchestration.drivers.%s' % extension,
+        name=implementation,
+        invoke_on_load=False,
+    )
+    return mgr.driver
+
+
+def tasks(clients):
+    return get_driver('tasks', C.tasks_driver)()
+
+
+def scheduler(clients):
+    return get_driver('scheduler', C.scheduler_driver)(clients['tasks'])
+
+
+def system_log(clients):
+    return get_driver('system_log', C.system_log_driver)()
+
+
+class GetObjExtensionManager(extension.ExtensionManager):
+
+    def __getitem__(self, name):
+        ext = super(GetObjExtensionManager, self).__getitem__(name)
+        return ext.obj
+
+
+def get_clients():
+    return GetObjExtensionManager(
+        namespace='solar.orchestration.extensions_clients',
+        invoke_on_load=True)
+
+
+def get_client(name):
+    return get_clients()[name]
+
+
+def get_extensions(clients):
+    ext = GetObjExtensionManager(
+        namespace='solar.orchestration.extensions',
+        invoke_on_load=True,
+        invoke_args=(clients,))
+    return ext
diff --git a/solar/test/functional/conftest.py b/solar/test/functional/conftest.py
index becc659e..56b2ca8f 100644
--- a/solar/test/functional/conftest.py
+++ b/solar/test/functional/conftest.py
@@ -22,6 +22,7 @@ import pytest
 from solar.core.log import log
 from solar.dblayer.model import ModelMeta
 from solar.orchestration import executors
+from solar.orchestration import extensions as loader
 from solar.orchestration import workers
 
 
@@ -86,3 +87,20 @@ def tasks(request, tasks_address):
 
     gevent.spawn(executor.run)
     return worker, executors.Client(tasks_address)
+
+
+@pytest.fixture
+def clients(request):
+    rst = {}
+    rst['tasks'] = executors.Client(request.getfuncargvalue(
+        'tasks_address'))
+    rst['scheduler'] = executors.Client(request.getfuncargvalue(
+        'scheduler_address'))
+    rst['system_log'] = executors.Client(request.getfuncargvalue(
+        'system_log_address'))
+    return rst
+
+
+@pytest.fixture
+def extensions(clients):
+    return loader.get_extensions(clients)
diff --git a/solar/test/functional/test_complete_solar_workflow.py b/solar/test/functional/test_complete_solar_workflow.py
index f6a0e0d5..66b509bb 100644
--- a/solar/test/functional/test_complete_solar_workflow.py
+++ b/solar/test/functional/test_complete_solar_workflow.py
@@ -33,24 +33,24 @@ def scheduler_client(scheduler_address):
 
 
 @pytest.fixture(autouse=True)
-def tasks(system_log_address, tasks_address, scheduler_address):
+def tasks(extensions, clients):
     gevent.spawn(
         orchestration.construct_tasks,
-        system_log_address, tasks_address, scheduler_address)
+        extensions, clients)
 
 
 @pytest.fixture(autouse=True)
-def scheduler(tasks_address, scheduler_address):
+def scheduler(extensions, clients):
     gevent.spawn(
         orchestration.construct_scheduler,
-        tasks_address, scheduler_address)
+        extensions, clients)
 
 
 @pytest.fixture(autouse=True)
-def system_log(system_log_address):
+def system_log(extensions, clients):
     gevent.spawn(
         orchestration.construct_system_log,
-        system_log_address)
+        extensions, clients)
 
 
 @pytest.fixture(autouse=True)
@@ -62,9 +62,10 @@ def resources(request, sequence_vr):
 
 
 @pytest.mark.parametrize('scale', [10])
-def test_concurrent_sequences_with_no_handler(scale, scheduler_client):
+def test_concurrent_sequences_with_no_handler(scale, clients):
     total_resources = scale * 3
     timeout = scale * 2
+    scheduler_client = clients['scheduler']
 
     assert len(change.stage_changes()) == total_resources
     plan = change.send_to_orchestration()