Add support for configurable executor

Current patch allows to enable one of the implemented drivers
for executor. Each driver should implement Client and Executor
classes - interfaces for each of them are defined in
solar/orchestration/executors/base.py

Related to blueprint stevedorize-orchestration

Change-Id: I11c7de961ddd96377c3270bf7c6a30e001dc4fae
This commit is contained in:
Dmitry Shulyak 2016-01-27 14:17:16 +02:00
parent a179702369
commit 2d5b0a87bc
7 changed files with 100 additions and 25 deletions

View File

@ -53,3 +53,5 @@ solar.handlers =
none = solar.core.handlers.base:Empty
puppetv2 = solar.core.handlers.puppet:PuppetV2
naive_sync = solar.core.handlers.naive_sync:NaiveSync
solar.orchestration.executors =
zerorpc = solar.orchestration.executors.zerorpc_executor

View File

@ -34,6 +34,7 @@ C.system_log_address = 'ipc:///tmp/solar_system_log'
C.tasks_address = 'ipc:///tmp/solar_tasks'
C.scheduler_address = 'ipc:///tmp/solar_scheduler'
C.timewatcher_address = 'ipc:///tmp/solar_timewatcher'
C.executor = 'zerorpc'
def _lookup_vals(setter, config, prefix=None):

View File

@ -15,8 +15,8 @@
from solar.config import C
from solar.core.log import log
from solar.dblayer import ModelMeta
from solar.orchestration.executors.zerorpc_executor import Client
from solar.orchestration.executors.zerorpc_executor import Executor
from solar.orchestration.executors import Client
from solar.orchestration.executors import Executor
from solar.orchestration.workers import scheduler as wscheduler
from solar.orchestration.workers.system_log import SystemLog
from solar.orchestration.workers.tasks import Tasks
@ -46,7 +46,7 @@ def construct_tasks(system_log_address, tasks_address, scheduler_address):
Client(scheduler_address))
tasks = Tasks()
tasks_executor = Executor(tasks, tasks_address)
tasks.for_all.before(tasks_executor.register)
tasks.for_all.before(tasks_executor.register_task)
tasks.for_all.on_success(syslog.commit)
tasks.for_all.on_error(syslog.error)
tasks.for_all.on_success(scheduler.update)

View File

@ -1 +1,28 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import driver
from solar.config import C
mgr = driver.DriverManager(
namespace='solar.orchestration.executors',
name=C.executor,
invoke_on_load=False,
)
Client = mgr.driver.Client
Executor = mgr.driver.Executor

View File

@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class Client(object):
def __init__(self, connect_to):
self.connect_to = connect_to
def __getattr__(self, method):
return getattr(self.client, method)
def __call__(self, method, ctxt, *args, **kwargs):
return getattr(self.client, method)(ctxt, *args, **kwargs)
class Executor(object):
def __init__(self, worker, bind_to):
self.worker = worker
self.bind_to = bind_to
self.worker._executor = self
def register_task(self, ctxt):
raise NotImplemented(
'Register task should be implemented'
' to support task interruption.')
def register_timeout(self, timeout, callable_):
raise NotImplemented(
'Should be implemented to propagate errors by timeout')
def kill(self, task_id, exc):
raise NotImplemented(
'Kill should be implemented'
' to support task interruption.')
def run(self):
raise NotImplemented()

View File

@ -18,6 +18,7 @@ import gevent
import zerorpc
from solar.core.log import log
from solar.orchestration.executors import base
class PoolBasedPuller(zerorpc.Puller):
@ -95,16 +96,15 @@ class LimitedExecutionPuller(PoolBasedPuller):
self._timeout_group.join(raise_error=True)
class Executor(object):
class Executor(base.Executor):
def __init__(self, worker, bind_to):
self.worker = worker
self.bind_to = bind_to
super(Executor, self).__init__(worker, bind_to)
self._tasks_register = {}
worker._executor = self
self._server = LimitedExecutionPuller(methods=self.worker)
def register(self, ctxt):
def register_task(self, ctxt):
if 'task_id' in ctxt:
self._tasks_register[ctxt['task_id']] = gevent.getcurrent()
@ -114,23 +114,17 @@ class Executor(object):
self._tasks_register[task_id].kill(exc, block=True)
self._tasks_register.pop(task_id)
def register_timeout(self, *args):
self._server.register_timeout(*args)
def register_timeout(self, timeout, callable_):
self._server.register_timeout(timeout, callable_)
def run(self):
self._server.bind(self.bind_to)
self._server.run()
class Client(object):
class Client(base.Client):
def __init__(self, connect_to):
self.connect_to = connect_to
super(Client, self).__init__(connect_to)
self.client = zerorpc.Pusher()
self.client.connect(connect_to)
def __getattr__(self, method):
return getattr(self.client, method)
def __call__(self, method, ctxt, *args, **kwargs):
return getattr(self.client, method)(ctxt, *args, **kwargs)

View File

@ -21,7 +21,7 @@ import pytest
from solar.core.log import log
from solar.dblayer.model import ModelMeta
from solar.orchestration.executors import zerorpc_executor
from solar.orchestration import executors
from solar.orchestration import workers
@ -51,7 +51,7 @@ def scheduler(request, scheduler_address):
tasks_client = None
if 'tasks' in request.node.fixturenames:
tasks_client = zerorpc_executor.Client(
tasks_client = executors.Client(
request.getfuncargvalue('tasks_address'))
worker = workers.scheduler.Scheduler(tasks_client)
@ -67,22 +67,22 @@ def scheduler(request, scheduler_address):
worker.for_all.before(session_start)
worker.for_all.after(session_end)
executor = zerorpc_executor.Executor(worker, scheduler_address)
executor = executors.Executor(worker, scheduler_address)
gevent.spawn(executor.run)
return worker, zerorpc_executor.Client(scheduler_address)
return worker, executors.Client(scheduler_address)
@pytest.fixture
def tasks(request, tasks_address):
worker = workers.tasks.Tasks()
executor = zerorpc_executor.Executor(worker, tasks_address)
worker.for_all.before(executor.register)
executor = executors.Executor(worker, tasks_address)
worker.for_all.before(executor.register_task)
if 'scheduler' in request.node.fixturenames:
scheduler_client = workers.scheduler.SchedulerCallbackClient(
zerorpc_executor.Client(request.getfuncargvalue(
executors.Client(request.getfuncargvalue(
'scheduler_address')))
worker.for_all.on_success(scheduler_client.update)
worker.for_all.on_error(scheduler_client.error)
gevent.spawn(executor.run)
return worker, zerorpc_executor.Client(tasks_address)
return worker, executors.Client(tasks_address)