From 4773a7735a8b802fabc084323734bdcd2861aea0 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Mon, 18 Jan 2016 13:22:44 +0100 Subject: [PATCH] CRDTish lock to avoid concurrent update/delete - Lock Acquire means +1 - Lock Release means -1 - Lock state is based on list of items, each item contains info about identity, change (-1 or 1) and uid, final state is reduced sum of these - custom sibling resolution method added. Change-Id: I81de44db3fff5c9d77bee563e80ffd833f7fae3f --- solar/cli/orch.py | 6 +- solar/dblayer/conflict_resolution.py | 16 +++- solar/dblayer/locking.py | 121 ++++++++++++++------------- solar/dblayer/model.py | 5 +- solar/dblayer/solar_models.py | 90 ++++++++++++++++++++ solar/test/test_lock.py | 31 ++++--- 6 files changed, 192 insertions(+), 77 deletions(-) diff --git a/solar/cli/orch.py b/solar/cli/orch.py index 19e15060..0b428e5b 100755 --- a/solar/cli/orch.py +++ b/solar/cli/orch.py @@ -20,7 +20,7 @@ import click from solar.cli.uids_history import remember_uid from solar.cli.uids_history import SOLARUID -from solar.dblayer.locking import Lock +from solar.dblayer.locking import DBLock from solar import errors from solar.orchestration import filters from solar.orchestration import graph @@ -197,4 +197,6 @@ def show(uid): @click.argument('uid', type=SOLARUID) def release_lock(uid): """Use if worker was killed, and lock wasnt released properly. """ - Lock._release(uid) + lock = DBLock.get(uid) + lock.delete() + return True diff --git a/solar/dblayer/conflict_resolution.py b/solar/dblayer/conflict_resolution.py index bcf8a3cb..d8c29c80 100644 --- a/solar/dblayer/conflict_resolution.py +++ b/solar/dblayer/conflict_resolution.py @@ -29,12 +29,20 @@ def naive_resolver(riak_object): if len(c) > 2: raise SiblingsError( "Too many different siblings, not sure what to do with siblings") + + # both are the same + if siblings[0]._get_encoded_data() == siblings[1]._get_encoded_data(): + riak_object.siblings = [siblings[0]] + return + if 0 not in c: + sdata = map(lambda x: x._get_encoded_data(), siblings) raise SiblingsError("No empty object for resolution" - " not sure what to do with siblings") - selected = max(siblings_len) - # TODO: pass info to obj save_lazy too - riak_object.siblings = [selected[1]] + " not sure what to do with siblings: %r" % sdata) + else: + selected = max(siblings_len) + # TODO: pass info to obj save_lazy too + riak_object.siblings = [selected[1]] dblayer_conflict_resolver = naive_resolver diff --git a/solar/dblayer/locking.py b/solar/dblayer/locking.py index 3ba735ba..d2dde7ca 100644 --- a/solar/dblayer/locking.py +++ b/solar/dblayer/locking.py @@ -19,16 +19,15 @@ from solar.utils import parse_database_conn _connection, _connection_details = parse_database_conn(C.solar_db) -if _connection.mode == 'sqlite': - import peewee -elif _connection.mode == 'riak': +if _connection.mode == 'riak': from riak import RiakError from solar.core.log import log -from solar.dblayer.conflict_resolution import SiblingsError from solar.dblayer.model import DBLayerNotFound from solar.dblayer.solar_models import Lock as DBLock +from uuid import uuid4 + class _Lock(object): @@ -45,6 +44,7 @@ class _Lock(object): self.identity = identity self.retries = retries self.wait = wait + self.stamp = str(uuid4()) @classmethod def _acquire(cls, uid, identity): @@ -52,96 +52,97 @@ class _Lock(object): 'Different strategies for handling collisions') @classmethod - def _release(cls, uid): + def _release(cls, uid, identity): lk = DBLock.get(uid) - log.debug('Release lock %s with %s', uid, lk.identity) + log.debug('Release lock %s with %s', uid, identity) lk.delete() def __enter__(self): - lk = self._acquire(self.uid, self.identity) - if lk.identity != self.identity: + lk = self._acquire(self.uid, self.identity, self.stamp) + if not lk.am_i_locking(self.identity): log.debug( 'Lock %s acquired by another identity %s != %s', - self.uid, self.identity, lk.identity) + self.uid, self.identity, lk.who_is_locking()) while self.retries: del DBLock._c.obj_cache[lk.key] time.sleep(self.wait) - lk = self._acquire(self.uid, self.identity) + lk = self._acquire(self.uid, self.identity, self.stamp) self.retries -= 1 - if lk.identity == self.identity: + if lk.am_i_locking(self.identity): break + else: + # reset stamp mark + self.stamp = str(uuid4()) else: - if lk.identity != self.identity: + if not lk.am_i_locking(self.identity): raise RuntimeError( 'Failed to acquire {},' ' owned by identity {}'.format( - lk.key, lk.identity)) + lk.key, lk.who_is_locking())) log.debug('Lock for %s acquired by %s', self.uid, self.identity) def __exit__(self, *err): - self._release(self.uid) + self._release(self.uid, self.identity, self.stamp) -class RiakLock(_Lock): +class _CRDTishLock(_Lock): @classmethod - def _acquire(cls, uid, identity): + def _release(cls, uid, identity, stamp): + log.debug("Release lock %s with %s", uid, identity) + lk = DBLock.get(uid) + lk.change_locking_state(identity, -1, stamp) + lk.save(force=True) + + @classmethod + def _acquire(cls, uid, identity, stamp): try: - try: - lk = DBLock.get(uid) - log.debug( - 'Found lock with UID %s, owned by %s, owner %r', - uid, lk.identity, lk.identity == identity) - except DBLayerNotFound: - log.debug( - 'Create lock UID %s for %s', uid, identity) - lk = DBLock.from_dict(uid, {'identity': identity}) - lk.save(force=True) - except SiblingsError: - log.debug( - 'Race condition for lock with UID %s, among %r', - uid, - [s.data.get('identity') for s in lk._riak_object.siblings]) - siblings = [] - for s in lk._riak_object.siblings: - if s.data.get('identity') != identity: - siblings.append(s) - lk._riak_object.siblings = siblings - lk.save() - return lk - - -class SQLiteLock(_Lock): - - @classmethod - def _acquire(cls, uid, identity): - """It is hard to properly handle concurrent updates - using sqlite backend. - INSERT only should maitain integrity of - primary keys and therefore will raise proper exception - """ + del DBLock._c.obj_cache[uid] + except KeyError: + pass try: lk = DBLock.get(uid) - log.debug( - 'Found lock with UID %s, owned by %s, owner %r', - uid, lk.identity, lk.identity == identity) - return lk except DBLayerNotFound: log.debug( 'Create lock UID %s for %s', uid, identity) - lk = DBLock.from_dict(uid, {'identity': identity}) - try: - lk.save(force=True, force_insert=True) - except peewee.IntegrityError: - log.debug('Lock was acquired by another thread') - return DBLock.get(uid) + lk = DBLock.from_dict(uid, {}) + lk.change_locking_state(identity, 1, stamp) + lk.save(force=True) + else: + locking = lk.who_is_locking() + if locking is not None: + log.debug( + 'Found lock with UID %s, owned by %s, owner %r', + uid, locking, lk.am_i_locking(identity)) + return lk + else: + log.debug( + 'Create lock UID %s for %s', uid, identity) + lk.change_locking_state(identity, 1, stamp) + lk.save(force=True) + del DBLock._c.obj_cache[lk.key] + lk = DBLock.get(uid) + locking = lk.who_is_locking() + if locking is not None and identity != locking: + if [identity, 1, stamp] in lk.lockers: + lk.change_locking_state(identity, -1, stamp) + lk.save(force=True) + log.debug("I was not locking, so removing me %s" % identity) return lk +class RiakLock(_CRDTishLock): + pass + + +class SQLiteLock(_CRDTishLock): + pass + + class RiakEnsembleLock(_Lock): @classmethod - def _acquire(cls, uid, identity): + def _acquire(cls, uid, identity, stamp): try: log.debug( 'Create lock UID %s for %s', uid, identity) diff --git a/solar/dblayer/model.py b/solar/dblayer/model.py index 207d5a09..19b21efd 100644 --- a/solar/dblayer/model.py +++ b/solar/dblayer/model.py @@ -143,7 +143,10 @@ def get_bucket(_, owner, mcs): bucket = bucket_type.bucket(name) if owner.bucket_properties: bucket.set_properties(owner.bucket_properties) - bucket.resolver = dblayer_conflict_resolver + if getattr(owner, 'conflict_resolver', None): + bucket.resolver = owner.conflict_resolver + else: + bucket.resolver = dblayer_conflict_resolver return bucket diff --git a/solar/dblayer/solar_models.py b/solar/dblayer/solar_models.py index 514d051a..b6f287cc 100644 --- a/solar/dblayer/solar_models.py +++ b/solar/dblayer/solar_models.py @@ -15,6 +15,7 @@ from collections import defaultdict +from itertools import chain from operator import itemgetter from types import NoneType from uuid import uuid4 @@ -1126,3 +1127,92 @@ class Lock(Model): bucket_type = C.lock_bucket_type identity = Field(basestring) + lockers = Field(list, default=list) + + @classmethod + def _reduce(cls, lockers): + # TODO: (jnowak) we could remove not needed lockers there + # not needed means already replaced by other lock. + _s = set() + for x in lockers: + _s.add(tuple(x)) + res = [list(x) for x in _s] + return res + + def sum_all(self): + reduced = self.reduce() + _pos = defaultdict(int) + _neg = defaultdict(int) + for locker, val, stamp in reduced: + k = (locker, stamp) + if val < 0: + if k in _pos: + del _pos[k] + else: + _neg[k] = -1 + elif val > 0: + if k in _neg: + del _neg[k] + else: + _pos[k] = 1 + # TODO: (jnowak) consider discard all orphaned releases + # # key_diff = set(_neg.keys()) - set(_pos.keys()) + # # for k in key_diff: + # # del _neg[k] + return {locker: val for ((locker, stamp), val) in chain( + _pos.items(), + _neg.items() + )} + + def reduce(self): + lockers = self.lockers + self.lockers = self._reduce(lockers) + return self.lockers + + def am_i_locking(self, uid): + return self.who_is_locking() == uid + + def who_is_locking(self): + try: + if self.identity: + return self.identity + return None + except KeyError: + summed = self.sum_all() + if not summed: + return None + to_max = sorted([(v, k) for (k, v) in summed.items()])[-1] + if to_max[0] > 0: + return to_max[1] + return None + + def change_locking_state(self, uid, value, stamp): + try: + if self.identity: + if value: + self.identity = uid + else: + raise Exception("Unsupported operation, to release " + "this lock you need to delete it.") + return True + except KeyError: + self.lockers.append([uid, value, stamp]) + self.reduce() + return True + + def save(self, *args, **kwargs): + self.reduce() + super(Lock, self).save(*args, **kwargs) + + @staticmethod + def conflict_resolver(riak_object): + siblings = riak_object.siblings + sdatas = map(lambda x: x.data.get('lockers', []), siblings) + l = [] + for data in sdatas: + l.extend(data) + reduced = Lock._reduce(l) + first_sibling = siblings[0] + first_sibling.data['lockers'] = reduced + riak_object.siblings = [first_sibling] + # del Lock._c.obj_cache[riak_object.key] diff --git a/solar/test/test_lock.py b/solar/test/test_lock.py index 42318068..46b6cce8 100644 --- a/solar/test/test_lock.py +++ b/solar/test/test_lock.py @@ -17,30 +17,31 @@ import pytest from solar.dblayer.locking import Lock from solar.dblayer.model import clear_cache +from solar.dblayer.solar_models import Lock as DBLock def test_acquire_release_logic(): uid = '2131' first = '1111' second = '2222' - assert Lock._acquire(uid, first).identity == first + assert Lock._acquire(uid, first, 'a').who_is_locking() == first clear_cache() - assert Lock._acquire(uid, second).identity == first - Lock._release(uid) - assert Lock._acquire(uid, second).identity == second + assert Lock._acquire(uid, second, 'a').who_is_locking() == first + Lock._release(uid, first, 'a') + assert Lock._acquire(uid, second, 'a').who_is_locking() == second def test_lock_acquired_released(): uid = '11' with Lock(uid, uid): clear_cache() - assert Lock._acquire(uid, '12').identity == '11' - assert Lock._acquire(uid, '12').identity == '12' + assert Lock._acquire(uid, '12', 'a').who_is_locking() == '11' + assert Lock._acquire(uid, '12', 'a').who_is_locking() == '12' def test_raise_error_if_acquired(): uid = '11' - Lock._acquire(uid, '12') + Lock._acquire(uid, '12', 'a') clear_cache() with pytest.raises(RuntimeError): with Lock(uid, '13'): @@ -50,7 +51,7 @@ def test_raise_error_if_acquired(): @patch('solar.dblayer.locking.time.sleep') def test_time_sleep_called(msleep): uid = '11' - Lock._acquire(uid, '12') + Lock._acquire(uid, '12', 'a') clear_cache() sleep_time = 5 with pytest.raises(RuntimeError): @@ -65,5 +66,15 @@ def test_lock_released_exception(): with Lock(uid, uid): raise Exception - new_lock = Lock._acquire(uid, '12') - assert new_lock.identity == '12' + new_lock = Lock._acquire(uid, '12', 'a') + assert new_lock.who_is_locking() == '12' + + +def test_locker_logic(): + uid = '11' + l = DBLock.from_dict(uid, {}) + + l.lockers = [['a', -1, 'x'], ['a', 1, 'y'], ['b', 1, 'z']] + l.reduce() + assert l.am_i_locking('b') + l.who_is_locking() == 'b'