diff --git a/setup.cfg b/setup.cfg index 801fc0aa..d37398e5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -69,3 +69,16 @@ solar.orchestration.drivers.scheduler = solar = solar.orchestration.workers.scheduler:Scheduler solar.orchestration.drivers.system_log = solar = solar.orchestration.workers.system_log:SystemLog +solar.orchestration.hooks.tasks.construct = + scheduler_sub = solar.orchestration.workers.scheduler:tasks_subscribe + system_log_sub = solar.orchestration.workers.system_log:tasks_subscribe +solar.orchestration.hooks.system_log.construct = + session_sub = solar.orchestration:wrap_session +solar.orchestration.hooks.scheduler.construct = + session_sub = solar.orchestration:wrap_session +solar.orchestration.runners = + gevent = solar.orchestration.runners.gevent_runner:run_all +solar.orchestration.constructors = + tasks = solar.orchestration:construct_tasks + system_log = solar.orchestration:construct_system_log + scheduler = solar.orchestration:construct_scheduler diff --git a/solar/config.py b/solar/config.py index ecbf81ca..5304a32e 100644 --- a/solar/config.py +++ b/solar/config.py @@ -38,6 +38,7 @@ C.executor = 'zerorpc' C.tasks_driver = 'solar' C.scheduler_driver = 'solar' C.system_log_driver = 'solar' +C.runner = 'gevent' def _lookup_vals(setter, config, prefix=None): diff --git a/solar/orchestration/__init__.py b/solar/orchestration/__init__.py index 3aff3328..e27207bb 100644 --- a/solar/orchestration/__init__.py +++ b/solar/orchestration/__init__.py @@ -17,58 +17,42 @@ from solar.core.log import log from solar.dblayer import ModelMeta from solar.orchestration import extensions as loader from solar.orchestration.executors import Executor -from solar.orchestration.workers.scheduler import SchedulerCallbackClient SCHEDULER_CLIENT = loader.get_client('scheduler') +def wrap_session(extension, clients): + log.debug('DB session for %r', extension) + extension.for_all.before(lambda ctxt: ModelMeta.session_start()) + extension.for_all.after(lambda ctxt: ModelMeta.session_end()) + + def construct_scheduler(extensions, clients): scheduler = extensions['scheduler'] + loader.load_contruct_hooks('scheduler', extensions, clients) scheduler_executor = Executor( scheduler, clients['scheduler'].connect_to) - scheduler.for_all.before(lambda ctxt: ModelMeta.session_start()) - scheduler.for_all.after(lambda ctxt: ModelMeta.session_end()) scheduler_executor.run() def construct_system_log(extensions, clients): syslog = extensions['system_log'] - syslog.for_all.before(lambda ctxt: ModelMeta.session_start()) - syslog.for_all.after(lambda ctxt: ModelMeta.session_end()) + loader.load_contruct_hooks('system_log', extensions, clients) Executor(syslog, clients['system_log'].connect_to).run() def construct_tasks(extensions, clients): - syslog = clients['system_log'] - # FIXME will be solved by hooks on certain events - # solar.orchestraion.extensions.tasks.before = - # 1 = solar.orchestration.workers.scheduler:subscribe - scheduler = SchedulerCallbackClient(clients['scheduler']) tasks = extensions['tasks'] + loader.load_contruct_hooks('tasks', extensions, clients) tasks_executor = Executor(tasks, clients['tasks'].connect_to) tasks.for_all.before(tasks_executor.register_task) - tasks.for_all.on_success(syslog.commit) - tasks.for_all.on_error(syslog.error) - tasks.for_all.on_success(scheduler.update) - tasks.for_all.on_error(scheduler.error) tasks_executor.run() def main(): - import sys - from gevent import spawn - from gevent import joinall + runner = loader.get_runner(C.runner) + constructors = loader.get_constructors() clients = loader.get_clients() - mgr = loader.get_extensions(clients) - servers = [ - spawn(construct_scheduler, mgr, clients), - spawn(construct_system_log, mgr, clients), - spawn(construct_tasks, mgr, clients) - ] - try: - log.info('Spawning scheduler, system log and tasks workers.') - joinall(servers) - except KeyboardInterrupt: - log.info('Exit solar-worker') - sys.exit() + exts = loader.get_extensions(clients) + runner.driver(constructors, exts, clients) diff --git a/solar/orchestration/extensions.py b/solar/orchestration/extensions.py index b31bc1bf..00cbe0f7 100644 --- a/solar/orchestration/extensions.py +++ b/solar/orchestration/extensions.py @@ -74,3 +74,23 @@ def get_extensions(clients): invoke_on_load=True, invoke_args=(clients,)) return ext + + +def load_contruct_hooks(name, extensions, clients): + extension.ExtensionManager( + namespace='solar.orchestration.hooks.{}.construct'.format(name), + invoke_on_load=True, + invoke_args=(extensions[name], clients)) + + +def get_runner(name): + return driver.DriverManager( + namespace='solar.orchestration.runners', + name=name, + invoke_on_load=False) + + +def get_constructors(): + return extension.ExtensionManager( + namespace='solar.orchestration.constructors', + invoke_on_load=False) diff --git a/solar/orchestration/runners/__init__.py b/solar/orchestration/runners/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/solar/orchestration/runners/gevent_runner.py b/solar/orchestration/runners/gevent_runner.py new file mode 100644 index 00000000..0c893b48 --- /dev/null +++ b/solar/orchestration/runners/gevent_runner.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sys + +import gevent + +from solar.core.log import log + + +def run_all(construct_manager, extensions, clients): + + def _spawn(constructor, extensions, clients): + return gevent.spawn(constructor.plugin, extensions, clients) + + try: + log.info('Spawning scheduler, system log and tasks workers.') + gevent.joinall( + construct_manager.map(_spawn, extensions, clients)) + except KeyboardInterrupt: + log.info('Exit solar-worker') + sys.exit() diff --git a/solar/orchestration/workers/__init__.py b/solar/orchestration/workers/__init__.py index b2b34f5f..4534c9d8 100644 --- a/solar/orchestration/workers/__init__.py +++ b/solar/orchestration/workers/__init__.py @@ -1,5 +1,4 @@ -# -# Copyright 2015 Mirantis, Inc. +# Copyright 2015 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -12,9 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -# -from solar.orchestration.workers.scheduler import Scheduler -from solar.orchestration.workers.scheduler import SchedulerCallbackClient -from solar.orchestration.workers.system_log import SystemLog -from solar.orchestration.workers.tasks import Tasks +import solar.orchestration.workers.scheduler +import solar.orchestration.workers.system_log +import solar.orchestration.workers.tasks diff --git a/solar/orchestration/workers/scheduler.py b/solar/orchestration/workers/scheduler.py index 4c8699cf..d1f6a39a 100644 --- a/solar/orchestration/workers/scheduler.py +++ b/solar/orchestration/workers/scheduler.py @@ -140,3 +140,10 @@ class SchedulerCallbackClient(object): def error(self, ctxt, result, *args, **kwargs): self.client.update_next(ctxt, states.ERROR.name, repr(result)) + + +def tasks_subscribe(tasks, clients): + log.debug('Scheduler subscribes to tasks hooks') + scheduler = SchedulerCallbackClient(clients['scheduler']) + tasks.for_all.on_success(scheduler.update) + tasks.for_all.on_error(scheduler.error) diff --git a/solar/orchestration/workers/system_log.py b/solar/orchestration/workers/system_log.py index ab2c72f7..38ee6864 100644 --- a/solar/orchestration/workers/system_log.py +++ b/solar/orchestration/workers/system_log.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +from solar.core.log import log from solar.orchestration.workers import base from solar.system_log.operations import move_to_commited from solar.system_log.operations import set_error @@ -24,3 +25,10 @@ class SystemLog(base.Worker): def error(self, ctxt, *args, **kwargs): return set_error(ctxt['task_id'].rsplit(':', 1)[-1]) + + +def tasks_subscribe(tasks, clients): + log.debug('System log subscribes to tasks hooks') + syslog = clients['system_log'] + tasks.for_all.on_success(syslog.commit) + tasks.for_all.on_error(syslog.error) diff --git a/solar/test/functional/conftest.py b/solar/test/functional/conftest.py index 56b2ca8f..3d9bcafd 100644 --- a/solar/test/functional/conftest.py +++ b/solar/test/functional/conftest.py @@ -18,7 +18,7 @@ import string import gevent import pytest - +from solar.config import C from solar.core.log import log from solar.dblayer.model import ModelMeta from solar.orchestration import executors @@ -104,3 +104,13 @@ def clients(request): @pytest.fixture def extensions(clients): return loader.get_extensions(clients) + + +@pytest.fixture +def runner(): + return loader.get_runner(C.runner) + + +@pytest.fixture +def constructors(): + return loader.get_constructors() diff --git a/solar/test/functional/test_complete_solar_workflow.py b/solar/test/functional/test_complete_solar_workflow.py index 66b509bb..744f58ac 100644 --- a/solar/test/functional/test_complete_solar_workflow.py +++ b/solar/test/functional/test_complete_solar_workflow.py @@ -33,24 +33,9 @@ def scheduler_client(scheduler_address): @pytest.fixture(autouse=True) -def tasks(extensions, clients): - gevent.spawn( - orchestration.construct_tasks, - extensions, clients) - - -@pytest.fixture(autouse=True) -def scheduler(extensions, clients): - gevent.spawn( - orchestration.construct_scheduler, - extensions, clients) - - -@pytest.fixture(autouse=True) -def system_log(extensions, clients): - gevent.spawn( - orchestration.construct_system_log, - extensions, clients) +def prepare_all(constructors, extensions, clients): + for cons in constructors: + gevent.spawn(cons.plugin, extensions, clients) @pytest.fixture(autouse=True) @@ -61,7 +46,7 @@ def resources(request, sequence_vr): 'sequence_%s' % idx, sequence_vr, inputs={'idx': idx}) -@pytest.mark.parametrize('scale', [10]) +@pytest.mark.parametrize('scale', [3]) def test_concurrent_sequences_with_no_handler(scale, clients): total_resources = scale * 3 timeout = scale * 2