Implement db based lock mechanism for orchestration

This change allows us to use a single worker for stateful
and stateless tasks, and to add more workers in case of scaling issues.

Patch covers:
- locking with riak and n_val=1
- riak strongly consistent buckets (riak ensemble)
- sqlite based lock

To enable locking with riak ensemble, the solar config should be modified:
solar_db: riak://10.0.0.3:18087
riak_ensemble: True
lock_bucket_type: <<bucket_name>>

Change-Id: I5afddb35a944ff75efdf1159ad037536a9d8bbfb
This commit is contained in:
Dmitry Shulyak 2015-12-09 15:08:54 +02:00
parent 026c44445c
commit 4b1374577a
18 changed files with 394 additions and 32 deletions

View File

@ -3,3 +3,5 @@ celery_broker: sqla+sqlite:////tmp/celery.db
celery_backend: db+sqlite:////tmp/celery.db
solar_db: sqlite:////tmp/solar.db
# solar_db: riak://10.0.0.2:8087
riak_ensemble: False
lock_bucket_type: ''

View File

@ -10,6 +10,7 @@ ADD run.sh /run.sh
RUN apt-get update && apt-get upgrade -y
RUN apt-get install -y python python-dev python-distribute python-pip openssh-client rsync libyaml-dev vim libffi-dev libssl-dev git sshpass
RUN pip install ansible
RUN pip install gevent
RUN pip install git+https://github.com/Mirantis/solar.git
RUN pip install git+https://github.com/Mirantis/solar-agent.git

View File

@ -10,6 +10,13 @@ solar-celery:
- ./bootstrap/playbooks/celery.yaml:/celery.yaml
- /var/lib/solar/repositories:/var/lib/solar/repositories
- /.solar_config_override:/.solar_config_override
- ./run.sh:/run.sh
- ./.config:~/.config
environment:
- REDIS_HOST=redis
- REDIS_PORT=6379
- RIAK_HOST=riak
- RIAK_PORT=8087
# links are not used for configuration because we can rely on non-container
# based datastores
links:

5
run.sh
View File

@ -5,7 +5,4 @@ if [ -d /solar ]; then
cd /solar && python setup.py develop
fi
#used only to start celery on docker
ansible-playbook -v -i "localhost," -c local /celery.yaml --skip-tags install
tail -f /var/run/celery/*.log
celery worker -A solar.orchestration.runner -P gevent -c 1000 -Q system_log,celery,scheduler

View File

@ -20,6 +20,7 @@ import click
from solar.cli.uids_history import remember_uid
from solar.cli.uids_history import SOLARUID
from solar.dblayer.locking import Lock
from solar import errors
from solar.orchestration import filters
from solar.orchestration import graph
@ -190,3 +191,10 @@ def dg(uid, start, end):
@click.argument('uid', type=SOLARUID)
def show(uid):
click.echo(graph.show(uid))
@orchestration.command(name='release-lock')
@click.argument('uid', type=SOLARUID)
def release_lock(uid):
    """Use if worker was killed and the lock wasn't released properly."""
    Lock._release(uid)

View File

@ -22,9 +22,12 @@ import yaml
CWD = os.getcwd()
C = Bunch(solar_db="")
C.celery_broker = 'sqla+sqlite:////tmp/celery.db'
C.celery_backend = 'db+sqlite:////tmp/celery.db'
C.riak_ensemble = False
C.lock_bucket_type = None
def _lookup_vals(setter, config, prefix=None):
@ -69,6 +72,8 @@ def from_configs():
def _setter(config, path):
    # Walk the nested config data along `path`; abort silently if any
    # segment is missing, otherwise store the leaf value under the last key.
    node = data
    for segment in path:
        if segment not in node:
            return
        node = node[segment]
    config[path[-1]] = node
if data:

View File

@ -15,6 +15,10 @@
from collections import Counter
class SiblingsError(RuntimeError):
pass
def naive_resolver(riak_object):
# for now we support deleted vs existing object
siblings = riak_object.siblings
@ -23,11 +27,11 @@ def naive_resolver(riak_object):
siblings_len.sort()
c = Counter((x[0] for x in siblings_len))
if len(c) > 2:
raise RuntimeError(
raise SiblingsError(
"Too many different siblings, not sure what to do with siblings")
if 0 not in c:
raise RuntimeError("No empty object for resolution"
" not sure what to do with siblings")
raise SiblingsError("No empty object for resolution"
" not sure what to do with siblings")
selected = max(siblings_len)
# TODO: pass info to obj save_lazy too
riak_object.siblings = [selected[1]]

View File

@ -42,3 +42,7 @@ def solar_map(funct, args, concurrency=5):
def get_local():
    # Greenlet-aware local storage; substituted for threading.local when
    # gevent monkey-patching is active.
    from solar.dblayer.gevent_local import local
    return local
def get_current_ident():
    # Identity of the current greenlet — used as the lock-owner token when
    # running under gevent.
    return id(gevent.getcurrent())

View File

@ -28,6 +28,7 @@ def patch_all():
from solar.dblayer.model import Model
from solar.dblayer.gevent_helpers import get_current_ident
from solar.dblayer.gevent_helpers import get_local
from solar.dblayer.gevent_helpers import multi_get
from solar.dblayer.gevent_helpers import solar_map
@ -40,4 +41,5 @@ def patch_all():
_patch(utils, 'solar_map', solar_map)
_patch(utils, 'get_local', get_local)
_patch(utils, 'get_current_ident', get_current_ident)
_patch(Model, '_local', get_local()())

163
solar/dblayer/locking.py Normal file
View File

@ -0,0 +1,163 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import peewee
from riak import RiakError
from solar.config import C
from solar.core.log import log
from solar.dblayer.conflict_resolution import SiblingsError
from solar.dblayer.model import DBLayerNotFound
from solar.dblayer.solar_models import Lock as DBLock
class _Lock(object):
def __init__(self, uid, identity, retries=0, wait=1):
"""Storage based lock mechanism
:param uid: target of lock
:param identity: unit of concurrency
:param retries: retries of acquiring lock
:param wait: sleep between retries
"""
self.uid = uid
self.identity = identity
self.retries = retries
self.wait = wait
@classmethod
def _acquire(cls, uid, identity):
raise NotImplemented(
'Different strategies for handling collisions')
@classmethod
def _release(cls, uid):
lk = DBLock.get(uid)
log.debug('Release lock %s with %s', uid, lk.identity)
lk.delete()
def __enter__(self):
lk = self._acquire(self.uid, self.identity)
if lk.identity != self.identity:
log.debug(
'Lock %s acquired by another identity %s != %s',
self.uid, self.identity, lk.identity)
while self.retries:
del DBLock._c.obj_cache[lk.key]
time.sleep(self.wait)
lk = self._acquire(self.uid, self.identity)
self.retries -= 1
if lk.identity == self.identity:
break
else:
if lk.identity != self.identity:
raise RuntimeError(
'Failed to acquire {},'
' owned by identity {}'.format(
lk.key, lk.identity))
log.debug('Lock for %s acquired by %s', self.uid, self.identity)
def __exit__(self, *err):
self._release(self.uid)
class RiakLock(_Lock):

    @classmethod
    def _acquire(cls, uid, identity):
        """Acquire a lock through an eventually consistent riak bucket.

        A concurrent write shows up as siblings (SiblingsError on save);
        this identity's own sibling is dropped so the competitor wins.
        """
        try:
            try:
                lk = DBLock.get(uid)
                log.debug(
                    'Found lock with UID %s, owned by %s, owner %r',
                    uid, lk.identity, lk.identity == identity)
            except DBLayerNotFound:
                log.debug(
                    'Create lock UID %s for %s', uid, identity)
                lk = DBLock.from_dict(uid, {'identity': identity})
                lk.save(force=True)
        except SiblingsError:
            # NOTE(review): if DBLock.get itself raised SiblingsError,
            # `lk` would be unbound here -- TODO confirm the conflict
            # resolver cannot surface it on read.
            log.debug(
                'Race condition for lock with UID %s, among %r',
                uid,
                [s.data.get('identity') for s in lk._riak_object.siblings])
            # keep only siblings written by other identities
            siblings = []
            for s in lk._riak_object.siblings:
                if s.data.get('identity') != identity:
                    siblings.append(s)
            lk._riak_object.siblings = siblings
            lk.save()
        return lk
class SQLiteLock(_Lock):

    @classmethod
    def _acquire(cls, uid, identity):
        """Acquire the lock via the sqlite backend.

        Properly handling concurrent updates with sqlite is hard, so the
        primary-key constraint is relied upon: an INSERT of a duplicate
        key raises peewee.IntegrityError, signalling that another thread
        won the race.
        """
        try:
            existing = DBLock.get(uid)
        except DBLayerNotFound:
            pass
        else:
            log.debug(
                'Found lock with UID %s, owned by %s, owner %r',
                uid, existing.identity, existing.identity == identity)
            return existing
        log.debug(
            'Create lock UID %s for %s', uid, identity)
        new_lock = DBLock.from_dict(uid, {'identity': identity})
        try:
            # force_insert makes a duplicate key an error rather than an
            # update
            new_lock.save(force=True, force_insert=True)
        except peewee.IntegrityError:
            log.debug('Lock was acquired by another thread')
            return DBLock.get(uid)
        return new_lock
class RiakEnsembleLock(_Lock):

    @classmethod
    def _acquire(cls, uid, identity):
        """Acquire through a riak strongly consistent bucket.

        With ensemble enabled a conflicting write raises RiakError
        instead of producing siblings.
        """
        try:
            log.debug(
                'Create lock UID %s for %s', uid, identity)
            lk = DBLock.from_dict(uid, {'identity': identity})
            lk.save(force=True)
            return lk
        except RiakError as exc:
            # TODO object shouldn't be cached before successful save
            del DBLock._c.obj_cache[lk.key]
            # check documentation for error message
            # http://docs.basho.com/riak/latest/dev/advanced/strong-consistency/#Error-Messages
            if 'failed' in str(exc):
                # somebody else holds the lock; return the current holder
                lk = DBLock.get(uid)
                log.debug('Lock %s already acquired by %s', uid, lk.identity)
                return lk
            raise
# Select the lock backend from the configured solar_db connection string.
if 'sqlite' in C.solar_db:
    Lock = SQLiteLock
elif 'riak' in C.solar_db:
    if C.riak_ensemble:
        Lock = RiakEnsembleLock
    else:
        Lock = RiakLock
else:
    # Fail fast at import time with a clear message instead of leaving
    # `Lock` undefined, which would surface as a confusing NameError at
    # the first lock attempt.
    raise RuntimeError(
        'Unsupported storage backend for locking: %r' % C.solar_db)

View File

@ -135,7 +135,14 @@ def clear_cache():
def get_bucket(_, owner, mcs):
name = owner.get_bucket_name()
bucket = mcs.riak_client.bucket(name)
if owner.bucket_type:
bucket_type = mcs.riak_client.bucket_type(owner.bucket_type)
else:
# using default bucket type
bucket_type = mcs.riak_client
bucket = bucket_type.bucket(name)
if owner.bucket_properties:
bucket.set_properties(owner.bucket_properties)
bucket.resolver = dblayer_conflict_resolver
return bucket
@ -730,9 +737,11 @@ class Model(object):
_changed = False
_local = get_local()()
_lock = RLock() # for class objs
bucket_properties = {}
bucket_type = None
def __init__(self, key=None):
self._modified_fields = set()
# TODO: that _indexes_changed should be smarter
@ -910,10 +919,10 @@ class Model(object):
cls._c.lazy_save.clear()
@clears_state_for('index')
def save(self, force=False):
def save(self, force=False, **kwargs):
with self._lock:
if self.changed() or force or self._new:
res = self._riak_object.store()
res = self._riak_object.store(**kwargs)
self._reset_state()
return res
else:

View File

@ -15,6 +15,7 @@
import time
from riak import RiakClient as OrigRiakClient
from riak import RiakError
from solar.dblayer.model import clear_cache
@ -30,9 +31,15 @@ class RiakClient(OrigRiakClient):
def delete_all(self, cls):
for _ in xrange(10):
# riak dislikes deletes without dvv
rst = cls.bucket.get_index('$bucket',
startkey='_',
max_results=100000).results
try:
rst = cls.bucket.get_index('$bucket',
startkey='_',
max_results=100000).results
except RiakError as exc:
if 'indexes_not_supported' in str(exc):
rst = cls.bucket.get_keys()
else:
raise
for key in rst:
cls.bucket.delete(key)
else:

View File

@ -20,8 +20,10 @@ from types import NoneType
from uuid import uuid4
from enum import Enum
from solar.computable_inputs import ComputablePassedTypes
from solar.computable_inputs.processor import get_processor
from solar.config import C
from solar.dblayer.model import check_state_for
from solar.dblayer.model import CompositeIndexField
from solar.dblayer.model import DBLayerException
@ -1114,3 +1116,13 @@ class LogItem(Model):
vals['uid'] = cls.uid.default
vals.update(data)
return LogItem.from_dict(vals['uid'], vals)
class Lock(Model):
    """DB-backed lock object; the object key is the locked uid."""

    # dedicated bitcask backend for lock objects (per commit message,
    # presumably configured with n_val=1 -- TODO confirm cluster config)
    bucket_properties = {
        'backend': 'lock_bitcask_mult',
    }
    # non-default bucket type enables riak strongly consistent buckets
    bucket_type = C.lock_bucket_type

    # identity of the current lock holder (thread/greenlet ident)
    identity = Field(basestring)

View File

@ -187,10 +187,10 @@ class RiakObj(object):
self.indexes.difference_update(to_rem)
return self
def store(self, return_body=True):
def store(self, return_body=True, **kwargs):
    """Persist the object through the sql bucket emulation.

    Extra keyword arguments (e.g. force_insert) are forwarded to the
    underlying peewee model save().
    """
    # fresh pseudo-vclock on every store, mimicking riak behaviour
    self.vclock = uuid.uuid4().hex
    assert self._sql_bucket_obj is not None
    # BUG FIX: kwargs must be expanded; passing the dict positionally
    # bound it to peewee's first positional parameter (force_insert),
    # so a non-empty kwargs dict silently forced an INSERT.
    self._sql_bucket_obj.save(**kwargs)
    self._save_indexes()
    return self
@ -290,6 +290,9 @@ class Bucket(object):
def get_properties(self):
return {'search_index': False}
def set_properties(self, properties):
    """Required for compliance with the riak bucket interface; no-op here."""
def get(self, key):
try:
ret = self._sql_model.get(self._sql_model.key == key)

View File

@ -13,12 +13,17 @@
# under the License.
from __future__ import print_function
import pytest
from solar.config import C
from solar.dblayer.conflict_resolution import SiblingsError
from solar.dblayer.model import check_state_for
from solar.dblayer.model import clear_cache
from solar.dblayer.model import StrInt
from solar.dblayer.solar_models import DBLayerSolarException
from solar.dblayer.solar_models import InputAlreadyExists
from solar.dblayer.solar_models import Lock
from solar.dblayer.solar_models import Resource
from solar.dblayer.solar_models import UnknownInput
@ -728,3 +733,53 @@ def test_remove_input(rk):
with pytest.raises(DBLayerSolarException):
r1.inputs.remove_existing('b')
@pytest.mark.skipif(
not ('riak' in C.solar_db and not C.riak_ensemble),
reason=('Siblings error on write is expected'
' only with n_val=1 and 1 node installation'))
def test_return_siblings_on_write(rk):
pytest.importorskip('riak')
uid = next(rk)
lock = Lock.from_dict(uid, {'identity': uid})
lock.save()
clear_cache()
with pytest.raises(SiblingsError):
lock1 = Lock.from_dict(uid, {'identity': uid})
lock1.save()
s1, s2 = lock1._riak_object.siblings
assert s1.data == s2.data
@pytest.mark.skipif(
not ('riak' in C.solar_db and C.riak_ensemble),
reason='On update without turned on ensemble riak wont raise RiakError')
def test_raise_riak_error_on_incorrect_update(rk):
riak = pytest.importorskip('riak')
uid = next(rk)
lock = Lock.from_dict(uid, {'identity': uid})
lock.save()
clear_cache()
with pytest.raises(riak.RiakError):
lock1 = Lock.from_dict(uid, {'identity': uid})
lock1.save()
@pytest.mark.skipif(
'sqlite' not in C.solar_db,
reason='Force insert wont be used by other backends')
def test_non_unique_key(rk):
peewee = pytest.importorskip('peewee')
uid = next(rk)
lock = Lock.from_dict(uid, {'identity': '1'})
lock.save(force_insert=True)
clear_cache()
lock1 = Lock.from_dict(uid, {'identity': '2'})
with pytest.raises(peewee.IntegrityError):
lock1.save(force_insert=True)

View File

@ -22,6 +22,7 @@ from celery.signals import task_postrun
from celery.signals import task_prerun
from solar.core import actions
from solar.core.log import log
from solar.core import resource
from solar.dblayer import ModelMeta
from solar.orchestration import executor
@ -32,6 +33,9 @@ from solar.orchestration.traversal import traverse
from solar.system_log.tasks import commit_logitem
from solar.system_log.tasks import error_logitem
from solar.dblayer.locking import Lock
from solar.utils import get_current_ident
__all__ = ['solar_resource', 'cmd', 'sleep',
'error', 'fault_tolerance', 'schedule_start', 'schedule_next']
@ -70,6 +74,8 @@ def end_solar_session(task_id, task, *args, **kwargs):
@report_task(name='solar_resource')
def solar_resource(ctxt, resource_name, action):
log.debug('TASK solar resource NAME %s ACTION %s',
resource_name, action)
res = resource.load(resource_name)
return actions.resource_action(res, action)
@ -132,12 +138,13 @@ def anchor(ctxt, *args):
def schedule(plan_uid, dg):
tasks = traverse(dg)
limit_chain = limits.get_default_chain(
filtered_tasks = list(limits.get_default_chain(
dg,
[t for t in dg if dg.node[t]['status'] == 'INPROGRESS'],
tasks)
tasks))
log.debug('Schedule next tasks %r', filtered_tasks)
execution = executor.celery_executor(
dg, limit_chain, control_tasks=('fault_tolerance',))
dg, filtered_tasks, control_tasks=('fault_tolerance',))
graph.update_graph(dg)
execution()
@ -149,25 +156,28 @@ def schedule_start(plan_uid):
- find successors that should be executed
- apply different policies to tasks
"""
dg = graph.get_graph(plan_uid)
schedule(plan_uid, dg)
with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
dg = graph.get_graph(plan_uid)
schedule(plan_uid, dg)
@app.task(name='soft_stop')
def soft_stop(plan_uid):
dg = graph.get_graph(plan_uid)
for n in dg:
if dg.node[n]['status'] == 'PENDING':
dg.node[n]['status'] = 'SKIPPED'
graph.update_graph(dg)
with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
dg = graph.get_graph(plan_uid)
for n in dg:
if dg.node[n]['status'] == 'PENDING':
dg.node[n]['status'] = 'SKIPPED'
graph.update_graph(dg)
@app.task(name='schedule_next')
def schedule_next(task_id, status, errmsg=None):
plan_uid, task_name = task_id.rsplit(':', 1)
dg = graph.get_graph(plan_uid)
dg.node[task_name]['status'] = status
dg.node[task_name]['errmsg'] = errmsg
dg.node[task_name]['end_time'] = time.time()
with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
dg = graph.get_graph(plan_uid)
dg.node[task_name]['status'] = status
dg.node[task_name]['errmsg'] = errmsg
dg.node[task_name]['end_time'] = time.time()
schedule(plan_uid, dg)
schedule(plan_uid, dg)

69
solar/test/test_lock.py Normal file
View File

@ -0,0 +1,69 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import patch
import pytest
from solar.dblayer.locking import Lock
from solar.dblayer.model import clear_cache
def test_acquire_release_logic():
    """The first acquirer keeps the lock until it is explicitly released."""
    target = '2131'
    owner_a = '1111'
    owner_b = '2222'
    assert Lock._acquire(target, owner_a).identity == owner_a
    clear_cache()
    # a second owner cannot steal the lock while it is held
    assert Lock._acquire(target, owner_b).identity == owner_a
    Lock._release(target)
    # after release, the second owner acquires it
    assert Lock._acquire(target, owner_b).identity == owner_b
def test_lock_acquired_released():
    """While the context manager holds the lock, others see the holder."""
    key = '11'
    holder = Lock(key, key)
    with holder:
        clear_cache()
        assert Lock._acquire(key, '12').identity == '11'
    # __exit__ released the lock, so a new identity can take it
    assert Lock._acquire(key, '12').identity == '12'
def test_raise_error_if_acquired():
    """Entering a held lock with no retries raises RuntimeError."""
    key = '11'
    Lock._acquire(key, '12')
    clear_cache()
    with pytest.raises(RuntimeError):
        with Lock(key, '13'):
            assert True
@patch('solar.dblayer.locking.time.sleep')
def test_time_sleep_called(mocked_sleep):
    """A single retry sleeps exactly once for the configured interval."""
    key = '11'
    Lock._acquire(key, '12')
    clear_cache()
    interval = 5
    with pytest.raises(RuntimeError):
        with Lock(key, '13', 1, interval):
            assert True
    mocked_sleep.assert_called_once_with(interval)
def test_lock_released_exception():
    """__exit__ releases the lock even when the body raises."""
    key = '11'
    with pytest.raises(Exception):
        with Lock(key, key):
            raise Exception
    # the lock must be free for a different identity now
    relocked = Lock._acquire(key, '12')
    assert relocked.identity == '12'

View File

@ -19,6 +19,7 @@ import logging
import os
import re
import subprocess
import threading
import urlparse
import uuid
@ -139,7 +140,6 @@ def solar_map(funct, args, **kwargs):
def get_local():
import threading
return threading.local
@ -194,3 +194,7 @@ def detect_input_schema_by_value(value):
return _types['list']
if isinstance(value, dict):
return _types['hash']
def get_current_ident():
    """Return the identity of the current thread (lock-owner token)."""
    # current_thread() is the PEP 8 name; currentThread() is a deprecated
    # alias (removed in Python 3.12)
    return threading.current_thread().ident