Merge "CRDTish lock to avoid concurrent update/delete"
This commit is contained in:
commit
cd809e8e83
@ -20,7 +20,7 @@ import click
|
||||
|
||||
from solar.cli.uids_history import remember_uid
|
||||
from solar.cli.uids_history import SOLARUID
|
||||
from solar.dblayer.locking import Lock
|
||||
from solar.dblayer.locking import DBLock
|
||||
from solar import errors
|
||||
from solar.orchestration import filters
|
||||
from solar.orchestration import graph
|
||||
@ -197,4 +197,6 @@ def show(uid):
|
||||
@click.argument('uid', type=SOLARUID)
|
||||
def release_lock(uid):
|
||||
"""Use if worker was killed, and lock wasnt released properly. """
|
||||
Lock._release(uid)
|
||||
lock = DBLock.get(uid)
|
||||
lock.delete()
|
||||
return True
|
||||
|
@ -29,12 +29,20 @@ def naive_resolver(riak_object):
|
||||
if len(c) > 2:
|
||||
raise SiblingsError(
|
||||
"Too many different siblings, not sure what to do with siblings")
|
||||
|
||||
# both are the same
|
||||
if siblings[0]._get_encoded_data() == siblings[1]._get_encoded_data():
|
||||
riak_object.siblings = [siblings[0]]
|
||||
return
|
||||
|
||||
if 0 not in c:
|
||||
sdata = map(lambda x: x._get_encoded_data(), siblings)
|
||||
raise SiblingsError("No empty object for resolution"
|
||||
" not sure what to do with siblings")
|
||||
selected = max(siblings_len)
|
||||
# TODO: pass info to obj save_lazy too
|
||||
riak_object.siblings = [selected[1]]
|
||||
" not sure what to do with siblings: %r" % sdata)
|
||||
else:
|
||||
selected = max(siblings_len)
|
||||
# TODO: pass info to obj save_lazy too
|
||||
riak_object.siblings = [selected[1]]
|
||||
|
||||
|
||||
dblayer_conflict_resolver = naive_resolver
|
||||
|
@ -19,16 +19,15 @@ from solar.utils import parse_database_conn
|
||||
|
||||
_connection, _connection_details = parse_database_conn(C.solar_db)
|
||||
|
||||
if _connection.mode == 'sqlite':
|
||||
import peewee
|
||||
elif _connection.mode == 'riak':
|
||||
if _connection.mode == 'riak':
|
||||
from riak import RiakError
|
||||
|
||||
from solar.core.log import log
|
||||
from solar.dblayer.conflict_resolution import SiblingsError
|
||||
from solar.dblayer.model import DBLayerNotFound
|
||||
from solar.dblayer.solar_models import Lock as DBLock
|
||||
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
class _Lock(object):
|
||||
|
||||
@ -45,6 +44,7 @@ class _Lock(object):
|
||||
self.identity = identity
|
||||
self.retries = retries
|
||||
self.wait = wait
|
||||
self.stamp = str(uuid4())
|
||||
|
||||
@classmethod
|
||||
def _acquire(cls, uid, identity):
|
||||
@ -52,96 +52,97 @@ class _Lock(object):
|
||||
'Different strategies for handling collisions')
|
||||
|
||||
@classmethod
|
||||
def _release(cls, uid):
|
||||
def _release(cls, uid, identity):
|
||||
lk = DBLock.get(uid)
|
||||
log.debug('Release lock %s with %s', uid, lk.identity)
|
||||
log.debug('Release lock %s with %s', uid, identity)
|
||||
lk.delete()
|
||||
|
||||
def __enter__(self):
|
||||
lk = self._acquire(self.uid, self.identity)
|
||||
if lk.identity != self.identity:
|
||||
lk = self._acquire(self.uid, self.identity, self.stamp)
|
||||
if not lk.am_i_locking(self.identity):
|
||||
log.debug(
|
||||
'Lock %s acquired by another identity %s != %s',
|
||||
self.uid, self.identity, lk.identity)
|
||||
self.uid, self.identity, lk.who_is_locking())
|
||||
while self.retries:
|
||||
del DBLock._c.obj_cache[lk.key]
|
||||
time.sleep(self.wait)
|
||||
lk = self._acquire(self.uid, self.identity)
|
||||
lk = self._acquire(self.uid, self.identity, self.stamp)
|
||||
self.retries -= 1
|
||||
if lk.identity == self.identity:
|
||||
if lk.am_i_locking(self.identity):
|
||||
break
|
||||
else:
|
||||
# reset stamp mark
|
||||
self.stamp = str(uuid4())
|
||||
else:
|
||||
if lk.identity != self.identity:
|
||||
if not lk.am_i_locking(self.identity):
|
||||
raise RuntimeError(
|
||||
'Failed to acquire {},'
|
||||
' owned by identity {}'.format(
|
||||
lk.key, lk.identity))
|
||||
lk.key, lk.who_is_locking()))
|
||||
log.debug('Lock for %s acquired by %s', self.uid, self.identity)
|
||||
|
||||
def __exit__(self, *err):
|
||||
self._release(self.uid)
|
||||
self._release(self.uid, self.identity, self.stamp)
|
||||
|
||||
|
||||
class RiakLock(_Lock):
|
||||
class _CRDTishLock(_Lock):
|
||||
|
||||
@classmethod
|
||||
def _acquire(cls, uid, identity):
|
||||
def _release(cls, uid, identity, stamp):
|
||||
log.debug("Release lock %s with %s", uid, identity)
|
||||
lk = DBLock.get(uid)
|
||||
lk.change_locking_state(identity, -1, stamp)
|
||||
lk.save(force=True)
|
||||
|
||||
@classmethod
|
||||
def _acquire(cls, uid, identity, stamp):
|
||||
try:
|
||||
try:
|
||||
lk = DBLock.get(uid)
|
||||
log.debug(
|
||||
'Found lock with UID %s, owned by %s, owner %r',
|
||||
uid, lk.identity, lk.identity == identity)
|
||||
except DBLayerNotFound:
|
||||
log.debug(
|
||||
'Create lock UID %s for %s', uid, identity)
|
||||
lk = DBLock.from_dict(uid, {'identity': identity})
|
||||
lk.save(force=True)
|
||||
except SiblingsError:
|
||||
log.debug(
|
||||
'Race condition for lock with UID %s, among %r',
|
||||
uid,
|
||||
[s.data.get('identity') for s in lk._riak_object.siblings])
|
||||
siblings = []
|
||||
for s in lk._riak_object.siblings:
|
||||
if s.data.get('identity') != identity:
|
||||
siblings.append(s)
|
||||
lk._riak_object.siblings = siblings
|
||||
lk.save()
|
||||
return lk
|
||||
|
||||
|
||||
class SQLiteLock(_Lock):
|
||||
|
||||
@classmethod
|
||||
def _acquire(cls, uid, identity):
|
||||
"""It is hard to properly handle concurrent updates
|
||||
using sqlite backend.
|
||||
INSERT only should maitain integrity of
|
||||
primary keys and therefore will raise proper exception
|
||||
"""
|
||||
del DBLock._c.obj_cache[uid]
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
lk = DBLock.get(uid)
|
||||
log.debug(
|
||||
'Found lock with UID %s, owned by %s, owner %r',
|
||||
uid, lk.identity, lk.identity == identity)
|
||||
return lk
|
||||
except DBLayerNotFound:
|
||||
log.debug(
|
||||
'Create lock UID %s for %s', uid, identity)
|
||||
lk = DBLock.from_dict(uid, {'identity': identity})
|
||||
try:
|
||||
lk.save(force=True, force_insert=True)
|
||||
except peewee.IntegrityError:
|
||||
log.debug('Lock was acquired by another thread')
|
||||
return DBLock.get(uid)
|
||||
lk = DBLock.from_dict(uid, {})
|
||||
lk.change_locking_state(identity, 1, stamp)
|
||||
lk.save(force=True)
|
||||
else:
|
||||
locking = lk.who_is_locking()
|
||||
if locking is not None:
|
||||
log.debug(
|
||||
'Found lock with UID %s, owned by %s, owner %r',
|
||||
uid, locking, lk.am_i_locking(identity))
|
||||
return lk
|
||||
else:
|
||||
log.debug(
|
||||
'Create lock UID %s for %s', uid, identity)
|
||||
lk.change_locking_state(identity, 1, stamp)
|
||||
lk.save(force=True)
|
||||
del DBLock._c.obj_cache[lk.key]
|
||||
lk = DBLock.get(uid)
|
||||
locking = lk.who_is_locking()
|
||||
if locking is not None and identity != locking:
|
||||
if [identity, 1, stamp] in lk.lockers:
|
||||
lk.change_locking_state(identity, -1, stamp)
|
||||
lk.save(force=True)
|
||||
log.debug("I was not locking, so removing me %s" % identity)
|
||||
return lk
|
||||
|
||||
|
||||
class RiakLock(_CRDTishLock):
|
||||
pass
|
||||
|
||||
|
||||
class SQLiteLock(_CRDTishLock):
|
||||
pass
|
||||
|
||||
|
||||
class RiakEnsembleLock(_Lock):
|
||||
|
||||
@classmethod
|
||||
def _acquire(cls, uid, identity):
|
||||
def _acquire(cls, uid, identity, stamp):
|
||||
try:
|
||||
log.debug(
|
||||
'Create lock UID %s for %s', uid, identity)
|
||||
|
@ -143,7 +143,10 @@ def get_bucket(_, owner, mcs):
|
||||
bucket = bucket_type.bucket(name)
|
||||
if owner.bucket_properties:
|
||||
bucket.set_properties(owner.bucket_properties)
|
||||
bucket.resolver = dblayer_conflict_resolver
|
||||
if getattr(owner, 'conflict_resolver', None):
|
||||
bucket.resolver = owner.conflict_resolver
|
||||
else:
|
||||
bucket.resolver = dblayer_conflict_resolver
|
||||
return bucket
|
||||
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
|
||||
from collections import defaultdict
|
||||
from itertools import chain
|
||||
from operator import itemgetter
|
||||
from types import NoneType
|
||||
from uuid import uuid4
|
||||
@ -1155,3 +1156,92 @@ class Lock(Model):
|
||||
bucket_type = C.lock_bucket_type
|
||||
|
||||
identity = Field(basestring)
|
||||
lockers = Field(list, default=list)
|
||||
|
||||
@classmethod
|
||||
def _reduce(cls, lockers):
|
||||
# TODO: (jnowak) we could remove not needed lockers there
|
||||
# not needed means already replaced by other lock.
|
||||
_s = set()
|
||||
for x in lockers:
|
||||
_s.add(tuple(x))
|
||||
res = [list(x) for x in _s]
|
||||
return res
|
||||
|
||||
def sum_all(self):
|
||||
reduced = self.reduce()
|
||||
_pos = defaultdict(int)
|
||||
_neg = defaultdict(int)
|
||||
for locker, val, stamp in reduced:
|
||||
k = (locker, stamp)
|
||||
if val < 0:
|
||||
if k in _pos:
|
||||
del _pos[k]
|
||||
else:
|
||||
_neg[k] = -1
|
||||
elif val > 0:
|
||||
if k in _neg:
|
||||
del _neg[k]
|
||||
else:
|
||||
_pos[k] = 1
|
||||
# TODO: (jnowak) consider discard all orphaned releases
|
||||
# # key_diff = set(_neg.keys()) - set(_pos.keys())
|
||||
# # for k in key_diff:
|
||||
# # del _neg[k]
|
||||
return {locker: val for ((locker, stamp), val) in chain(
|
||||
_pos.items(),
|
||||
_neg.items()
|
||||
)}
|
||||
|
||||
def reduce(self):
|
||||
lockers = self.lockers
|
||||
self.lockers = self._reduce(lockers)
|
||||
return self.lockers
|
||||
|
||||
def am_i_locking(self, uid):
|
||||
return self.who_is_locking() == uid
|
||||
|
||||
def who_is_locking(self):
|
||||
try:
|
||||
if self.identity:
|
||||
return self.identity
|
||||
return None
|
||||
except KeyError:
|
||||
summed = self.sum_all()
|
||||
if not summed:
|
||||
return None
|
||||
to_max = sorted([(v, k) for (k, v) in summed.items()])[-1]
|
||||
if to_max[0] > 0:
|
||||
return to_max[1]
|
||||
return None
|
||||
|
||||
def change_locking_state(self, uid, value, stamp):
|
||||
try:
|
||||
if self.identity:
|
||||
if value:
|
||||
self.identity = uid
|
||||
else:
|
||||
raise Exception("Unsupported operation, to release "
|
||||
"this lock you need to delete it.")
|
||||
return True
|
||||
except KeyError:
|
||||
self.lockers.append([uid, value, stamp])
|
||||
self.reduce()
|
||||
return True
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
self.reduce()
|
||||
super(Lock, self).save(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def conflict_resolver(riak_object):
|
||||
siblings = riak_object.siblings
|
||||
sdatas = map(lambda x: x.data.get('lockers', []), siblings)
|
||||
l = []
|
||||
for data in sdatas:
|
||||
l.extend(data)
|
||||
reduced = Lock._reduce(l)
|
||||
first_sibling = siblings[0]
|
||||
first_sibling.data['lockers'] = reduced
|
||||
riak_object.siblings = [first_sibling]
|
||||
# del Lock._c.obj_cache[riak_object.key]
|
||||
|
@ -17,30 +17,31 @@ import pytest
|
||||
|
||||
from solar.dblayer.locking import Lock
|
||||
from solar.dblayer.model import clear_cache
|
||||
from solar.dblayer.solar_models import Lock as DBLock
|
||||
|
||||
|
||||
def test_acquire_release_logic():
|
||||
uid = '2131'
|
||||
first = '1111'
|
||||
second = '2222'
|
||||
assert Lock._acquire(uid, first).identity == first
|
||||
assert Lock._acquire(uid, first, 'a').who_is_locking() == first
|
||||
clear_cache()
|
||||
assert Lock._acquire(uid, second).identity == first
|
||||
Lock._release(uid)
|
||||
assert Lock._acquire(uid, second).identity == second
|
||||
assert Lock._acquire(uid, second, 'a').who_is_locking() == first
|
||||
Lock._release(uid, first, 'a')
|
||||
assert Lock._acquire(uid, second, 'a').who_is_locking() == second
|
||||
|
||||
|
||||
def test_lock_acquired_released():
|
||||
uid = '11'
|
||||
with Lock(uid, uid):
|
||||
clear_cache()
|
||||
assert Lock._acquire(uid, '12').identity == '11'
|
||||
assert Lock._acquire(uid, '12').identity == '12'
|
||||
assert Lock._acquire(uid, '12', 'a').who_is_locking() == '11'
|
||||
assert Lock._acquire(uid, '12', 'a').who_is_locking() == '12'
|
||||
|
||||
|
||||
def test_raise_error_if_acquired():
|
||||
uid = '11'
|
||||
Lock._acquire(uid, '12')
|
||||
Lock._acquire(uid, '12', 'a')
|
||||
clear_cache()
|
||||
with pytest.raises(RuntimeError):
|
||||
with Lock(uid, '13'):
|
||||
@ -50,7 +51,7 @@ def test_raise_error_if_acquired():
|
||||
@patch('solar.dblayer.locking.time.sleep')
|
||||
def test_time_sleep_called(msleep):
|
||||
uid = '11'
|
||||
Lock._acquire(uid, '12')
|
||||
Lock._acquire(uid, '12', 'a')
|
||||
clear_cache()
|
||||
sleep_time = 5
|
||||
with pytest.raises(RuntimeError):
|
||||
@ -65,5 +66,15 @@ def test_lock_released_exception():
|
||||
with Lock(uid, uid):
|
||||
raise Exception
|
||||
|
||||
new_lock = Lock._acquire(uid, '12')
|
||||
assert new_lock.identity == '12'
|
||||
new_lock = Lock._acquire(uid, '12', 'a')
|
||||
assert new_lock.who_is_locking() == '12'
|
||||
|
||||
|
||||
def test_locker_logic():
|
||||
uid = '11'
|
||||
l = DBLock.from_dict(uid, {})
|
||||
|
||||
l.lockers = [['a', -1, 'x'], ['a', 1, 'y'], ['b', 1, 'z']]
|
||||
l.reduce()
|
||||
assert l.am_i_locking('b')
|
||||
l.who_is_locking() == 'b'
|
||||
|
Loading…
x
Reference in New Issue
Block a user