Extensions mechanism for orchestration components

All current orchestator componentns loaded from -
  solar.orchestration.extensions

Each of them will be loaded according to prefered driver,
that will be specified in solar config, and added to appropriate
namespace, for example -
  solar.orchestration.drivers.tasks

related to blueprint stevedorize-orchestration

Change-Id: I1370f5a6cd8a7a0b23d58485a484a246717b7017
This commit is contained in:
Dmitry Shulyak 2016-01-27 17:37:29 +02:00
parent 2d5b0a87bc
commit e0e6e0eb73
6 changed files with 144 additions and 30 deletions

View File

@ -55,3 +55,17 @@ solar.handlers =
naive_sync = solar.core.handlers.naive_sync:NaiveSync naive_sync = solar.core.handlers.naive_sync:NaiveSync
solar.orchestration.executors = solar.orchestration.executors =
zerorpc = solar.orchestration.executors.zerorpc_executor zerorpc = solar.orchestration.executors.zerorpc_executor
solar.orchestration.extensions =
tasks = solar.orchestration.extensions:tasks
system_log = solar.orchestration.extensions:system_log
scheduler = solar.orchestration.extensions:scheduler
solar.orchestration.extensions_clients =
tasks = solar.orchestration.extensions:tasks_client
system_log = solar.orchestration.extensions:system_log_client
scheduler = solar.orchestration.extensions:scheduler_client
solar.orchestration.drivers.tasks =
solar = solar.orchestration.workers.tasks:Tasks
solar.orchestration.drivers.scheduler =
solar = solar.orchestration.workers.scheduler:Scheduler
solar.orchestration.drivers.system_log =
solar = solar.orchestration.workers.system_log:SystemLog

View File

@ -35,6 +35,9 @@ C.tasks_address = 'ipc:///tmp/solar_tasks'
C.scheduler_address = 'ipc:///tmp/solar_scheduler' C.scheduler_address = 'ipc:///tmp/solar_scheduler'
C.timewatcher_address = 'ipc:///tmp/solar_timewatcher' C.timewatcher_address = 'ipc:///tmp/solar_timewatcher'
C.executor = 'zerorpc' C.executor = 'zerorpc'
C.tasks_driver = 'solar'
C.scheduler_driver = 'solar'
C.system_log_driver = 'solar'
def _lookup_vals(setter, config, prefix=None): def _lookup_vals(setter, config, prefix=None):

View File

@ -15,54 +15,56 @@
from solar.config import C from solar.config import C
from solar.core.log import log from solar.core.log import log
from solar.dblayer import ModelMeta from solar.dblayer import ModelMeta
from solar.orchestration.executors import Client from solar.orchestration import extensions as loader
from solar.orchestration.executors import Executor from solar.orchestration.executors import Executor
from solar.orchestration.workers import scheduler as wscheduler from solar.orchestration.workers.scheduler import SchedulerCallbackClient
from solar.orchestration.workers.system_log import SystemLog
from solar.orchestration.workers.tasks import Tasks
SCHEDULER_CLIENT = Client(C.scheduler_address) SCHEDULER_CLIENT = loader.get_client('scheduler')
def construct_scheduler(tasks_address, scheduler_address): def construct_scheduler(extensions, clients):
scheduler = wscheduler.Scheduler(Client(tasks_address)) scheduler = extensions['scheduler']
scheduler_executor = Executor(scheduler, scheduler_address) scheduler_executor = Executor(
scheduler, clients['scheduler'].connect_to)
scheduler.for_all.before(lambda ctxt: ModelMeta.session_start()) scheduler.for_all.before(lambda ctxt: ModelMeta.session_start())
scheduler.for_all.after(lambda ctxt: ModelMeta.session_end()) scheduler.for_all.after(lambda ctxt: ModelMeta.session_end())
Executor(scheduler, scheduler_address).run() scheduler_executor.run()
def construct_system_log(system_log_address): def construct_system_log(extensions, clients):
syslog = SystemLog() syslog = extensions['system_log']
syslog.for_all.before(lambda ctxt: ModelMeta.session_start()) syslog.for_all.before(lambda ctxt: ModelMeta.session_start())
syslog.for_all.after(lambda ctxt: ModelMeta.session_end()) syslog.for_all.after(lambda ctxt: ModelMeta.session_end())
Executor(syslog, system_log_address).run() Executor(syslog, clients['system_log'].connect_to).run()
def construct_tasks(system_log_address, tasks_address, scheduler_address): def construct_tasks(extensions, clients):
syslog = Client(system_log_address) syslog = clients['system_log']
scheduler = wscheduler.SchedulerCallbackClient( # FIXME will be solved by hooks on certain events
Client(scheduler_address)) # solar.orchestraion.extensions.tasks.before =
tasks = Tasks() # 1 = solar.orchestration.workers.scheduler:subscribe
tasks_executor = Executor(tasks, tasks_address) scheduler = SchedulerCallbackClient(clients['scheduler'])
tasks = extensions['tasks']
tasks_executor = Executor(tasks, clients['tasks'].connect_to)
tasks.for_all.before(tasks_executor.register_task) tasks.for_all.before(tasks_executor.register_task)
tasks.for_all.on_success(syslog.commit) tasks.for_all.on_success(syslog.commit)
tasks.for_all.on_error(syslog.error) tasks.for_all.on_error(syslog.error)
tasks.for_all.on_success(scheduler.update) tasks.for_all.on_success(scheduler.update)
tasks.for_all.on_error(scheduler.error) tasks.for_all.on_error(scheduler.error)
Executor(tasks, tasks_address).run() tasks_executor.run()
def main(): def main():
import sys import sys
from gevent import spawn from gevent import spawn
from gevent import joinall from gevent import joinall
clients = loader.get_clients()
mgr = loader.get_extensions(clients)
servers = [ servers = [
spawn(construct_scheduler, C.tasks_address, C.scheduler_address), spawn(construct_scheduler, mgr, clients),
spawn(construct_system_log, C.system_log_address), spawn(construct_system_log, mgr, clients),
spawn(construct_tasks, C.system_log_address, C.tasks_address, spawn(construct_tasks, mgr, clients)
C.scheduler_address)
] ]
try: try:
log.info('Spawning scheduler, system log and tasks workers.') log.info('Spawning scheduler, system log and tasks workers.')

View File

@ -0,0 +1,76 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functools import partial
from stevedore import driver
from stevedore import extension
from solar.config import C
from solar.orchestration.executors import Client
def client(address):
return Client(address)
tasks_client = partial(client, C.tasks_address)
scheduler_client = partial(client, C.scheduler_address)
system_log_client = partial(client, C.system_log_address)
def get_driver(extension, implementation):
mgr = driver.DriverManager(
namespace='solar.orchestration.drivers.%s' % extension,
name=implementation,
invoke_on_load=False,
)
return mgr.driver
def tasks(clients):
return get_driver('tasks', C.tasks_driver)()
def scheduler(clients):
return get_driver('scheduler', C.scheduler_driver)(clients['tasks'])
def system_log(clients):
return get_driver('system_log', C.system_log_driver)()
class GetObjExtensionManager(extension.ExtensionManager):
def __getitem__(self, name):
ext = super(GetObjExtensionManager, self).__getitem__(name)
return ext.obj
def get_clients():
return GetObjExtensionManager(
namespace='solar.orchestration.extensions_clients',
invoke_on_load=True)
def get_client(name):
return get_clients()[name]
def get_extensions(clients):
ext = GetObjExtensionManager(
namespace='solar.orchestration.extensions',
invoke_on_load=True,
invoke_args=(clients,))
return ext

View File

@ -22,6 +22,7 @@ import pytest
from solar.core.log import log from solar.core.log import log
from solar.dblayer.model import ModelMeta from solar.dblayer.model import ModelMeta
from solar.orchestration import executors from solar.orchestration import executors
from solar.orchestration import extensions as loader
from solar.orchestration import workers from solar.orchestration import workers
@ -86,3 +87,20 @@ def tasks(request, tasks_address):
gevent.spawn(executor.run) gevent.spawn(executor.run)
return worker, executors.Client(tasks_address) return worker, executors.Client(tasks_address)
@pytest.fixture
def clients(request):
rst = {}
rst['tasks'] = executors.Client(request.getfuncargvalue(
'tasks_address'))
rst['scheduler'] = executors.Client(request.getfuncargvalue(
'scheduler_address'))
rst['system_log'] = executors.Client(request.getfuncargvalue(
'system_log_address'))
return rst
@pytest.fixture
def extensions(clients):
return loader.get_extensions(clients)

View File

@ -33,24 +33,24 @@ def scheduler_client(scheduler_address):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def tasks(system_log_address, tasks_address, scheduler_address): def tasks(extensions, clients):
gevent.spawn( gevent.spawn(
orchestration.construct_tasks, orchestration.construct_tasks,
system_log_address, tasks_address, scheduler_address) extensions, clients)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def scheduler(tasks_address, scheduler_address): def scheduler(extensions, clients):
gevent.spawn( gevent.spawn(
orchestration.construct_scheduler, orchestration.construct_scheduler,
tasks_address, scheduler_address) extensions, clients)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def system_log(system_log_address): def system_log(extensions, clients):
gevent.spawn( gevent.spawn(
orchestration.construct_system_log, orchestration.construct_system_log,
system_log_address) extensions, clients)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@ -62,9 +62,10 @@ def resources(request, sequence_vr):
@pytest.mark.parametrize('scale', [10]) @pytest.mark.parametrize('scale', [10])
def test_concurrent_sequences_with_no_handler(scale, scheduler_client): def test_concurrent_sequences_with_no_handler(scale, clients):
total_resources = scale * 3 total_resources = scale * 3
timeout = scale * 2 timeout = scale * 2
scheduler_client = clients['scheduler']
assert len(change.stage_changes()) == total_resources assert len(change.stage_changes()) == total_resources
plan = change.send_to_orchestration() plan = change.send_to_orchestration()