Zerorpc worker for orchestration modules

Change-Id: I8e2f119f2431c2ed8bc1b3f184540c95f071c96f
Author: Dmitry Shulyak 2015-12-14 17:10:52 -08:00
parent 47da01d5c7
commit 1d7b37f499
26 changed files with 739 additions and 295 deletions

View File

@ -11,11 +11,11 @@ enum34==1.0.4
inflection
Fabric==1.10.2
tabulate==0.7.5
gevent>=1.0.2
# we need callbacks for now
ansible<2.0
celery
mock
multipledispatch==0.4.8
pbr
@ -37,3 +37,8 @@ semver
# plugins
stevedore
# zerorpc doesn't consume messages with pyzmq >13.0.2; needs debugging
pyzmq==13.0.2
zerorpc>=0.5.2

View File

@ -29,6 +29,7 @@ source-dir = doc/source
[entry_points]
console_scripts =
    solar = solar.cli.main:run
    solar-worker = solar.orchestration:main
solar.computable_inputs =
    lua = solar.computable_inputs.ci_lua:LuaProcessor
    jinja = solar.computable_inputs.ci_jinja:JinjaProcessor

View File

@ -24,7 +24,7 @@ from solar.dblayer.locking import DBLock
from solar import errors
from solar.orchestration import filters
from solar.orchestration import graph
from solar.orchestration import tasks
from solar.orchestration import SCHEDULER_CLIENT
from solar.orchestration.traversal import states
from solar.orchestration import utils
@ -128,9 +128,7 @@ def noop(uid, task):
@click.argument('uid', type=SOLARUID, default='last')
@click.option('-w', 'wait', default=0)
def run_once(uid, wait):
    tasks.schedule_start.apply_async(
        args=[uid],
        queue='scheduler')
    SCHEDULER_CLIENT.next({}, uid)
    wait_report(uid, wait)
@ -139,7 +137,7 @@ def run_once(uid, wait):
@click.option('-w', 'wait', default=0)
def restart(uid, wait):
    graph.reset_by_uid(uid)
    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
    SCHEDULER_CLIENT.next({}, uid)
    wait_report(uid, wait)
@ -150,7 +148,7 @@ def stop(uid):
    # using revoke(terminate=True) will lead to inability to restart execution
    # research possibility of customizations
    # app.control and Panel.register in celery
    tasks.soft_stop.apply_async(args=[uid], queue='scheduler')
    SCHEDULER_CLIENT.soft_stop({}, uid)
@orchestration.command()
@ -163,14 +161,14 @@ def reset(uid):
@click.argument('uid', type=SOLARUID)
def resume(uid):
    graph.reset_by_uid(uid, state_list=['SKIPPED'])
    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
    SCHEDULER_CLIENT.next({}, uid)


@orchestration.command()
@click.argument('uid', type=SOLARUID)
def retry(uid):
    graph.reset_by_uid(uid, state_list=['ERROR'])
    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
    SCHEDULER_CLIENT.next({}, uid)
@orchestration.command()

View File

@ -30,6 +30,9 @@ C.riak_ensemble = False
C.lock_bucket_type = None
C.counter_bucket_type = None
C.log_file = 'solar.log'
C.system_log_address = 'ipc:///tmp/solar_system_log'
C.tasks_address = 'ipc:///tmp/solar_tasks'
C.scheduler_address = 'ipc:///tmp/solar_scheduler'
def _lookup_vals(setter, config, prefix=None):

View File

@ -61,8 +61,8 @@ class _Lock(object):
        lk = self._acquire(self.uid, self.identity, self.stamp)
        if not lk.am_i_locking(self.identity):
            log.debug(
                'Lock %s acquired by another identity %s != %s',
                self.uid, self.identity, lk.who_is_locking())
                'Lock %s acquired by another identity %s != %s, lockers %s',
                self.uid, self.identity, lk.who_is_locking(), lk.lockers)
            while self.retries:
                del DBLock._c.obj_cache[lk.key]
                time.sleep(self.wait)
@ -121,8 +121,9 @@ class _CRDTishLock(_Lock):
        locking = lk.who_is_locking()
        if locking is not None:
            log.debug(
                'Found lock with UID %s, owned by %s, owner %r',
                uid, locking, lk.am_i_locking(identity))
                'Found lock with UID %s, owned by %s,'
                ' owner %r, lockers %s',
                uid, locking, lk.am_i_locking(identity), lk.lockers)
            return lk
        else:
            log.debug(

View File

@ -0,0 +1,69 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from solar.config import C
from solar.core.log import log
from solar.dblayer import ModelMeta
from solar.orchestration.executors.zerorpc_executor import Client
from solar.orchestration.executors.zerorpc_executor import Executor
from solar.orchestration.workers import scheduler as wscheduler
from solar.orchestration.workers.system_log import SystemLog
from solar.orchestration.workers.tasks import Tasks

SCHEDULER_CLIENT = Client(C.scheduler_address)


def construct_scheduler(tasks_address, scheduler_address):
    scheduler = wscheduler.Scheduler(Client(tasks_address))
    scheduler.for_all.before(lambda ctxt: ModelMeta.session_start())
    scheduler.for_all.after(lambda ctxt: ModelMeta.session_end())
    Executor(scheduler, scheduler_address).run()


def construct_system_log(system_log_address):
    syslog = SystemLog()
    syslog.for_all.before(lambda ctxt: ModelMeta.session_start())
    syslog.for_all.after(lambda ctxt: ModelMeta.session_end())
    Executor(syslog, system_log_address).run()


def construct_tasks(system_log_address, tasks_address, scheduler_address):
    syslog = Client(system_log_address)
    scheduler = wscheduler.SchedulerCallbackClient(
        Client(scheduler_address))
    tasks = Tasks()
    tasks.for_all.on_success(syslog.commit)
    tasks.for_all.on_error(syslog.error)
    tasks.for_all.on_success(scheduler.update)
    tasks.for_all.on_error(scheduler.error)
    Executor(tasks, tasks_address).run()


def main():
    import sys

    from gevent import spawn
    from gevent import joinall

    servers = [
        spawn(construct_scheduler, C.tasks_address, C.scheduler_address),
        spawn(construct_system_log, C.system_log_address),
        spawn(construct_tasks, C.system_log_address, C.tasks_address,
              C.scheduler_address)
    ]
    try:
        log.info('Spawning scheduler, system log and tasks workers.')
        joinall(servers)
    except KeyboardInterrupt:
        log.info('Exit solar-worker')
        sys.exit()
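With the solar-worker console script from setup.cfg, all three workers run as gevent greenlets in one process, and any other process can drive a plan over the same IPC endpoints. A minimal sketch of a client, assuming the workers are already running; the plan UID is a placeholder:

# Start the workers (blocks; typically run as a service):
#   $ solar-worker
# Then, from another process, push the first scheduling round:
from solar.orchestration import SCHEDULER_CLIENT

SCHEDULER_CLIENT.next({}, 'plan-uid')  # ctxt dict first, then the plan uid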

View File

@ -1,55 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time

from celery import group

from solar.orchestration.runner import app


def celery_executor(dg, tasks, control_tasks=()):
    to_execute = []
    for task_name in tasks:
        # task_id needs to be unique, so for each plan we will use
        # generated uid of this plan and task_name
        task_id = '{}:{}'.format(dg.graph['uid'], task_name)
        task = app.tasks[dg.node[task_name]['type']]
        dg.node[task_name]['status'] = 'INPROGRESS'
        dg.node[task_name]['start_time'] = time.time()
        for t in generate_task(task, dg.node[task_name], task_id):
            to_execute.append(t)
    return group(to_execute)


def generate_task(task, data, task_id):
    subtask = task.subtask(
        data['args'], task_id=task_id,
        time_limit=data.get('time_limit', None),
        soft_time_limit=data.get('soft_time_limit', None))
    # NOTE(dshulyak) it seems that we agreed that celery won't be installed
    # on every slave and transport will be chosen in handler
    # if data.get('target', None):
    #     subtask.set(queue=data['target'])
    yield subtask


def all_success(dg, nodes):
    return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes))

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,41 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import zerorpc


class Executor(object):

    def __init__(self, worker, bind_to):
        self.worker = worker
        self.bind_to = bind_to

    def run(self):
        server = zerorpc.Puller(methods=self.worker)
        server.bind(self.bind_to)
        server.run()


class Client(object):

    def __init__(self, connect_to):
        self.connect_to = connect_to
        self.client = zerorpc.Pusher()
        self.client.connect(connect_to)

    def __getattr__(self, method):
        return getattr(self.client, method)

    def __call__(self, method, ctxt, *args, **kwargs):
        return getattr(self.client, method)(ctxt, *args, **kwargs)
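The pair above wraps zerorpc's one-way PUSH/PULL pattern: a Pusher sends method calls without waiting for a result, and the Puller dispatches each message to the same-named method on the worker object. A self-contained sketch of the same pattern, with an illustrative worker and endpoint:

import gevent
import zerorpc


class Echo(object):

    def echo(self, ctxt, message):
        print('got %s %s' % (ctxt, message))


# Serve the worker on an IPC endpoint (path is illustrative).
server = zerorpc.Puller(methods=Echo())
server.bind('ipc:///tmp/echo_demo')
gevent.spawn(server.run)

# Push a one-way call; Pusher/Puller never returns a result to the caller.
client = zerorpc.Pusher()
client.connect('ipc:///tmp/echo_demo')
client.echo({'task_id': 'demo'}, 'hello')
gevent.sleep(0.1)  # pushes are fire-and-forget; let the puller process it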

View File

@ -1,183 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import subprocess
import time

from functools import partial

from celery.app import task
from celery.signals import task_postrun
from celery.signals import task_prerun

from solar.core import actions
from solar.core.log import log
from solar.core import resource
from solar.dblayer import ModelMeta
from solar.orchestration import executor
from solar.orchestration import graph
from solar.orchestration import limits
from solar.orchestration.runner import app
from solar.orchestration.traversal import traverse
from solar.system_log.tasks import commit_logitem
from solar.system_log.tasks import error_logitem
from solar.dblayer.locking import Lock
from solar.utils import get_current_ident

__all__ = ['solar_resource', 'cmd', 'sleep',
           'error', 'fault_tolerance', 'schedule_start', 'schedule_next']


# NOTE(dshulyak) I am not using celery.signals because it is not possible
# to extract task_id from *task_success* signal
class ReportTask(task.Task):

    def on_success(self, retval, task_id, args, kwargs):
        schedule_next.apply_async(
            args=[task_id, 'SUCCESS'],
            queue='scheduler')
        commit_logitem.apply_async(args=[task_id], queue='system_log')

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        schedule_next.apply_async(
            args=[task_id, 'ERROR'],
            kwargs={'errmsg': str(einfo.exception)},
            queue='scheduler')
        error_logitem.apply_async(args=[task_id], queue='system_log')


report_task = partial(app.task, base=ReportTask, bind=True)


@task_prerun.connect
def start_solar_session(task_id, task, *args, **kwargs):
    ModelMeta.session_start()


@task_postrun.connect
def end_solar_session(task_id, task, *args, **kwargs):
    ModelMeta.session_end()


@report_task(name='solar_resource')
def solar_resource(ctxt, resource_name, action):
    log.debug('TASK solar resource NAME %s ACTION %s',
              resource_name, action)
    res = resource.load(resource_name)
    return actions.resource_action(res, action)


@report_task(name='cmd')
def cmd(ctxt, cmd):
    popen = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    out, err = popen.communicate()
    rcode = popen.returncode
    if rcode:
        raise Exception('Command %s failed with err %s', cmd, err)
    return popen.returncode, out, err


@report_task(name='sleep')
def sleep(ctxt, seconds):
    time.sleep(seconds)


@report_task(name='error')
def error(ctxt, message):
    raise Exception('message')


@report_task(name='fault_tolerance')
def fault_tolerance(ctxt, percent):
    task_id = ctxt.request.id
    plan_uid, task_name = task_id.rsplit(':', 1)
    dg = graph.get_graph(plan_uid)
    success = 0.0
    predecessors = dg.predecessors(task_name)
    lth = len(predecessors)
    for s in predecessors:
        if dg.node[s]['status'] == 'SUCCESS':
            success += 1
    succes_percent = (success / lth) * 100
    if succes_percent < percent:
        raise Exception('Cant proceed with, {0} < {1}'.format(
            succes_percent, percent))


@report_task(name='echo')
def echo(ctxt, message):
    return message


@report_task(name='anchor')
def anchor(ctxt, *args):
    # such tasks should be walked when at least 1/3/exact number of resources
    # visited
    dg = graph.get_graph('current')
    for s in dg.predecessors(ctxt.request.id):
        if dg.node[s]['status'] != 'SUCCESS':
            raise Exception('One of the tasks erred, cant proceed')


def schedule(plan_uid, dg):
    tasks = traverse(dg)
    filtered_tasks = list(limits.get_default_chain(
        dg,
        [t for t in dg if dg.node[t]['status'] == 'INPROGRESS'],
        tasks))
    log.debug('Schedule next tasks %r', filtered_tasks)
    execution = executor.celery_executor(
        dg, filtered_tasks, control_tasks=('fault_tolerance',))
    graph.update_graph(dg)
    execution()


@app.task(name='schedule_start')
def schedule_start(plan_uid):
    """On receiving a finished task, update storage with the task result:
    - find successors that should be executed
    - apply different policies to tasks
    """
    with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
        dg = graph.get_graph(plan_uid)
        schedule(plan_uid, dg)


@app.task(name='soft_stop')
def soft_stop(plan_uid):
    with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
        dg = graph.get_graph(plan_uid)
        for n in dg:
            if dg.node[n]['status'] == 'PENDING':
                dg.node[n]['status'] = 'SKIPPED'
        graph.update_graph(dg)


@app.task(name='schedule_next')
def schedule_next(task_id, status, errmsg=None):
    plan_uid, task_name = task_id.rsplit(':', 1)
    with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
        dg = graph.get_graph(plan_uid)
        dg.node[task_name]['status'] = status
        dg.node[task_name]['errmsg'] = errmsg
        dg.node[task_name]['end_time'] = time.time()
        schedule(plan_uid, dg)

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,39 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from solar.orchestration.workers.subscription import CollectionSub
from solar.orchestration.workers.subscription import FuncSub


class WorkerMeta(type):

    def __new__(cls, name, bases, attrs):
        funcs = []
        for attrname, attrvalue in attrs.iteritems():
            if attrname[0] != '_' and not isinstance(attrvalue, CollectionSub):
                sub = FuncSub(attrvalue)
                attrs[attrname] = sub
                funcs.append(sub)
        return super(WorkerMeta, cls).__new__(cls, name, bases, attrs)


class Worker(object):

    __metaclass__ = WorkerMeta

    for_all = CollectionSub()

    def ping(self, ctxt):
        return 'pong'
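Because of the metaclass, every public method of a Worker subclass (including ping, wrapped when Worker itself is created) becomes a FuncSub descriptor, so hooks can be attached per method or to all methods via for_all. A short sketch with an illustrative worker:

from solar.orchestration.workers import base


class Maths(base.Worker):

    def add(self, ctxt, a, b):
        return a + b


worker = Maths()
results = []
# Per-method hook: subscribers get the ctxt, the return value and the args.
worker.add.on_success(lambda ctxt, rst, *args, **kwargs: results.append(rst))
assert worker.add({}, 1, 2) == 3
assert results == [3]
assert worker.ping({}) == 'pong'  # inherited public methods are wrapped too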

View File

@ -0,0 +1,96 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time

from solar.core.log import log
from solar.dblayer.locking import Lock
from solar.orchestration import graph
from solar.orchestration import limits
from solar.orchestration.traversal import traverse
from solar.orchestration.workers import base
from solar.utils import get_current_ident


class Scheduler(base.Worker):

    def __init__(self, tasks_client):
        self._tasks = tasks_client
        super(Scheduler, self).__init__()

    def _next(self, dg):
        tasks = traverse(dg)
        filtered_tasks = list(limits.get_default_chain(
            dg,
            [t for t in dg if dg.node[t]['status'] == 'INPROGRESS'],
            tasks))
        return filtered_tasks

    def next(self, ctxt, plan_uid):
        with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
            log.debug('Received *next* event for %s', plan_uid)
            dg = graph.get_graph(plan_uid)
            rst = self._next(dg)
            for task_name in rst:
                task_id = '{}:{}'.format(dg.graph['uid'], task_name)
                task_type = dg.node[task_name]['type']
                dg.node[task_name]['status'] = 'INPROGRESS'
                ctxt = {'task_id': task_id, 'task_name': task_name}
                self._tasks(
                    task_type, ctxt,
                    *dg.node[task_name]['args'])
            graph.update_graph(dg)
            log.debug('Scheduled tasks %r', rst)
            # process tasks with tasks client
            return rst

    def soft_stop(self, ctxt, plan_uid):
        with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
            dg = graph.get_graph(plan_uid)
            for n in dg:
                if dg.node[n]['status'] == 'PENDING':
                    dg.node[n]['status'] = 'SKIPPED'
            graph.update_graph(dg)

    def update_next(self, ctxt, status, errmsg):
        plan_uid, task_name = ctxt['task_id'].rsplit(':', 1)
        with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
            dg = graph.get_graph(plan_uid)
            dg.node[task_name]['status'] = status
            dg.node[task_name]['errmsg'] = errmsg
            dg.node[task_name]['end_time'] = time.time()
            rst = self._next(dg)
            for task_name in rst:
                task_id = '{}:{}'.format(dg.graph['uid'], task_name)
                task_type = dg.node[task_name]['type']
                dg.node[task_name]['status'] = 'INPROGRESS'
                ctxt = {'task_id': task_id, 'task_name': task_name}
                self._tasks(
                    task_type, ctxt,
                    *dg.node[task_name]['args'])
            graph.update_graph(dg)
            log.debug('Scheduled tasks %r', rst)
            return rst


class SchedulerCallbackClient(object):

    def __init__(self, client):
        self.client = client

    def update(self, ctxt, result, *args, **kwargs):
        self.client.update_next(ctxt, 'SUCCESS', '')

    def error(self, ctxt, result, *args, **kwargs):
        self.client.update_next(ctxt, 'ERROR', repr(result))
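SchedulerCallbackClient adapts the subscriber signature (ctxt, result, *args, **kwargs) to the scheduler's update_next RPC; this is how a finished task feeds the next scheduling round. A sketch of that feedback hop, assuming a scheduler worker is listening on C.scheduler_address; the task id is a placeholder:

from solar.config import C
from solar.orchestration.executors.zerorpc_executor import Client
from solar.orchestration.workers import scheduler as wscheduler

callback = wscheduler.SchedulerCallbackClient(Client(C.scheduler_address))
# What an on_success subscription delivers when task 'plan-uid:echo' returns:
callback.update({'task_id': 'plan-uid:echo', 'task_name': 'echo'}, 'result')
# -> pushes update_next(ctxt, 'SUCCESS', ''), marking the task finished and
#    scheduling whatever becomes ready next.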

View File

@ -0,0 +1,113 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import defaultdict
import inspect

from solar.core.log import log


class SubControl(object):

    def on_success(self, sub):
        self.add_subscriber(sub, 'on_success')

    def on_error(self, sub):
        self.add_subscriber(sub, 'on_error')

    def after(self, sub):
        self.add_subscriber(sub, 'after')

    def before(self, sub):
        self.add_subscriber(sub, 'before')

    def add_subscriber(self, sub, event):
        raise NotImplementedError()


class FuncSubControl(SubControl):

    def __init__(self, instance, func):
        self.instance = instance
        self.func = func
        self._subscribers = defaultdict(list)
        self.__name__ = func.__name__

    def add_subscriber(self, subscriber, event):
        """Subscribe a callable to an event.

        :param subscriber: callable
        :param event: one of 'before', 'after', 'on_success', 'on_error'
        """
        self._subscribers[event].append(subscriber)

    def __call__(self, ctxt, *args, **kwargs):
        for sub in self._subscribers['before']:
            try:
                sub(ctxt)
            except Exception as exc:
                log.error('Subscriber before %r failed with %r', sub, exc)
        try:
            rst = self.func(self.instance, ctxt, *args, **kwargs)
            for sub in self._subscribers['on_success']:
                try:
                    sub(ctxt, rst, *args, **kwargs)
                except Exception as exc:
                    log.error(
                        'Subscriber on_success %r failed with %r', sub, exc)
            return rst
        except Exception as exc:
            for sub in self._subscribers['on_error']:
                try:
                    sub(ctxt, repr(exc), *args, **kwargs)
                except Exception as sub_exc:
                    # use a separate name so the original exception
                    # is not shadowed by a failing subscriber
                    log.error(
                        'Subscriber on_error %r failed with %r', sub, sub_exc)
            raise
        finally:
            for sub in self._subscribers['after']:
                try:
                    sub(ctxt)
                except Exception as exc:
                    log.error('Subscriber after %r failed with %r', sub, exc)


class FuncSub(object):

    def __init__(self, func):
        self.func = func

    def __get__(self, obj, owner):
        property_name = '__sub_control_' + self.func.__name__
        sub_control = getattr(obj, property_name, None)
        if not sub_control:
            setattr(obj, property_name, FuncSubControl(obj, self.func))
        return getattr(obj, property_name)


class CollectionSubControl(SubControl):

    def __init__(self, instance):
        self.instance = instance

    def add_subscriber(self, subscriber, event):
        for entity_name, entity in inspect.getmembers(self.instance):
            if isinstance(entity, FuncSubControl) and entity_name[:2] != '__':
                entity.add_subscriber(subscriber, event)


class CollectionSub(object):

    def __get__(self, obj, owner):
        return CollectionSubControl(obj)

View File

@ -12,18 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from solar.orchestration.runner import app
from solar.orchestration.workers import base
from solar.system_log.operations import move_to_commited
from solar.system_log.operations import set_error

__all__ = ['error_logitem', 'commit_logitem']


@app.task(name='error_logitem')
def error_logitem(task_uuid):
    return set_error(task_uuid.rsplit(':', 1)[-1])


@app.task(name='commit_logitem')
def commit_logitem(task_uuid):
    return move_to_commited(task_uuid.rsplit(':', 1)[-1])


class SystemLog(base.Worker):

    def commit(self, ctxt, *args, **kwargs):
        return move_to_commited(ctxt['task_id'].rsplit(':', 1)[-1])

    def error(self, ctxt, *args, **kwargs):
        return set_error(ctxt['task_id'].rsplit(':', 1)[-1])

View File

@ -0,0 +1,38 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time

from solar.core import actions
from solar.core.log import log
from solar.core import resource
from solar.orchestration.workers import base


class Tasks(base.Worker):

    def sleep(self, ctxt, seconds):
        return time.sleep(seconds)

    def error(self, ctxt, message):
        raise Exception(message)

    def echo(self, ctxt, message):
        return message

    def solar_resource(self, ctxt, resource_name, action):
        log.debug('TASK solar resource NAME %s ACTION %s',
                  resource_name, action)
        res = resource.load(resource_name)
        return actions.resource_action(res, action)
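Each public method of Tasks is a remotely callable task type; the scheduler invokes them by name with the task context and the node's args. A sketch of driving the worker directly, with an illustrative endpoint:

import gevent

from solar.orchestration.executors.zerorpc_executor import Client
from solar.orchestration.executors.zerorpc_executor import Executor
from solar.orchestration.workers.tasks import Tasks

address = 'ipc:///tmp/solar_tasks_demo'  # illustrative endpoint
gevent.spawn(Executor(Tasks(), address).run)

client = Client(address)
# Same calling convention the scheduler uses: method name, ctxt, then args.
client('echo', {'task_id': 'demo:echo', 'task_name': 'echo'}, 'hello')
gevent.sleep(0.1)  # one-way push; give the worker a moment to execute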

View File

@ -21,6 +21,7 @@ from solar.core.resource import Resource
from solar.dblayer.model import get_bucket
from solar.dblayer.model import Model
from solar.dblayer.model import ModelMeta
from solar.orchestration import graph
def patched_get_bucket_name(cls):
@ -79,3 +80,46 @@ def pytest_runtest_call(item):
    Model.get_bucket_name = classmethod(patched_get_bucket_name)


def plan_from_fixture(name):
    riak_path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), 'orch_fixtures',
        '%s.yaml' % name)
    return graph.create_plan(riak_path)


@pytest.fixture
def riak_plan():
    return plan_from_fixture('riak')


@pytest.fixture
def simple_plan():
    return plan_from_fixture('simple')


@pytest.fixture
def sequential_plan():
    return plan_from_fixture('sequential')


@pytest.fixture
def two_path_plan():
    return plan_from_fixture('two_path')


@pytest.fixture
def sequence_vr(tmpdir):
    base_path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)),
        'resource_fixtures')
    vr_tmpl_path = os.path.join(base_path, 'sequence.yaml.tmpl')
    base_resource_path = os.path.join(base_path, 'data_resource')
    with open(vr_tmpl_path) as f:
        vr_data = f.read().format(
            resource_path=base_resource_path,
            idx='#{ idx }#')
    vr_file = tmpdir.join('sequence.yaml')
    vr_file.write(vr_data)
    return str(vr_file)

View File

@ -12,14 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from celery import Celery
import random
import string

from solar.config import C
import pytest

app = Celery(
    include=['solar.system_log.tasks', 'solar.orchestration.tasks'],
    broker=C.celery_broker,
    backend=C.celery_backend)
app.conf.update(CELERY_ACCEPT_CONTENT=['json'])
app.conf.update(CELERY_TASK_SERIALIZER='json')


@pytest.fixture
def address():
    return 'ipc:///tmp/solar_test_' + ''.join(
        (random.choice(string.ascii_lowercase) for x in xrange(4)))

View File

@ -0,0 +1,97 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time

import gevent
import pytest

from solar.core.resource import composer
from solar.dblayer.model import clear_cache
from solar import orchestration
from solar.orchestration.graph import wait_finish
from solar.orchestration.traversal import states
from solar.system_log import change
from solar.system_log import data


@pytest.fixture
def tasks_address(address):
    return address + 'tasks'


@pytest.fixture
def system_log_address(address):
    return address + 'system_log'


@pytest.fixture
def scheduler_address(address):
    return address + 'scheduler'


@pytest.fixture
def scheduler_client(scheduler_address):
    return orchestration.Client(scheduler_address)


@pytest.fixture(autouse=True)
def tasks(system_log_address, tasks_address, scheduler_address):
    gevent.spawn(
        orchestration.construct_tasks,
        system_log_address, tasks_address, scheduler_address)


@pytest.fixture(autouse=True)
def scheduler(tasks_address, scheduler_address):
    gevent.spawn(
        orchestration.construct_scheduler,
        tasks_address, scheduler_address)


@pytest.fixture(autouse=True)
def system_log(system_log_address):
    gevent.spawn(
        orchestration.construct_system_log,
        system_log_address)


@pytest.fixture(autouse=True)
def resources(request, sequence_vr):
    scale = request.getfuncargvalue('scale')
    for idx in range(scale):
        composer.create(
            'sequence_%s' % idx, sequence_vr, inputs={'idx': idx})


@pytest.mark.parametrize('scale', [20])
def test_concurrent_sequences_with_no_handler(scale, scheduler_client):
    total_resources = scale * 3
    timeout = scale
    assert len(change.stage_changes()) == total_resources
    plan = change.send_to_orchestration()
    scheduler_client.next({}, plan.graph['uid'])

    def wait_function(timeout):
        for summary in wait_finish(plan.graph['uid'], timeout):
            assert summary[states.ERROR.name] == 0
            time.sleep(0.5)
        return summary

    waiter = gevent.spawn(wait_function, timeout)
    waiter.join(timeout=timeout)
    assert waiter.get(block=True)[states.SUCCESS.name] == total_resources
    assert len(data.CL()) == total_resources
    clear_cache()
    assert len(change.stage_changes()) == 0

View File

@ -0,0 +1,112 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time

import gevent
import pytest

from solar.dblayer.model import ModelMeta
from solar.orchestration.executors import zerorpc_executor
from solar.orchestration.workers import scheduler as wscheduler
from solar.orchestration.workers import tasks as wtasks


@pytest.fixture
def tasks_worker():
    return wtasks.Tasks()


@pytest.fixture
def tasks_for_scheduler(tasks_worker, address):
    address = address + 'tasks'
    executor = zerorpc_executor.Executor(tasks_worker, address)
    gevent.spawn(executor.run)
    return zerorpc_executor.Client(address)


@pytest.fixture
def scheduler(tasks_for_scheduler, address):
    address = address + 'scheduler'
    worker = wscheduler.Scheduler(tasks_for_scheduler)
    worker.for_all.before(lambda ctxt: ModelMeta.session_start())
    worker.for_all.after(lambda ctxt: ModelMeta.session_end())
    executor = zerorpc_executor.Executor(worker, address)
    gevent.spawn(executor.run)
    return worker, zerorpc_executor.Client(address)


@pytest.fixture(autouse=True)
def setup_scheduler_callback(scheduler, tasks_worker):
    worker, client = scheduler
    scheduler_client = wscheduler.SchedulerCallbackClient(
        zerorpc_executor.Client(client.connect_to))
    tasks_worker.for_all.on_success(scheduler_client.update)
    tasks_worker.for_all.on_error(scheduler_client.update)


def _wait_scheduling(plan, wait_time, waiter, client):
    client.next({}, plan.graph['uid'])
    waiter = gevent.spawn(waiter)
    waiter.join(timeout=wait_time)


def test_simple_fixture(simple_plan, scheduler):
    worker, client = scheduler
    scheduling_results = []
    expected = [['echo_stuff'], ['just_fail'], []]

    def register(ctxt, rst, *args, **kwargs):
        scheduling_results.append(rst)
    worker.for_all.on_success(register)

    def _result_waiter():
        while scheduling_results != expected:
            time.sleep(0.1)

    _wait_scheduling(simple_plan, 3, _result_waiter, client)
    assert scheduling_results == expected


def test_sequential_fixture(sequential_plan, scheduler):
    worker, client = scheduler
    scheduling_results = set()
    expected = {('s1',), ('s2',), ('s3',), ()}

    def register(ctxt, rst, *args, **kwargs):
        scheduling_results.add(tuple(rst))
    worker.for_all.on_success(register)

    def _result_waiter():
        while scheduling_results != expected:
            time.sleep(0.1)

    _wait_scheduling(sequential_plan, 2, _result_waiter, client)
    assert scheduling_results == expected


def test_two_path_fixture(two_path_plan, scheduler):
    worker, client = scheduler
    scheduling_results = []
    expected = [{'a', 'c'}, {'a', 'c'}, {'b', 'd'}, {'b', 'd'}, {'e'}]

    def register(ctxt, rst, *args, **kwargs):
        if 'task_name' in ctxt:
            scheduling_results.append(ctxt['task_name'])
    worker.for_all.on_success(register)

    def _result_waiter():
        while len(scheduling_results) != len(expected):
            time.sleep(0.1)

    _wait_scheduling(two_path_plan, 3, _result_waiter, client)
    for si, ei in zip(scheduling_results, expected):
        assert any([si == e for e in ei])

View File

@ -3,16 +3,15 @@ tasks:
  - uid: s1
    parameters:
      type: sleep
      args: [2]
      target: 1
      args: [0.2]
      target: '1'
  - uid: s2
    parameters:
      type: sleep
      args: [2]
      target: 1
      args: [0.2]
      target: '1'
  - uid: s3
    parameters:
      type: sleep
      args: [2]
      target: 1
      args: [0.2]
      target: '1'

View File

@ -12,23 +12,28 @@
# License for the specific language governing permissions and limitations
# under the License.
import networkx as nx
from mock import patch
from pytest import fixture

from solar.orchestration import executor
from solar.orchestration.workers import base


@fixture
def dg():
    ex = nx.DiGraph()
    ex.add_node('t1', args=['t'], status='PENDING', type='echo')
    ex.graph['uid'] = 'some_string'
    return ex


class SubTest(base.Worker):
    """for tests."""

    def pass_two(self, ctxt):
        return 2


@patch.object(executor, 'app')
def test_celery_executor(mapp, dg):
    """Just check that it doesn't fail for now."""
    assert executor.celery_executor(dg, ['t1'])
    assert dg.node['t1']['status'] == 'INPROGRESS'


def test_subscribe_on_success():
    sub = SubTest()
    test = []
    assert sub.pass_two.on_success(lambda ctxt, rst: test.append(rst)) is None
    assert sub.pass_two({}) == 2
    assert test == [2]


def test_subscribe_for_all():
    sub = SubTest()
    test = []
    sub.for_all.after(lambda ctxt: test.append('after'))
    sub.for_all.before(lambda ctxt: test.append('before'))
    sub.pass_two({})
    assert test == ['before', 'after']

View File

@ -0,0 +1,10 @@
id: data_resources
handler: none
version: 1.0.0
input:
  key1:
    schema: str!
    value:
  key2:
    schema: str!
    value:

View File

@ -0,0 +1,17 @@
id: data_resources_{idx}
resources:
  - id: data_resource_1_{idx}
    from: {resource_path}
    input:
      key1: key1
      key2: key2
  - id: data_resource_2_{idx}
    from: {resource_path}
    input:
      key1: data_resource_1_{idx}::key1
      key2: data_resource_1_{idx}::key2
  - id: data_resource_3_{idx}
    from: {resource_path}
    input:
      key1: data_resource_2_{idx}::key1
      key2: data_resource_2_{idx}::key2

View File

@ -10,7 +10,3 @@ os-testr
## for computable inputs
# temporary disabled
# lupa
# to test if everything works on gevent
gevent