Merge "Fix locking problems"
This commit is contained in:
commit
47da01d5c7
@ -80,6 +80,7 @@ class _Lock(object):
|
|||||||
' owned by identity {}'.format(
|
' owned by identity {}'.format(
|
||||||
lk.key, lk.who_is_locking()))
|
lk.key, lk.who_is_locking()))
|
||||||
log.debug('Lock for %s acquired by %s', self.uid, self.identity)
|
log.debug('Lock for %s acquired by %s', self.uid, self.identity)
|
||||||
|
return lk
|
||||||
|
|
||||||
def __exit__(self, *err):
|
def __exit__(self, *err):
|
||||||
self._release(self.uid, self.identity, self.stamp)
|
self._release(self.uid, self.identity, self.stamp)
|
||||||
@ -100,15 +101,23 @@ class _CRDTishLock(_Lock):
|
|||||||
del DBLock._c.obj_cache[uid]
|
del DBLock._c.obj_cache[uid]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
_check = True
|
||||||
try:
|
try:
|
||||||
lk = DBLock.get(uid)
|
lk = DBLock.get(uid)
|
||||||
except DBLayerNotFound:
|
except DBLayerNotFound:
|
||||||
log.debug(
|
log.debug(
|
||||||
'Create lock UID %s for %s', uid, identity)
|
'Create new lock UID %s for %s', uid, identity)
|
||||||
lk = DBLock.from_dict(uid, {})
|
lk = DBLock.from_dict(uid, {})
|
||||||
lk.change_locking_state(identity, 1, stamp)
|
lk.change_locking_state(identity, 1, stamp)
|
||||||
lk.save(force=True)
|
lk.save(force=True)
|
||||||
else:
|
if len(lk.sum_all().keys()) != 1:
|
||||||
|
# concurrent create
|
||||||
|
lk.change_locking_state(identity, -1, stamp)
|
||||||
|
lk.save(force=True)
|
||||||
|
log.debug("Concurrent lock %s create", uid)
|
||||||
|
else:
|
||||||
|
_check = False
|
||||||
|
if _check:
|
||||||
locking = lk.who_is_locking()
|
locking = lk.who_is_locking()
|
||||||
if locking is not None:
|
if locking is not None:
|
||||||
log.debug(
|
log.debug(
|
||||||
@ -120,14 +129,13 @@ class _CRDTishLock(_Lock):
|
|||||||
'Create lock UID %s for %s', uid, identity)
|
'Create lock UID %s for %s', uid, identity)
|
||||||
lk.change_locking_state(identity, 1, stamp)
|
lk.change_locking_state(identity, 1, stamp)
|
||||||
lk.save(force=True)
|
lk.save(force=True)
|
||||||
del DBLock._c.obj_cache[lk.key]
|
summed = lk.sum_all()
|
||||||
lk = DBLock.get(uid)
|
if len(summed.keys()) != 1:
|
||||||
locking = lk.who_is_locking()
|
log.debug("More than one acquire")
|
||||||
if locking is not None and identity != locking:
|
if identity in summed:
|
||||||
if [identity, 1, stamp] in lk.lockers:
|
|
||||||
lk.change_locking_state(identity, -1, stamp)
|
lk.change_locking_state(identity, -1, stamp)
|
||||||
lk.save(force=True)
|
lk.save(force=True)
|
||||||
log.debug("I was not locking, so removing me %s" % identity)
|
log.debug("I may be not locking, so removing me %s", identity)
|
||||||
return lk
|
return lk
|
||||||
|
|
||||||
|
|
||||||
|
@ -1231,7 +1231,12 @@ class Lock(Model):
|
|||||||
|
|
||||||
def save(self, *args, **kwargs):
|
def save(self, *args, **kwargs):
|
||||||
self.reduce()
|
self.reduce()
|
||||||
super(Lock, self).save(*args, **kwargs)
|
res = super(Lock, self).save(*args, **kwargs)
|
||||||
|
all_lockers = []
|
||||||
|
all_lockers.extend(res.data['lockers'])
|
||||||
|
all_lockers.extend(self.lockers)
|
||||||
|
self.lockers = self._reduce(all_lockers)
|
||||||
|
return res
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def conflict_resolver(riak_object):
|
def conflict_resolver(riak_object):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user