From 1d7b37f49929625af3bb1d8cf4c8b6dc3967f58b Mon Sep 17 00:00:00 2001
From: Dmitry Shulyak
Date: Mon, 14 Dec 2015 17:10:52 -0800
Subject: [PATCH] Zerorpc worker for orchestration modules

Replace the celery-based orchestration tasks with zerorpc push/pull
workers. A new solar-worker console script spawns three gevent workers
(scheduler, tasks and system log) that communicate over the ipc
addresses added to solar.config; task results are reported back to the
scheduler and the system log through a new subscription mechanism
instead of celery signals.

Change-Id: I8e2f119f2431c2ed8bc1b3f184540c95f071c96f
---
 requirements.txt                              |   7 +-
 setup.cfg                                     |   1 +
 solar/cli/orch.py                             |  14 +-
 solar/config.py                               |   3 +
 solar/dblayer/locking.py                      |   9 +-
 solar/orchestration/__init__.py               |  69 +++++++
 solar/orchestration/consts.py                 |   0
 solar/orchestration/executor.py               |  55 ------
 solar/orchestration/executors/__init__.py     |   1 +
 .../executors/zerorpc_executor.py             |  41 ++++
 solar/orchestration/tasks.py                  | 183 ------------------
 solar/orchestration/workers/__init__.py       |   1 +
 solar/orchestration/workers/base.py           |  39 ++++
 solar/orchestration/workers/scheduler.py      |  96 +++++++++
 solar/orchestration/workers/subscription.py   | 113 +++++++++++
 .../workers/system_log.py}                    |  15 +-
 solar/orchestration/workers/tasks.py          |  38 ++++
 solar/test/conftest.py                        |  44 +++++
 .../runner.py => test/functional/conftest.py} |  15 +-
 .../test_complete_solar_workflow.py           |  97 ++++++++++
 .../test_orchestration_scheduling.py          | 112 +++++++++++
 solar/test/orch_fixtures/sequential.yaml      |  13 +-
 .../test_subscription_mechanism.py}           |  37 ++--
 .../resource_fixtures/data_resource/meta.yaml |  10 +
 .../test/resource_fixtures/sequence.yaml.tmpl |  17 ++
 test-requirements.txt                         |   4 -
 26 files changed, 739 insertions(+), 295 deletions(-)
 delete mode 100644 solar/orchestration/consts.py
 delete mode 100644 solar/orchestration/executor.py
 create mode 100644 solar/orchestration/executors/__init__.py
 create mode 100644 solar/orchestration/executors/zerorpc_executor.py
 delete mode 100644 solar/orchestration/tasks.py
 create mode 100644 solar/orchestration/workers/__init__.py
 create mode 100644 solar/orchestration/workers/base.py
 create mode 100644 solar/orchestration/workers/scheduler.py
 create mode 100644 solar/orchestration/workers/subscription.py
 rename solar/{system_log/tasks.py => orchestration/workers/system_log.py} (68%)
 create mode 100644 solar/orchestration/workers/tasks.py
 rename solar/{orchestration/runner.py => test/functional/conftest.py} (67%)
 create mode 100644 solar/test/functional/test_complete_solar_workflow.py
 create mode 100644 solar/test/functional/test_orchestration_scheduling.py
 rename solar/test/{test_celery_executor.py => orchestration/test_subscription_mechanism.py} (51%)
 create mode 100644 solar/test/resource_fixtures/data_resource/meta.yaml
 create mode 100644 solar/test/resource_fixtures/sequence.yaml.tmpl

diff --git a/requirements.txt b/requirements.txt
index 549c2afe..2f822a50 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,11 +11,11 @@ enum34==1.0.4
 inflection
 Fabric==1.10.2
 tabulate==0.7.5
+gevent>=1.0.2
 
 # we need callbacks for now
 ansible<2.0
-celery
 
 mock
 multipledispatch==0.4.8
 pbr
@@ -37,3 +37,8 @@ semver
 
 # plugins
 stevedore
+
+# zerorpc doesn't consume messages with pyzmq > 13.0.2, need to debug
+pyzmq==13.0.2
+zerorpc>=0.5.2
+
diff --git a/setup.cfg b/setup.cfg
index 3655123a..4859a59e 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -29,6 +29,7 @@ source-dir = doc/source
 [entry_points]
 console_scripts =
     solar = solar.cli.main:run
+    solar-worker = solar.orchestration:main
 solar.computable_inputs =
     lua = solar.computable_inputs.ci_lua:LuaProcessor
    jinja = solar.computable_inputs.ci_jinja:JinjaProcessor
diff --git a/solar/cli/orch.py b/solar/cli/orch.py
index 0b428e5b..c7261925 100755
--- a/solar/cli/orch.py
+++ b/solar/cli/orch.py
@@ -24,7 +24,7 @@
 from solar.dblayer.locking import DBLock
 from solar import errors
 from solar.orchestration import filters
 from solar.orchestration import graph
-from solar.orchestration import tasks
+from solar.orchestration import SCHEDULER_CLIENT
 from solar.orchestration.traversal import states
 from solar.orchestration import utils
@@ -128,9 +128,7 @@ def noop(uid, task):
 @click.argument('uid', type=SOLARUID, default='last')
 @click.option('-w', 'wait', default=0)
 def run_once(uid, wait):
-    tasks.schedule_start.apply_async(
-        args=[uid],
-        queue='scheduler')
+    SCHEDULER_CLIENT.next({}, uid)
     wait_report(uid, wait)
 
 
@@ -139,7 +137,7 @@
 @click.option('-w', 'wait', default=0)
 def restart(uid, wait):
     graph.reset_by_uid(uid)
-    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
+    SCHEDULER_CLIENT.next({}, uid)
     wait_report(uid, wait)
 
 
@@ -150,7 +148,7 @@ def stop(uid):
     # using revoke(terminate=True) will lead to inability to restart execution
     # research possibility of customizations
     # app.control and Panel.register in celery
-    tasks.soft_stop.apply_async(args=[uid], queue='scheduler')
+    SCHEDULER_CLIENT.soft_stop({}, uid)
 
 
@@ -163,14 +161,14 @@ def reset(uid):
 @click.argument('uid', type=SOLARUID)
 def resume(uid):
     graph.reset_by_uid(uid, state_list=['SKIPPED'])
-    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
+    SCHEDULER_CLIENT.next({}, uid)
 
 
 @orchestration.command()
 @click.argument('uid', type=SOLARUID)
 def retry(uid):
     graph.reset_by_uid(uid, state_list=['ERROR'])
-    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
+    SCHEDULER_CLIENT.next({}, uid)
 
 
 @orchestration.command()
diff --git a/solar/config.py b/solar/config.py
index 743da680..86b71c38 100644
--- a/solar/config.py
+++ b/solar/config.py
@@ -30,6 +30,9 @@ C.riak_ensemble = False
 C.lock_bucket_type = None
 C.counter_bucket_type = None
 C.log_file = 'solar.log'
+C.system_log_address = 'ipc:///tmp/solar_system_log'
+C.tasks_address = 'ipc:///tmp/solar_tasks'
+C.scheduler_address = 'ipc:///tmp/solar_scheduler'
 
 
 def _lookup_vals(setter, config, prefix=None):
diff --git a/solar/dblayer/locking.py b/solar/dblayer/locking.py
index 42f36b03..889ef1cb 100644
--- a/solar/dblayer/locking.py
+++ b/solar/dblayer/locking.py
@@ -61,8 +61,8 @@ class _Lock(object):
         lk = self._acquire(self.uid, self.identity, self.stamp)
         if not lk.am_i_locking(self.identity):
             log.debug(
-                'Lock %s acquired by another identity %s != %s',
-                self.uid, self.identity, lk.who_is_locking())
+                'Lock %s acquired by another identity %s != %s, lockers %s',
+                self.uid, self.identity, lk.who_is_locking(), lk.lockers)
             while self.retries:
                 del DBLock._c.obj_cache[lk.key]
                 time.sleep(self.wait)
@@ -121,8 +121,9 @@ class _CRDTishLock(_Lock):
             locking = lk.who_is_locking()
             if locking is not None:
                 log.debug(
-                    'Found lock with UID %s, owned by %s, owner %r',
-                    uid, locking, lk.am_i_locking(identity))
+                    'Found lock with UID %s, owned by %s,'
+                    ' owner %r, lockers %s',
+                    uid, locking, lk.am_i_locking(identity), lk.lockers)
                 return lk
             else:
                 log.debug(
diff --git a/solar/orchestration/__init__.py b/solar/orchestration/__init__.py
index e69de29b..90618a83 100644
--- a/solar/orchestration/__init__.py
+++ b/solar/orchestration/__init__.py
@@ -0,0 +1,69 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from solar.config import C
+from solar.core.log import log
+from solar.dblayer import ModelMeta
+from solar.orchestration.executors.zerorpc_executor import Client
+from solar.orchestration.executors.zerorpc_executor import Executor
+from solar.orchestration.workers import scheduler as wscheduler
+from solar.orchestration.workers.system_log import SystemLog
+from solar.orchestration.workers.tasks import Tasks
+
+
+SCHEDULER_CLIENT = Client(C.scheduler_address)
+
+
+def construct_scheduler(tasks_address, scheduler_address):
+    scheduler = wscheduler.Scheduler(Client(tasks_address))
+    scheduler.for_all.before(lambda ctxt: ModelMeta.session_start())
+    scheduler.for_all.after(lambda ctxt: ModelMeta.session_end())
+    Executor(scheduler, scheduler_address).run()
+
+
+def construct_system_log(system_log_address):
+    syslog = SystemLog()
+    syslog.for_all.before(lambda ctxt: ModelMeta.session_start())
+    syslog.for_all.after(lambda ctxt: ModelMeta.session_end())
+    Executor(syslog, system_log_address).run()
+
+
+def construct_tasks(system_log_address, tasks_address, scheduler_address):
+    syslog = Client(system_log_address)
+    scheduler = wscheduler.SchedulerCallbackClient(
+        Client(scheduler_address))
+    tasks = Tasks()
+    tasks.for_all.on_success(syslog.commit)
+    tasks.for_all.on_error(syslog.error)
+    tasks.for_all.on_success(scheduler.update)
+    tasks.for_all.on_error(scheduler.error)
+    Executor(tasks, tasks_address).run()
+
+
+def main():
+    import sys
+    from gevent import spawn
+    from gevent import joinall
+    servers = [
+        spawn(construct_scheduler, C.tasks_address, C.scheduler_address),
+        spawn(construct_system_log, C.system_log_address),
+        spawn(construct_tasks, C.system_log_address, C.tasks_address,
+              C.scheduler_address)
+    ]
+    try:
+        log.info('Spawning scheduler, system log and tasks workers.')
+        joinall(servers)
+    except KeyboardInterrupt:
+        log.info('Exit solar-worker')
+        sys.exit()
diff --git a/solar/orchestration/consts.py b/solar/orchestration/consts.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/solar/orchestration/executor.py b/solar/orchestration/executor.py
deleted file mode 100644
index 6174f128..00000000
--- a/solar/orchestration/executor.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import time
-
-from celery import group
-
-from solar.orchestration.runner import app
-
-
-def celery_executor(dg, tasks, control_tasks=()):
-    to_execute = []
-
-    for task_name in tasks:
-
-        # task_id needs to be unique, so for each plan we will use
-        # generated uid of this plan and task_name
-        task_id = '{}:{}'.format(dg.graph['uid'], task_name)
-        task = app.tasks[dg.node[task_name]['type']]
-
-        dg.node[task_name]['status'] = 'INPROGRESS'
-        dg.node[task_name]['start_time'] = time.time()
-        for t in generate_task(task, dg.node[task_name], task_id):
-            to_execute.append(t)
-    return group(to_execute)
-
-
-def generate_task(task, data, task_id):
-
-    subtask = task.subtask(
-        data['args'], task_id=task_id,
-        time_limit=data.get('time_limit', None),
-        soft_time_limit=data.get('soft_time_limit', None))
-
-    # NOTE(dshulyak) it seems that we agreed that celery wont be installed
-    # on every slave and transport will be chosen in handler
-    # if data.get('target', None):
-    #     subtask.set(queue=data['target'])
-
-    yield subtask
-
-
-def all_success(dg, nodes):
-    return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes))
diff --git a/solar/orchestration/executors/__init__.py b/solar/orchestration/executors/__init__.py
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/solar/orchestration/executors/__init__.py
@@ -0,0 +1 @@
+
diff --git a/solar/orchestration/executors/zerorpc_executor.py b/solar/orchestration/executors/zerorpc_executor.py
new file mode 100644
index 00000000..c382d10b
--- /dev/null
+++ b/solar/orchestration/executors/zerorpc_executor.py
@@ -0,0 +1,41 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import zerorpc
+
+
+class Executor(object):
+
+    def __init__(self, worker, bind_to):
+        self.worker = worker
+        self.bind_to = bind_to
+
+    def run(self):
+        server = zerorpc.Puller(methods=self.worker)
+        server.bind(self.bind_to)
+        server.run()
+
+
+class Client(object):
+
+    def __init__(self, connect_to):
+        self.connect_to = connect_to
+        self.client = zerorpc.Pusher()
+        self.client.connect(connect_to)
+
+    def __getattr__(self, method):
+        return getattr(self.client, method)
+
+    def __call__(self, method, ctxt, *args, **kwargs):
+        return getattr(self.client, method)(ctxt, *args, **kwargs)
diff --git a/solar/orchestration/tasks.py b/solar/orchestration/tasks.py
deleted file mode 100644
index c346c4ab..00000000
--- a/solar/orchestration/tasks.py
+++ /dev/null
@@ -1,183 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import subprocess
-import time
-
-from functools import partial
-
-from celery.app import task
-from celery.signals import task_postrun
-from celery.signals import task_prerun
-
-from solar.core import actions
-from solar.core.log import log
-from solar.core import resource
-from solar.dblayer import ModelMeta
-from solar.orchestration import executor
-from solar.orchestration import graph
-from solar.orchestration import limits
-from solar.orchestration.runner import app
-from solar.orchestration.traversal import traverse
-from solar.system_log.tasks import commit_logitem
-from solar.system_log.tasks import error_logitem
-
-from solar.dblayer.locking import Lock
-from solar.utils import get_current_ident
-
-
-__all__ = ['solar_resource', 'cmd', 'sleep',
-           'error', 'fault_tolerance', 'schedule_start', 'schedule_next']
-
-
-# NOTE(dshulyak) i am not using celery.signals because it is not possible
-# to extract task_id from *task_success* signal
-class ReportTask(task.Task):
-
-    def on_success(self, retval, task_id, args, kwargs):
-        schedule_next.apply_async(
-            args=[task_id, 'SUCCESS'],
-            queue='scheduler')
-        commit_logitem.apply_async(args=[task_id], queue='system_log')
-
-    def on_failure(self, exc, task_id, args, kwargs, einfo):
-        schedule_next.apply_async(
-            args=[task_id, 'ERROR'],
-            kwargs={'errmsg': str(einfo.exception)},
-            queue='scheduler')
-        error_logitem.apply_async(args=[task_id], queue='system_log')
-
-
-report_task = partial(app.task, base=ReportTask, bind=True)
-
-
-@task_prerun.connect
-def start_solar_session(task_id, task, *args, **kwargs):
-    ModelMeta.session_start()
-
-
-@task_postrun.connect
-def end_solar_session(task_id, task, *args, **kwargs):
-    ModelMeta.session_end()
-
-
-@report_task(name='solar_resource')
-def solar_resource(ctxt, resource_name, action):
-    log.debug('TASK solar resource NAME %s ACTION %s',
-              resource_name, action)
-    res = resource.load(resource_name)
-    return actions.resource_action(res, action)
-
-
-@report_task(name='cmd')
-def cmd(ctxt, cmd):
-    popen = subprocess.Popen(
-        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
-    out, err = popen.communicate()
-    rcode = popen.returncode
-    if rcode:
-        raise Exception('Command %s failed with err %s', cmd, err)
-    return popen.returncode, out, err
-
-
-@report_task(name='sleep')
-def sleep(ctxt, seconds):
-    time.sleep(seconds)
-
-
-@report_task(name='error')
-def error(ctxt, message):
-    raise Exception('message')
-
-
-@report_task(name='fault_tolerance')
-def fault_tolerance(ctxt, percent):
-    task_id = ctxt.request.id
-    plan_uid, task_name = task_id.rsplit(':', 1)
-
-    dg = graph.get_graph(plan_uid)
-    success = 0.0
-    predecessors = dg.predecessors(task_name)
-    lth = len(predecessors)
-
-    for s in predecessors:
-        if dg.node[s]['status'] == 'SUCCESS':
-            success += 1
-
-    succes_percent = (success / lth) * 100
-    if succes_percent < percent:
-        raise Exception('Cant proceed with, {0} < {1}'.format(
-            succes_percent, percent))
-
-
-@report_task(name='echo')
-def echo(ctxt, message):
-    return message
-
-
-@report_task(name='anchor')
-def anchor(ctxt, *args):
-    # such tasks should be walked when atleast 1/3/exact number of resources
-    # visited
-    dg = graph.get_graph('current')
-    for s in dg.predecessors(ctxt.request.id):
-        if dg.node[s]['status'] != 'SUCCESS':
-            raise Exception('One of the tasks erred, cant proceeed')
-
-
-def schedule(plan_uid, dg):
-    tasks = traverse(dg)
-    filtered_tasks = list(limits.get_default_chain(
-        dg,
-        [t for t in dg if dg.node[t]['status'] == 'INPROGRESS'],
-        tasks))
-    log.debug('Schedule next tasks %r', filtered_tasks)
-    execution = executor.celery_executor(
-        dg, filtered_tasks, control_tasks=('fault_tolerance',))
-    graph.update_graph(dg)
-    execution()
-
-
-@app.task(name='schedule_start')
-def schedule_start(plan_uid):
-    """On receive finished task should update storage with task result:
-
-    - find successors that should be executed
-    - apply different policies to tasks
-    """
-    with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
-        dg = graph.get_graph(plan_uid)
-        schedule(plan_uid, dg)
-
-
-@app.task(name='soft_stop')
-def soft_stop(plan_uid):
-    with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
-        dg = graph.get_graph(plan_uid)
-        for n in dg:
-            if dg.node[n]['status'] == 'PENDING':
-                dg.node[n]['status'] = 'SKIPPED'
-        graph.update_graph(dg)
-
-
-@app.task(name='schedule_next')
-def schedule_next(task_id, status, errmsg=None):
-    plan_uid, task_name = task_id.rsplit(':', 1)
-    with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
-        dg = graph.get_graph(plan_uid)
-        dg.node[task_name]['status'] = status
-        dg.node[task_name]['errmsg'] = errmsg
-        dg.node[task_name]['end_time'] = time.time()
-
-        schedule(plan_uid, dg)
diff --git a/solar/orchestration/workers/__init__.py b/solar/orchestration/workers/__init__.py
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/solar/orchestration/workers/__init__.py
@@ -0,0 +1 @@
+
diff --git a/solar/orchestration/workers/base.py b/solar/orchestration/workers/base.py
new file mode 100644
index 00000000..edc6a9ef
--- /dev/null
+++ b/solar/orchestration/workers/base.py
@@ -0,0 +1,39 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from solar.orchestration.workers.subscription import CollectionSub
+from solar.orchestration.workers.subscription import FuncSub
+
+
+class WorkerMeta(type):
+
+    def __new__(cls, name, bases, attrs):
+
+        funcs = []
+        for attrname, attrvalue in attrs.iteritems():
+            if attrname[0] != '_' and not isinstance(attrvalue, CollectionSub):
+                sub = FuncSub(attrvalue)
+                attrs[attrname] = sub
+                funcs.append(sub)
+        return super(WorkerMeta, cls).__new__(cls, name, bases, attrs)
+
+
+class Worker(object):
+
+    __metaclass__ = WorkerMeta
+
+    for_all = CollectionSub()
+
+    def ping(self, ctxt):
+        return 'pong'
diff --git a/solar/orchestration/workers/scheduler.py b/solar/orchestration/workers/scheduler.py
new file mode 100644
index 00000000..61fbf962
--- /dev/null
+++ b/solar/orchestration/workers/scheduler.py
@@ -0,0 +1,96 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from solar.core.log import log
+from solar.dblayer.locking import Lock
+from solar.orchestration import graph
+from solar.orchestration import limits
+from solar.orchestration.traversal import traverse
+from solar.orchestration.workers import base
+from solar.utils import get_current_ident
+
+
+class Scheduler(base.Worker):
+
+    def __init__(self, tasks_client):
+        self._tasks = tasks_client
+        super(Scheduler, self).__init__()
+
+    def _next(self, dg):
+        tasks = traverse(dg)
+        filtered_tasks = list(limits.get_default_chain(
+            dg,
+            [t for t in dg if dg.node[t]['status'] == 'INPROGRESS'],
+            tasks))
+        return filtered_tasks
+
+    def next(self, ctxt, plan_uid):
+        with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
+            log.debug('Received *next* event for %s', plan_uid)
+            dg = graph.get_graph(plan_uid)
+            rst = self._next(dg)
+            for task_name in rst:
+                task_id = '{}:{}'.format(dg.graph['uid'], task_name)
+                task_type = dg.node[task_name]['type']
+                dg.node[task_name]['status'] = 'INPROGRESS'
+                ctxt = {'task_id': task_id, 'task_name': task_name}
+                self._tasks(
+                    task_type, ctxt,
+                    *dg.node[task_name]['args'])
+            graph.update_graph(dg)
+            log.debug('Scheduled tasks %r', rst)
+            # process tasks with tasks client
+            return rst
+
+    def soft_stop(self, ctxt, plan_uid):
+        with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
+            dg = graph.get_graph(plan_uid)
+            for n in dg:
+                if dg.node[n]['status'] == 'PENDING':
+                    dg.node[n]['status'] = 'SKIPPED'
+            graph.update_graph(dg)
+
+    def update_next(self, ctxt, status, errmsg):
+        plan_uid, task_name = ctxt['task_id'].rsplit(':', 1)
+        with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
+            dg = graph.get_graph(plan_uid)
+            dg.node[task_name]['status'] = status
+            dg.node[task_name]['errmsg'] = errmsg
+            dg.node[task_name]['end_time'] = time.time()
+            rst = self._next(dg)
+            for task_name in rst:
+                task_id = '{}:{}'.format(dg.graph['uid'], task_name)
+                task_type = dg.node[task_name]['type']
+                dg.node[task_name]['status'] = 'INPROGRESS'
+                ctxt = {'task_id': task_id, 'task_name': task_name}
+                self._tasks(
+                    task_type, ctxt,
+                    *dg.node[task_name]['args'])
+            graph.update_graph(dg)
+            log.debug('Scheduled tasks %r', rst)
+            return rst
+
+
+class SchedulerCallbackClient(object):
+
+    def __init__(self, client):
+        self.client = client
+
+    def update(self, ctxt, result, *args, **kwargs):
+        self.client.update_next(ctxt, 'SUCCESS', '')
+
+    def error(self, ctxt, result, *args, **kwargs):
+        self.client.update_next(ctxt, 'ERROR', repr(result))
diff --git a/solar/orchestration/workers/subscription.py b/solar/orchestration/workers/subscription.py
new file mode 100644
index 00000000..373314d8
--- /dev/null
+++ b/solar/orchestration/workers/subscription.py
@@ -0,0 +1,113 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from collections import defaultdict
+import inspect
+
+from solar.core.log import log
+
+
+class SubControl(object):
+
+    def on_success(self, sub):
+        self.add_subscriber(sub, 'on_success')
+
+    def on_error(self, sub):
+        self.add_subscriber(sub, 'on_error')
+
+    def after(self, sub):
+        self.add_subscriber(sub, 'after')
+
+    def before(self, sub):
+        self.add_subscriber(sub, 'before')
+
+    def add_subscriber(self, sub, event):
+        raise NotImplementedError()
+
+
+class FuncSubControl(SubControl):
+
+    def __init__(self, instance, func):
+        self.instance = instance
+        self.func = func
+        self._subscribers = defaultdict(list)
+        self.__name__ = func.__name__
+
+    def add_subscriber(self, subscriber, event):
+        """Subscribe a callable to an event.
+
+        :param subscriber: callable
+        :param event: one of 'before', 'after', 'on_success', 'on_error'
+        """
+        self._subscribers[event].append(subscriber)
+
+    def __call__(self, ctxt, *args, **kwargs):
+        for sub in self._subscribers['before']:
+            try:
+                sub(ctxt)
+            except Exception as exc:
+                log.error('Subscriber before %r failed with %r', sub, exc)
+        try:
+            rst = self.func(self.instance, ctxt, *args, **kwargs)
+            for sub in self._subscribers['on_success']:
+                try:
+                    sub(ctxt, rst, *args, **kwargs)
+                except Exception as exc:
+                    log.error(
+                        'Subscriber on_success %r failed with %r', sub, exc)
+            return rst
+        except Exception as exc:
+            for sub in self._subscribers['on_error']:
+                try:
+                    sub(ctxt, repr(exc), *args, **kwargs)
+                except Exception as exc:
+                    log.error(
+                        'Subscriber on_error %r failed with %r', sub, exc)
+            raise
+        finally:
+            for sub in self._subscribers['after']:
+                try:
+                    sub(ctxt)
+                except Exception as exc:
+                    log.error('Subscriber after %r failed with %r', sub, exc)
+
+
+class FuncSub(object):
+
+    def __init__(self, func):
+        self.func = func
+
+    def __get__(self, obj, owner):
+        property_name = '__sub_control_' + self.func.__name__
+        sub_control = getattr(obj, property_name, None)
+        if not sub_control:
+            setattr(obj, property_name, FuncSubControl(obj, self.func))
+        return getattr(obj, property_name)
+
+
+class CollectionSubControl(SubControl):
+
+    def __init__(self, instance):
+        self.instance = instance
+
+    def add_subscriber(self, subscriber, event):
+        for entity_name, entity in inspect.getmembers(self.instance):
+            if isinstance(entity, FuncSubControl) and entity_name[:2] != '__':
+                entity.add_subscriber(subscriber, event)
+
+
+class CollectionSub(object):
+
+    def __get__(self, obj, owner):
+        return CollectionSubControl(obj)
diff --git a/solar/system_log/tasks.py b/solar/orchestration/workers/system_log.py
similarity index 68%
rename from solar/system_log/tasks.py
rename to solar/orchestration/workers/system_log.py
index 1c53f1b1..ab2c72f7 100644
--- a/solar/system_log/tasks.py
+++ b/solar/orchestration/workers/system_log.py
@@ -12,18 +12,15 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-from solar.orchestration.runner import app
+from solar.orchestration.workers import base
 
 from solar.system_log.operations import move_to_commited
 from solar.system_log.operations import set_error
 
-__all__ = ['error_logitem', 'commit_logitem']
+class SystemLog(base.Worker):
 
-@app.task(name='error_logitem')
-def error_logitem(task_uuid):
-    return set_error(task_uuid.rsplit(':', 1)[-1])
+    def commit(self, ctxt, *args, **kwargs):
+        return move_to_commited(ctxt['task_id'].rsplit(':', 1)[-1])
 
-
-@app.task(name='commit_logitem')
-def commit_logitem(task_uuid):
-    return move_to_commited(task_uuid.rsplit(':', 1)[-1])
+    def error(self, ctxt, *args, **kwargs):
+        return set_error(ctxt['task_id'].rsplit(':', 1)[-1])
diff --git a/solar/orchestration/workers/tasks.py b/solar/orchestration/workers/tasks.py
new file mode 100644
index 00000000..3651455c
--- /dev/null
+++ b/solar/orchestration/workers/tasks.py
@@ -0,0 +1,38 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from solar.core import actions
+from solar.core.log import log
+from solar.core import resource
+from solar.orchestration.workers import base
+
+
+class Tasks(base.Worker):
+
+    def sleep(self, ctxt, seconds):
+        return time.sleep(seconds)
+
+    def error(self, ctxt, message):
+        raise Exception(message)
+
+    def echo(self, ctxt, message):
+        return message
+
+    def solar_resource(self, ctxt, resource_name, action):
+        log.debug('TASK solar resource NAME %s ACTION %s',
+                  resource_name, action)
+        res = resource.load(resource_name)
+        return actions.resource_action(res, action)
diff --git a/solar/test/conftest.py b/solar/test/conftest.py
index 8abec36c..cbb4c6ff 100644
--- a/solar/test/conftest.py
+++ b/solar/test/conftest.py
@@ -21,6 +21,7 @@ from solar.core.resource import Resource
 from solar.dblayer.model import get_bucket
 from solar.dblayer.model import Model
 from solar.dblayer.model import ModelMeta
+from solar.orchestration import graph
 
 
 def patched_get_bucket_name(cls):
@@ -79,3 +80,46 @@ def pytest_runtest_call(item):
 
 
 Model.get_bucket_name = classmethod(patched_get_bucket_name)
+
+
+def plan_from_fixture(name):
+    riak_path = os.path.join(
+        os.path.dirname(os.path.realpath(__file__)), 'orch_fixtures',
+        '%s.yaml' % name)
+    return graph.create_plan(riak_path)
+
+
+@pytest.fixture
+def riak_plan():
+    return plan_from_fixture('riak')
+
+
+@pytest.fixture
+def simple_plan():
+    return plan_from_fixture('simple')
+
+
+@pytest.fixture
+def sequential_plan():
+    return plan_from_fixture('sequential')
+
+
+@pytest.fixture
+def two_path_plan():
+    return plan_from_fixture('two_path')
+
+
+@pytest.fixture
+def sequence_vr(tmpdir):
+    base_path = os.path.join(
+        os.path.dirname(os.path.realpath(__file__)),
+        'resource_fixtures')
+    vr_tmpl_path = os.path.join(base_path, 'sequence.yaml.tmpl')
+    base_resource_path = os.path.join(base_path, 'data_resource')
+    with open(vr_tmpl_path) as f:
+        vr_data = f.read().format(
+            resource_path=base_resource_path,
+            idx='#{ idx }#')
+    vr_file = tmpdir.join('sequence.yaml')
+    vr_file.write(vr_data)
+    return str(vr_file)
diff --git a/solar/orchestration/runner.py b/solar/test/functional/conftest.py
similarity index 67%
rename from solar/orchestration/runner.py
rename to solar/test/functional/conftest.py
index 4ac65098..1d231803 100644
--- a/solar/orchestration/runner.py
+++ b/solar/test/functional/conftest.py
@@ -12,14 +12,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-from celery import Celery
+import random
+import string
 
-from solar.config import C
+import pytest
 
-
-app = Celery(
-    include=['solar.system_log.tasks', 'solar.orchestration.tasks'],
-    broker=C.celery_broker,
-    backend=C.celery_backend)
-app.conf.update(CELERY_ACCEPT_CONTENT=['json'])
-app.conf.update(CELERY_TASK_SERIALIZER='json')
+
+@pytest.fixture
+def address():
+    return 'ipc:///tmp/solar_test_' + ''.join(
+        (random.choice(string.ascii_lowercase) for x in xrange(4)))
diff --git a/solar/test/functional/test_complete_solar_workflow.py b/solar/test/functional/test_complete_solar_workflow.py
new file mode 100644
index 00000000..657f74a4
--- /dev/null
+++ b/solar/test/functional/test_complete_solar_workflow.py
@@ -0,0 +1,97 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+import gevent
+import pytest
+
+from solar.core.resource import composer
+from solar.dblayer.model import clear_cache
+from solar import orchestration
+from solar.orchestration.graph import wait_finish
+from solar.orchestration.traversal import states
+from solar.system_log import change
+from solar.system_log import data
+
+
+@pytest.fixture
+def tasks_address(address):
+    return address + 'tasks'
+
+
+@pytest.fixture
+def system_log_address(address):
+    return address + 'system_log'
+
+
+@pytest.fixture
+def scheduler_address(address):
+    return address + 'scheduler'
+
+
+@pytest.fixture
+def scheduler_client(scheduler_address):
+    return orchestration.Client(scheduler_address)
+
+
+@pytest.fixture(autouse=True)
+def tasks(system_log_address, tasks_address, scheduler_address):
+    gevent.spawn(
+        orchestration.construct_tasks,
+        system_log_address, tasks_address, scheduler_address)
+
+
+@pytest.fixture(autouse=True)
+def scheduler(tasks_address, scheduler_address):
+    gevent.spawn(
+        orchestration.construct_scheduler,
+        tasks_address, scheduler_address)
+
+
+@pytest.fixture(autouse=True)
+def system_log(system_log_address):
+    gevent.spawn(
+        orchestration.construct_system_log,
+        system_log_address)
+
+
+@pytest.fixture(autouse=True)
+def resources(request, sequence_vr):
+    scale = request.getfuncargvalue('scale')
+    for idx in range(scale):
+        composer.create(
+            'sequence_%s' % idx, sequence_vr, inputs={'idx': idx})
+
+
+@pytest.mark.parametrize('scale', [20])
+def test_concurrent_sequences_with_no_handler(scale, scheduler_client):
+    total_resources = scale * 3
+    timeout = scale
+
+    assert len(change.stage_changes()) == total_resources
+    plan = change.send_to_orchestration()
+    scheduler_client.next({}, plan.graph['uid'])
+
+    def wait_function(timeout):
+        for summary in wait_finish(plan.graph['uid'], timeout):
+            assert summary[states.ERROR.name] == 0
+            time.sleep(0.5)
+        return summary
+    waiter = gevent.spawn(wait_function, timeout)
+    waiter.join(timeout=timeout)
+    assert waiter.get(block=True)[states.SUCCESS.name] == total_resources
+    assert len(data.CL()) == total_resources
+    clear_cache()
+    assert len(change.stage_changes()) == 0
diff --git a/solar/test/functional/test_orchestration_scheduling.py b/solar/test/functional/test_orchestration_scheduling.py
new file mode 100644
index 00000000..13079153
--- /dev/null
+++ b/solar/test/functional/test_orchestration_scheduling.py
@@ -0,0 +1,112 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+import gevent
+import pytest
+
+from solar.dblayer.model import ModelMeta
+from solar.orchestration.executors import zerorpc_executor
+from solar.orchestration.workers import scheduler as wscheduler
+from solar.orchestration.workers import tasks as wtasks
+
+
+@pytest.fixture
+def tasks_worker():
+    return wtasks.Tasks()
+
+
+@pytest.fixture
+def tasks_for_scheduler(tasks_worker, address):
+    address = address + 'tasks'
+    executor = zerorpc_executor.Executor(tasks_worker, address)
+    gevent.spawn(executor.run)
+    return zerorpc_executor.Client(address)
+
+
+@pytest.fixture
+def scheduler(tasks_for_scheduler, address):
+    address = address + 'scheduler'
+    worker = wscheduler.Scheduler(tasks_for_scheduler)
+    worker.for_all.before(lambda ctxt: ModelMeta.session_start())
+    worker.for_all.after(lambda ctxt: ModelMeta.session_end())
+    executor = zerorpc_executor.Executor(worker, address)
+    gevent.spawn(executor.run)
+    return worker, zerorpc_executor.Client(address)
+
+
+@pytest.fixture(autouse=True)
+def setup_scheduler_callback(scheduler, tasks_worker):
+    worker, client = scheduler
+    scheduler_client = wscheduler.SchedulerCallbackClient(
+        zerorpc_executor.Client(client.connect_to))
+    tasks_worker.for_all.on_success(scheduler_client.update)
+    tasks_worker.for_all.on_error(scheduler_client.update)
+
+
+def _wait_scheduling(plan, wait_time, waiter, client):
+    client.next({}, plan.graph['uid'])
+    waiter = gevent.spawn(waiter)
+    waiter.join(timeout=wait_time)
+
+
+def test_simple_fixture(simple_plan, scheduler):
+    worker, client = scheduler
+    scheduling_results = []
+    expected = [['echo_stuff'], ['just_fail'], []]
+
+    def register(ctxt, rst, *args, **kwargs):
+        scheduling_results.append(rst)
+    worker.for_all.on_success(register)
+
+    def _result_waiter():
+        while scheduling_results != expected:
+            time.sleep(0.1)
+    _wait_scheduling(simple_plan, 3, _result_waiter, client)
+    assert scheduling_results == expected
+
+
+def test_sequential_fixture(sequential_plan, scheduler):
+    worker, client = scheduler
+    scheduling_results = set()
+    expected = {('s1',), ('s2',), ('s3',), ()}
+
+    def register(ctxt, rst, *args, **kwargs):
+        scheduling_results.add(tuple(rst))
+    worker.for_all.on_success(register)
+
+    def _result_waiter():
+        while scheduling_results != expected:
+            time.sleep(0.1)
+    _wait_scheduling(sequential_plan, 2, _result_waiter, client)
+    assert scheduling_results == expected
+
+
+def test_two_path_fixture(two_path_plan, scheduler):
+    worker, client = scheduler
+    scheduling_results = []
+    expected = [{'a', 'c'}, {'a', 'c'}, {'b', 'd'}, {'b', 'd'}, {'e'}]
+
+    def register(ctxt, rst, *args, **kwargs):
+        if 'task_name' in ctxt:
+            scheduling_results.append(ctxt['task_name'])
+    worker.for_all.on_success(register)
+
+    def _result_waiter():
+        while len(scheduling_results) != len(expected):
+            time.sleep(0.1)
+    _wait_scheduling(two_path_plan, 3, _result_waiter, client)
+    for si, ei in zip(scheduling_results, expected):
+        assert any([si == e for e in ei])
diff --git a/solar/test/orch_fixtures/sequential.yaml b/solar/test/orch_fixtures/sequential.yaml
index 5e8675d7..7de9d1d3 100644
--- a/solar/test/orch_fixtures/sequential.yaml
+++ b/solar/test/orch_fixtures/sequential.yaml
@@ -3,16 +3,15 @@ tasks:
   - uid: s1
     parameters:
       type: sleep
-      args: [2]
-      target: 1
+      args: [0.2]
+      target: '1'
   - uid: s2
-
     parameters:
      type: sleep
-      args: [2]
-      target: 1
+      args: [0.2]
+      target: '1'
   - uid: s3
     parameters:
       type: sleep
-      args: [2]
-      target: 1
+      args: [0.2]
+      target: '1'
diff --git a/solar/test/test_celery_executor.py b/solar/test/orchestration/test_subscription_mechanism.py
similarity index 51%
rename from solar/test/test_celery_executor.py
rename to solar/test/orchestration/test_subscription_mechanism.py
index 323b71e6..c570ffec 100644
--- a/solar/test/test_celery_executor.py
+++ b/solar/test/orchestration/test_subscription_mechanism.py
@@ -12,23 +12,28 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import networkx as nx
-
-from mock import patch
-from pytest import fixture
-from solar.orchestration import executor
+from solar.orchestration.workers import base
 
 
-@fixture
-def dg():
-    ex = nx.DiGraph()
-    ex.add_node('t1', args=['t'], status='PENDING', type='echo')
-    ex.graph['uid'] = 'some_string'
-    return ex
+class SubTest(base.Worker):
+    """for tests."""
+
+    def pass_two(self, ctxt):
+        return 2
 
 
-@patch.object(executor, 'app')
-def test_celery_executor(mapp, dg):
-    """Just check that it doesnt fail for now."""
-    assert executor.celery_executor(dg, ['t1'])
-    assert dg.node['t1']['status'] == 'INPROGRESS'
+def test_subscribe_on_success():
+    sub = SubTest()
+    test = []
+    assert sub.pass_two.on_success(lambda ctxt, rst: test.append(rst)) is None
+    assert sub.pass_two({}) == 2
+    assert test == [2]
+
+
+def test_subscribe_for_all():
+    sub = SubTest()
+    test = []
+    sub.for_all.after(lambda ctxt: test.append('after'))
+    sub.for_all.before(lambda ctxt: test.append('before'))
+    sub.pass_two({})
+    assert test == ['before', 'after']
diff --git a/solar/test/resource_fixtures/data_resource/meta.yaml b/solar/test/resource_fixtures/data_resource/meta.yaml
new file mode 100644
index 00000000..76194b5a
--- /dev/null
+++ b/solar/test/resource_fixtures/data_resource/meta.yaml
@@ -0,0 +1,10 @@
+id: data_resources
+handler: none
+version: 1.0.0
+input:
+  key1:
+    schema: str!
+    value:
+  key2:
+    schema: str!
+    value:
diff --git a/solar/test/resource_fixtures/sequence.yaml.tmpl b/solar/test/resource_fixtures/sequence.yaml.tmpl
new file mode 100644
index 00000000..6af6e63d
--- /dev/null
+++ b/solar/test/resource_fixtures/sequence.yaml.tmpl
@@ -0,0 +1,17 @@
+id: data_resources_{idx}
+resources:
+  - id: data_resource_1_{idx}
+    from: {resource_path}
+    input:
+      key1: key1
+      key2: key2
+  - id: data_resource_2_{idx}
+    from: {resource_path}
+    input:
+      key1: data_resource_1_{idx}::key1
+      key2: data_resource_1_{idx}::key2
+  - id: data_resource_3_{idx}
+    from: {resource_path}
+    input:
+      key1: data_resource_2_{idx}::key1
+      key2: data_resource_2_{idx}::key2
diff --git a/test-requirements.txt b/test-requirements.txt
index 066835ea..5dd93422 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -10,7 +10,3 @@ os-testr
 ## for computable inputs
 # temporary disabled
 # lupa
-
-# to test if everything works on gevent
-gevent
-