solar/solar/dblayer/model.py
Dmitry Shulyak 8f1ca9708a Use WeakValueDictionary instead of WeakSet in DBModelProxy
Current implementation of DBModelProxy doesn't allow use of the
original hash function of the Model class.

In order to avoid this problem we will store references to
Model instances in WeakValueDictionary instead of WeakSet.

Change-Id: If92af140c9aaad3a46b24872dae16969b1090df8
Closes-Bug: 1560369
2016-03-22 10:44:49 +02:00

958 lines
28 KiB
Python

# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functools import total_ordering
from functools import wraps
import time
import uuid
import weakref
from collections import defaultdict
from random import getrandbits
from threading import RLock
from solar.dblayer.conflict_resolution import dblayer_conflict_resolver
from solar.dblayer.lfu_cache import LFUCache
from solar.utils import get_local
class DBLayerException(Exception):
    """Base exception for all errors raised by the DB layer."""
    pass
class DBLayerNotFound(DBLayerException):
    """Raised when a requested key does not exist in the bucket."""
    pass
class DBLayerNoRiakObj(DBLayerException):
    """Raised when a Model is used before its riak object is attached."""
    pass
class NONE(object):
    """A None like type: sentinel distinguishing "not provided" from None."""
class SingleIndexCache(object):
    """Lock-guarded cache for the rows of one secondary-index query.

    Intended to be used as a context manager: entering acquires the
    internal lock, exiting releases it.
    """

    def __init__(self):
        self.lock = RLock()
        self.cached_vals = []

    def __enter__(self):
        self.lock.acquire()
        return self

    def __exit__(self, *args, **kwargs):
        self.lock.release()

    def fill(self, values):
        # Replace the cached rows wholesale.
        self.cached_vals = values

    def wipe(self):
        # Drop everything; the next get_index() call repopulates.
        self.cached_vals = []

    def get_index(self, real_funct, ind_name, **kwargs):
        """Populate the cache from *real_funct* unless already filled."""
        kwargs.setdefault('max_results', 999999)
        if not self.cached_vals:
            rows = real_funct(ind_name, **kwargs).results
            self.fill(rows)

    def filter(self, startkey, endkey, max_results=1):
        """Yield up to *max_results* cached (value, key) pairs in range.

        Assumes the cached rows are sorted ascending by value, so the
        scan stops at the first value past *endkey*.
        """
        remaining = max_results
        for value, obj_key in self.cached_vals:
            if remaining == 0:
                break
            if value < startkey:
                continue
            if value > endkey:
                break
            remaining -= 1
            yield (value, obj_key)
class SingleClassCache(object):
    """Per-model-class cache bundle (one per Model subclass per thread).

    Holds the LFU object cache, the "dirty" db-state markers, the set of
    objects scheduled for lazy save, and weak references to live model
    instances.
    """

    __slots__ = ['obj_cache', 'db_ch_state',
                 'lazy_save', 'origin_class',
                 'refs']

    def __init__(self, origin_class):
        # LFU cache of up to 200 materialized model objects, keyed by riak key.
        self.obj_cache = LFUCache(origin_class, 200)
        # Keys with unsaved index changes.
        self.db_ch_state = {'index': set()}
        # Objects whose save() is deferred until save_all_lazy().
        self.lazy_save = set()
        # WeakValueDictionary so these references neither keep instances
        # alive nor interfere with Model's own __hash__.
        self.refs = defaultdict(weakref.WeakValueDictionary)
        self.origin_class = origin_class
class ClassCache(object):
    """Descriptor giving each (thread, Model subclass) its own cache.

    Accessed as ``SomeModel._c``. A thread-local generation id
    (``cache_id``) is compared against ``cache_id_cmp``; ``clear_cache()``
    bumps the comparison id, which forces every subsequent access to
    rebuild a fresh SingleClassCache.
    """

    def __init__(self, *args, **kwargs):
        self._l = RLock()

    def __get__(self, inst, owner):
        # th = current_thread()
        with self._l:
            l = Model._local
            # better don't duplicate class names
            cache_name = owner.__name__
            try:
                cache_id = l.cache_id
            except AttributeError:
                # first access on this thread: mint a generation id
                cache_id = uuid.UUID(int=getrandbits(128), version=4).hex
                setattr(l, 'cache_id', cache_id)
            if getattr(l, 'cache_id_cmp', None) != cache_id:
                # new cache
                setattr(l, 'cache_id_cmp', cache_id)
                c = SingleClassCache(owner)
                setattr(l, '_model_caches', {})
                l._model_caches[cache_name] = c
            try:
                # already had this owner in cache
                return l._model_caches[cache_name]
            except KeyError:
                # old cache but first time this owner
                c = SingleClassCache(owner)
                l._model_caches[cache_name] = c
                return c
def clear_cache():
    """Invalidate every thread-local model cache.

    Writes a fresh generation id into ``cache_id_cmp`` so the next
    ``Model._c`` access discards the old caches and rebuilds them.
    """
    thread_local = Model._local
    fresh_id = uuid.UUID(int=getrandbits(128), version=4).hex
    setattr(thread_local, 'cache_id_cmp', fresh_id)
def get_bucket(_, owner, mcs):
    """Build and configure the riak bucket for a Model subclass.

    Used through a ``Replacer`` descriptor, so the result is computed
    once and then stored on the class as the ``bucket`` attribute.
    """
    bucket_name = owner.get_bucket_name()
    if owner.bucket_type:
        type_obj = mcs.riak_client.bucket_type(owner.bucket_type)
    else:
        # using default bucket type
        type_obj = mcs.riak_client
    bucket = type_obj.bucket(bucket_name)
    if owner.bucket_properties:
        bucket.set_properties(owner.bucket_properties)
    custom_resolver = getattr(owner, 'conflict_resolver', None)
    if custom_resolver:
        bucket.resolver = custom_resolver
    else:
        bucket.resolver = dblayer_conflict_resolver
    return bucket
def changes_state_for(_type):
    """Decorator: mark the object's key dirty for *_type* before calling.

    The wrapped method registers ``obj.key`` in the class-level change
    state and schedules the object for a lazy save.
    """
    def _inner1(f):
        @wraps(f)
        def _inner2(obj, *args, **kwargs):
            # Bug fix: this previously hard-coded 'index' and ignored
            # _type; use the requested state bucket so the decorator
            # mirrors its clears_state_for counterpart. (All current
            # call sites pass 'index', so behavior is unchanged there.)
            obj._c.db_ch_state[_type].add(obj.key)
            obj.save_lazy()
            return f(obj, *args, **kwargs)
        return _inner2
    return _inner1
def clears_state_for(_type):
    """Decorator: drop the object's key from the *_type* dirty set.

    The removal happens before the wrapped function runs; a key (or a
    state bucket) that was never marked is silently ignored.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(obj, *args, **kwargs):
            obj._c.db_ch_state.get(_type, set()).discard(obj.key)
            return func(obj, *args, **kwargs)
        return wrapper
    return decorator
def requires_clean_state(_type):
    """Decorator: refuse to run while *_type* state is dirty.

    Delegates to check_state_for, which attempts one save_all_lazy()
    pass and raises if dirty objects remain.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(obj, *args, **kwargs):
            check_state_for(_type, obj)
            return func(obj, *args, **kwargs)
        return wrapper
    return decorator
def check_state_for(_type, obj):
    """Ensure no unsaved *_type* changes exist for obj's class.

    If the state is dirty, attempt a single save_all_lazy() pass and
    raise if dirty objects still remain afterwards.

    (Removed a dead ``if True:`` wrapper left behind by an old TODO;
    behavior is unchanged.)
    """
    with obj._lock:
        state = obj._c.db_ch_state.get(_type)
        if state:
            # TODO: solve it -- save everything rather than just _type
            obj.save_all_lazy()
            state = obj._c.db_ch_state.get(_type)
            if not state:
                return
            raise Exception("Dirty state, save all %r objects first" %
                            obj.__class__)
@total_ordering
class StrInt(object):
    """Integer encoded as a fixed-width, lexicographically-ordered string.

    Values are stored as a sign prefix character plus zero-padded hex so
    that string comparison of encoded values matches numeric comparison,
    which makes them usable as riak secondary-index keys. Floats keep
    ``precision`` decimal digits.
    """

    precision = 3
    positive_char = 'p'
    negative_char = 'n'
    # total width of the encoded value (sign char counts toward it)
    format_size = 10 + precision

    def __init__(self, val=None):
        # Defaults to the current time when no value is given.
        self._val = self._make_val(val)

    def __str__(self):
        return self._val.__str__()

    def __repr__(self):
        return "<%s: %r>" % (self.__class__.__name__, self._val)

    @classmethod
    def p_max(cls):
        return cls(int('9' * cls.format_size))

    @classmethod
    def p_min(cls):
        return cls(1)

    @classmethod
    def n_max(cls):
        return -cls.p_max()

    @classmethod
    def n_min(cls):
        return -cls.p_min()

    def __neg__(self):
        time_ = self.int_val()
        ret = self.__class__(-time_)
        return ret

    @classmethod
    def greater(cls, inst):
        # 'g' sorts after every hex digit, yielding a value strictly
        # greater than inst for range queries.
        if isinstance(inst, cls):
            return cls(inst._val + 'g')
        return cls(inst + 'g')

    @classmethod
    def to_hex(cls, value):
        """Encode an int as sign char + zero-padded hex payload."""
        char = cls.positive_char
        if value < 0:
            # Bias negatives so values closer to zero encode bigger.
            value = int('9' * cls.format_size) + value
            char = cls.negative_char
        f = '%s%%.%dx' % (char, cls.format_size - 2)
        value = f % value
        if value[-1] == 'L':
            # strip a trailing py2 long-literal marker, just in case
            value = value[:-1]
        return value

    @classmethod
    def from_hex(cls, value):
        """Decode sign char + hex payload back to an int."""
        v = int(value[1:], 16)
        if value[0] == cls.negative_char:
            v -= int('9' * cls.format_size)
        return v

    def int_val(self):
        return self.from_hex(self._val)

    @classmethod
    def from_simple(cls, value):
        return cls(value)

    @classmethod
    def to_simple(cls, value):
        return value._val

    @classmethod
    def _make_val(cls, val):
        if val is None:
            val = time.time()
        if isinstance(val, (long, int, float)):
            if isinstance(val, float):
                # keep `precision` decimal digits by scaling to int
                val = int(val * (10 ** cls.precision))
            val = cls.to_hex(val)
        elif isinstance(val, cls):
            val = val._val
        return val

    def __eq__(self, other):
        if isinstance(other, basestring):
            first_ch = other[0]
            if first_ch not in (self.positive_char, self.negative_char):
                raise Exception("Cannot compare %r with %r" % (self, other))
            else:
                other = self.from_simple(other)
        if not isinstance(other, self.__class__):
            raise Exception("Cannot compare %r with %r" % (self, other))
        so = other._val[0]
        ss = self._val[0]
        son = so == other.negative_char
        ssn = ss == self.negative_char
        if son != ssn:
            # different signs can never be equal
            return False
        return self._val[1:] == other._val[1:]

    def __gt__(self, other):
        if isinstance(other, basestring):
            first_ch = other[0]
            if first_ch not in (self.positive_char, self.negative_char):
                raise Exception("Cannot compare %r with %r" % (self, other))
            else:
                other = self.from_simple(other)
        if not isinstance(other, self.__class__):
            raise Exception("Cannot compare %r with %r" % (self, other))
        so = other._val[0]
        ss = self._val[0]
        if ss == self.positive_char and so == other.negative_char:
            # any positive is greater than any negative
            # (bug fix: this used to return -1, a non-boolean)
            return True
        elif ss == self.negative_char and so == other.positive_char:
            # a negative is never greater than a positive
            # (bug fix: this used to return 1, which is truthy and thus
            # wrong; total_ordering derives __lt__ etc. from this result)
            return False
        else:
            # same sign: lexicographic payload order matches numeric
            # order thanks to zero padding and the negative bias
            return other._val[1:] < self._val[1:]
class Replacer(object):
    """Self-replacing descriptor: computes a value once, then stores it
    under its own name on the instance (or on the class when accessed
    without an instance), so later lookups skip the computation.
    """

    def __init__(self, name, fget, *args):
        self.name = name
        self.fget = fget
        self.args = args

    def __get__(self, instance, owner):
        computed = self.fget(instance, owner, *self.args)
        target = owner if instance is None else instance
        setattr(target, self.name, computed)
        return computed
class FieldBase(object):
    """Common state for all field descriptors: a field name (settable
    exactly once) and a default value or zero-argument default factory.
    """

    def __init__(self, fname, default):
        self._fname = fname
        self._default = default

    @property
    def fname(self):
        return self._fname

    @fname.setter
    def fname(self, value):
        # fname may only transition from None to a concrete name
        if self._fname is not None:
            raise Exception("fname already set")
        self._fname = value

    @property
    def default(self):
        # The NONE sentinel is returned untouched so callers can tell
        # "no default" apart from an actual default of None.
        if self._default is NONE:
            return self._default
        if callable(self._default):
            # default factories are invoked on every access
            return self._default()
        return self._default
class Field(FieldBase):
    """Descriptor for a plain data field stored in the riak document.

    Simple types are stored as-is; any other type must provide
    from_simple/to_simple conversions (e.g. StrInt).
    """

    # in from_dict, when you set value to None,
    # then types that are *not* there are set to NONE
    _not_nullable_types = {int, float, long, str, unicode, basestring}
    _simple_types = {int, float, long, str, unicode, basestring, list, tuple,
                     dict}

    def __init__(self, _type, fname=None, default=NONE):
        # plain str fields accept any string type
        if _type == str:
            _type = basestring
        self._type = _type
        super(Field, self).__init__(fname=fname, default=default)

    def __get__(self, instance, owner):
        if instance is None:
            return self
        raw = instance._data_container[self.fname]
        if self._type in self._simple_types:
            return raw
        return self._type.from_simple(raw)

    def __set__(self, instance, value):
        if not isinstance(value, self._type):
            raise Exception("Invalid type %r for %r, expected %r" %
                            (type(value), self.fname, self._type))
        if self._type not in self._simple_types:
            value = self._type.to_simple(value)
        instance._field_changed(self)
        instance._data_container[self.fname] = value
        return value

    def __str__(self):
        return "<%s:%r>" % (self.__class__.__name__, self.fname)

    __repr__ = __str__
class IndexedField(Field):
    """A Field that mirrors its value into a `<fname>_bin` secondary index."""

    def __set__(self, instance, value):
        stored = super(IndexedField, self).__set__(instance, value)
        instance._set_index('{}_bin'.format(self.fname), stored)
        return stored

    def _filter(self, startkey, endkey=None, **kwargs):
        # Convert rich types (e.g. StrInt) to their stored representation.
        if isinstance(startkey,
                      self._type) and self._type not in self._simple_types:
            startkey = self._type.to_simple(startkey)
        if isinstance(endkey,
                      self._type) and self._type not in self._simple_types:
            endkey = self._type.to_simple(endkey)
        kwargs.setdefault('max_results', 1000000)
        index_name = '{}_bin'.format(self.fname)
        return self._declared_in._get_index(index_name,
                                            startkey=startkey,
                                            endkey=endkey,
                                            **kwargs).results

    def filter(self, *args, **kwargs):
        """Range query over the index, returning matching keys only."""
        kwargs['return_terms'] = False
        return self._filter(*args, **kwargs)
class IndexFieldWrp(object):
    """Per-instance wrapper exposing an IndexField as a dict-like object
    backed by `<fname>_bin` secondary-index entries of the form
    ``"<name>|<value>"``.
    """

    def __init__(self, field_obj, instance):
        self._field_obj = field_obj
        self._instance = instance
        self._c = self._instance._c

    @property
    def fname(self):
        return self._field_obj.fname

    def __str__(self):
        return "<%s for field %s>" % (self.__class__.__name__, self._field_obj)

    def _get_field_val(self, name):
        return self._instance._data_container[self.fname][name]

    def __getitem__(self, name):
        return self._get_field_val(name)

    def __setitem__(self, name, value):
        # NOTE(review): only the secondary index is written here; the
        # data container is NOT updated -- confirm whether that is
        # intentional (CompositeIndexFieldWrp rebuilds its index from
        # the container instead).
        inst = self._instance
        inst._add_index('%s_bin' % self.fname, '{}|{}'.format(name, value))

    def __delitem__(self, name):
        inst = self._instance
        del inst._data_container[self.fname][name]
        indexes = inst._riak_object.indexes
        # TODO: move this to backend layer
        for ind_name, ind_value in indexes:
            if ind_name == ('%s_bin' % self.fname):
                # index values are "name|value"; match on the name prefix
                if ind_value.startswith('{}|'.format(name)):
                    inst._remove_index(ind_name, ind_value)
                    break
class IndexField(FieldBase):
    """Descriptor for an index-only field, stored as secondary-index
    entries ``"<key>|<value>"`` instead of regular document data.
    """

    _wrp_class = IndexFieldWrp

    def __init__(self, fname=None, default=NONE):
        super(IndexField, self).__init__(fname, default)

    def _on_no_inst(self, instance, owner):
        # class-level access returns the descriptor itself
        return self

    def __get__(self, instance, owner):
        if instance is None:
            return self._on_no_inst(instance, owner)
        # Cache the wrapper per instance under a mangled attribute name.
        cached = getattr(instance, '_real_obj_%s' % self.fname, None)
        if cached:
            return cached
        obj = self._wrp_class(self, instance)
        setattr(instance, '_real_obj_%s' % self.fname, obj)
        return obj

    def __set__(self, instance, value):
        wrp = getattr(instance, self.fname)
        instance._data_container[self.fname] = self.default
        for f_name, f_value in value.iteritems():
            wrp[f_name] = f_value

    def _parse_key(self, k):
        # NOTE(review): when k contains no '=', this falls through and
        # returns None, which makes filter() fail on startkey.endswith;
        # presumably keys are always "name=value" -- confirm.
        if '=' in k:
            val, subval = k.split('=', 1)
            if subval is None:
                subval = ''
            if not isinstance(subval, basestring):
                subval = str(subval)
            return '{}|{}'.format(val, subval)

    def filter(self, startkey, endkey=None, **kwargs):
        """Range query over the index; a trailing '*' in the parsed key
        acts as a prefix wildcard.
        """
        startkey = self._parse_key(startkey)
        if endkey is None:
            if startkey.endswith('*'):
                # prefix match: '~' sorts after the other key characters
                startkey = startkey[:-1]
                endkey = startkey + '~'
            else:
                endkey = startkey + ' '
        kwargs.setdefault('max_results', 1000000)
        kwargs['return_terms'] = False
        res = self._declared_in._get_index('{}_bin'.format(self.fname),
                                           startkey=startkey,
                                           endkey=endkey,
                                           **kwargs).results
        return list(res)
class CompositeIndexFieldWrp(IndexFieldWrp):
    """Wrapper for a composite index: one index entry built by joining
    several document field values with '|'.
    """

    def reset(self):
        """Rebuild the composite index entry from current field values."""
        index = []
        for f in self._field_obj.fields:
            index.append(self._instance._data_container.get(f, ''))
        index = '|'.join(index)
        index_to_del = []
        # Collect stale entries first; mutating while iterating the
        # riak object's index list would be unsafe.
        for index_name, index_val in self._instance._riak_object.indexes:
            if index_name == '%s_bin' % self.fname:
                if index != index_val:
                    index_to_del.append((index_name, index_val))
        for index_name, index_val in index_to_del:
            self._instance._remove_index(index_name, index_val)
        self._instance._add_index('%s_bin' % self.fname, index)
class CompositeIndexField(IndexField):
    """IndexField whose key is composed from several document fields."""

    _wrp_class = CompositeIndexFieldWrp

    def __init__(self, fields=(), *args, **kwargs):
        super(CompositeIndexField, self).__init__(*args, **kwargs)
        self.fields = fields

    def _parse_key(self, startkey):
        # Join the provided field values in declaration order; the
        # trailing '*' makes filter() perform a prefix match.
        parts = [startkey[f] for f in self.fields if f in startkey]
        return '|'.join(parts) + '*'
class ModelMeta(type):
    """Metaclass for Model: wires up field descriptors, installs the
    bucket Replacer and tracks every defined model class.
    """

    _defined_models = set()

    def __new__(mcs, name, bases, attrs):
        cls = super(ModelMeta, mcs).__new__(mcs, name, bases, attrs)
        model_fields = set((name
                            for (name, attr) in attrs.items()
                            if isinstance(attr, FieldBase) and
                            not name.startswith('_')))
        for f in model_fields:
            field = getattr(cls, f)
            if hasattr(field, 'fname') and field.fname is None:
                setattr(field, 'fname', f)
            setattr(field, 'gname', f)
            # need to set declared_in because `with_tag`
            # no need to wrap descriptor with another object then
            setattr(field, '_declared_in', cls)
        # NOTE(review): this merges field *objects* from bases into a set
        # of field *names*; getattr below would fail if a base actually
        # declared fields. Safe today only because Model itself has none
        # -- confirm before declaring fields on intermediate bases.
        for base in bases:
            try:
                model_fields_base = base._model_fields
            except AttributeError:
                continue
            else:
                for given in model_fields_base:
                    model_fields.add(given)
        cls._model_fields = [getattr(cls, x) for x in model_fields]
        if bases == (object, ):
            return cls
        if issubclass(cls, NestedModel):
            # nested models get no bucket of their own
            return cls
        cls.bucket = Replacer('bucket', get_bucket, mcs)
        mcs._defined_models.add(cls)
        return cls

    @classmethod
    def setup(mcs, riak_client):
        """Attach the riak client; may only be called once."""
        if hasattr(mcs, 'riak_client'):
            raise DBLayerException("Setup already done")
        mcs.riak_client = riak_client

    @classmethod
    def remove_all(mcs):
        for model in mcs._defined_models:
            model.delete_all()

    @classmethod
    def save_all_lazy(mcs, result=True):
        for cls in mcs._defined_models:
            # Bug fix: iterate over a snapshot -- saving may schedule
            # further lazy saves, and mutating the set mid-iteration
            # raises RuntimeError. Model.save_all_lazy already snapshots;
            # keep the two implementations consistent.
            for to_save in set(cls._c.lazy_save):
                try:
                    to_save.save()
                except DBLayerException:
                    continue
            cls._c.lazy_save.clear()

    @classmethod
    def session_end(mcs, result=True):
        mcs.save_all_lazy()
        mcs.riak_client.session_end(result)

    @classmethod
    def session_start(mcs):
        mcs.riak_client.session_start()
class NestedField(FieldBase):
    """Descriptor embedding a NestedModel (or, when *hash_key* is given,
    a NestedModelHash of them) inside the parent document.
    """

    def __init__(self, _class, fname=None, default=NONE, hash_key=None):
        self._class = _class
        self._hash_key = hash_key
        super(NestedField, self).__init__(fname=fname, default=default)

    def __get__(self, instance, owner):
        if instance is None:
            return self
        # Cache the nested wrapper per instance.
        cache_attr = '_real_obj_%s' % self.fname
        cached = getattr(instance, cache_attr, None)
        if cached:
            return cached
        if self._hash_key is None:
            obj = self._class(self, instance)
        else:
            obj = NestedModelHash(self, instance, self._class, self._hash_key)
        setattr(instance, cache_attr, obj)
        return obj

    def __set__(self, instance, value):
        getattr(instance, self.fname).from_dict(value)

    def __delete__(self, instance):
        getattr(instance, self.fname).delete()
class NestedModel(object):
    """Base for models embedded inside a parent Model's document.

    Instances proxy their data into a sub-dict of the parent's data
    container and attribute field changes to the parent.
    """

    __metaclass__ = ModelMeta

    # When used through NestedModelHash, holds this entry's hash key.
    _nested_value = None

    def __init__(self, field, parent):
        self._field = field
        self._parent = parent

    def from_dict(self, data):
        """Assign declared fields from *data*, applying defaults."""
        for field in self._model_fields:
            fname = field.fname
            gname = field.gname
            val = data.get(fname, NONE)
            default = field.default
            if val is NONE and default is not NONE:
                setattr(self, gname, default)
            elif val is not NONE:
                setattr(self, gname, val)
        return

    @property
    def _data_container(self):
        # Lazily create this nested model's sub-dict in the parent.
        pdc = self._parent._data_container
        try:
            ppdc = pdc[self._field.fname]
        except KeyError:
            ppdc = pdc[self._field.fname] = {}
        if self._field._hash_key is None:
            return ppdc
        else:
            # hash-keyed nesting adds one more level keyed by _nested_value
            try:
                ret = ppdc[self._nested_value]
            except KeyError:
                ret = ppdc[self._nested_value] = {}
            return ret

    def _field_changed(self, field):
        # Changes are attributed to the whole nested field on the parent.
        return self._parent._modified_fields.add(self._field.fname)

    def delete(self):
        if self._field._hash_key is None:
            del self._parent._data_container[self._field.fname]
class NestedModelHash(object):
    """Dict-like collection of NestedModel entries keyed by *hash_key*."""

    def __init__(self, field, parent, _class, hash_key):
        self._field = field
        self._parent = parent
        self._class = _class
        self._hash_key = hash_key
        self._cache = {}

    def __getitem__(self, name):
        """Return the entry for *name*, creating it on first access."""
        if name not in self._cache:
            entry = self._class(self._field, self._parent)
            entry._nested_value = name
            self._cache[name] = entry
        return self._cache[name]

    def __setitem__(self, name, value):
        return self[name].from_dict(value)

    def __delitem__(self, name):
        self[name].delete()
        del self._cache[name]

    def from_dict(self, data):
        """Route *data* to the entry named by its hash-key value."""
        self[data[self._hash_key]] = data
class Model(object):
    """Base class for riak-backed models.

    Class-level state (caches, locks) is shared per thread via the
    ClassCache descriptor; instances wrap single riak objects identified
    by a write-once ``key``.
    """

    __metaclass__ = ModelMeta

    # per-thread, per-class cache descriptor
    _c = ClassCache()

    _key = None
    _new = None
    _real_riak_object = None
    _changed = False

    _local = get_local()()
    _lock = RLock()  # for class objs

    bucket_properties = {}
    bucket_type = None

    def __init__(self, key=None):
        self._modified_fields = set()
        # TODO: that _indexes_changed should be smarter
        self._indexes_changed = False
        self.key = key
        # instance-level lock shadows the class-level one
        self._lock = RLock()

    @property
    def key(self):
        return self._key

    @key.setter
    def key(self, value):
        # key is write-once
        if self._key is None:
            self._key = value
        else:
            raise Exception("Can't set key again")

    @property
    def _riak_object(self):
        if self._real_riak_object is None:
            raise DBLayerNoRiakObj("You cannot access _riak_object now")
        return self._real_riak_object

    @_riak_object.setter
    def _riak_object(self, value):
        if self._real_riak_object is not None:
            raise DBLayerException("Already have _riak_object")
        self._real_riak_object = value

    @property
    def _data_container(self):
        return self._riak_object.data

    @changes_state_for('index')
    def _set_index(self, name, value):
        self._indexes_changed = True
        return self._riak_object.set_index(name, value)

    @changes_state_for('index')
    def _add_index(self, *args, **kwargs):
        self._indexes_changed = True
        return self._riak_object.add_index(*args, **kwargs)

    @changes_state_for('index')
    def _remove_index(self, *args, **kwargs):
        self._indexes_changed = True
        return self._riak_object.remove_index(*args, **kwargs)

    @classmethod
    def _get_index(cls, *args, **kwargs):
        return cls.bucket.get_index(*args, **kwargs)

    @property
    def _bucket(self):
        return self._riak_object.bucket

    @classmethod
    def get_bucket_name(cls):
        # XXX: should be changed and more smart
        return cls.__name__

    def _field_changed(self, field):
        self._modified_fields.add(field.fname)

    def changed(self):
        """Return True when fields or indexes have unsaved changes."""
        if self._modified_fields:
            return True
        return self._indexes_changed

    def to_dict(self):
        d = dict(self._riak_object.data)
        d['key'] = self.key
        return d

    def __str__(self):
        # Bug fix: check the raw attribute. The _riak_object property
        # raises DBLayerNoRiakObj instead of ever returning None, so the
        # old `self._riak_object is None` test could never be True and
        # str() on an uninitialized model raised instead of returning
        # the "<... not initialized>" form.
        if self._real_riak_object is None:
            return "<%s not initialized>" % (self.__class__.__name__)
        return "<%s %s:%s>" % (self.__class__.__name__,
                               self._riak_object.bucket.name, self.key)

    def __hash__(self):
        return hash(self.key)

    @classmethod
    def new(cls, key, data):
        return cls.from_dict(key, data)

    @classmethod
    def get_or_create(cls, key):
        try:
            return cls.get(key)
        except DBLayerNotFound:
            return cls.new(key, {})

    @classmethod
    def from_riakobj(cls, riak_obj):
        """Wrap an existing riak object in a (cached) model instance."""
        obj = cls(riak_obj.key)
        obj._riak_object = riak_obj
        if obj._new is None:
            obj._new = False
        cache = cls._c.obj_cache
        cache.set(riak_obj.key, obj)
        # cache may adjust object
        return cache.get(riak_obj.key)

    @classmethod
    def _pre_from_dict_check(cls, key, data=None):
        # Allow from_dict(data) with the key embedded in the dict.
        if isinstance(key, dict) and data is None:
            data = key
            try:
                key = data['key']
            except KeyError:
                raise DBLayerException("No key specified")
        if key and 'key' in data and data['key'] != key:
            raise DBLayerException("Different key values detected")
        # shouldn't be needed, but may cover some weird usecase
        # when inproperly using from_dict, because it then leads to conflicts
        if key in cls._c.obj_cache:
            raise DBLayerException("Object already exists in cache"
                                   " cannot create second")
        return key, data

    @classmethod
    def from_dict(cls, key, data=None):
        """Create a new model instance from a plain dict, applying
        declared field defaults.
        """
        key, data = cls._pre_from_dict_check(key, data)
        data['key'] = key
        with cls._c.obj_cache._lock:
            if key in cls._c.obj_cache:
                return cls._c.obj_cache.get(key)
            riak_obj = cls.bucket.new(key, data={})
            obj = cls.from_riakobj(riak_obj)
            obj._new = True
            for field in cls._model_fields:
                # if field is cls._pkey_field:
                #     continue  # pkey already set
                fname = field.fname
                gname = field.gname
                val = data.get(fname, NONE)
                default = field.default
                # explicit None counts as "absent" for nullable types
                if val is None and field._type not in field._not_nullable_types:
                    val = NONE
                if val is NONE and default is not NONE:
                    setattr(obj, gname, default)
                elif val is not NONE:
                    setattr(obj, gname, val)
            return obj

    @classmethod
    def get(cls, key):
        """Fetch by key, preferring the object cache over the backend."""
        try:
            return cls._c.obj_cache.get(key)
        except KeyError:
            riak_object = cls.bucket.get(key)
            if not riak_object.exists:
                raise DBLayerNotFound(key)
            else:
                return cls.from_riakobj(riak_object)

    @classmethod
    def multi_get(cls, keys):
        # TODO: parallel execution
        ret = map(cls.get, keys)
        return ret

    def _reset_state(self):
        self._new = False
        self._modified_fields.clear()
        self._indexes_changed = False

    @classmethod
    def save_all_lazy(cls):
        # iterate a snapshot: save() may mutate lazy_save indirectly
        for to_save in set(cls._c.lazy_save):
            try:
                to_save.save()
            except DBLayerException:
                continue
        cls._c.lazy_save.clear()

    @clears_state_for('index')
    def save(self, force=False, **kwargs):
        """Store the object.

        Raises DBLayerException when nothing changed and *force* is
        False.
        """
        with self._lock:
            if self.changed() or force or self._new:
                res = self._riak_object.store(**kwargs)
                self._reset_state()
                return res
            else:
                raise DBLayerException("No changes")

    def save_lazy(self):
        self._c.lazy_save.add(self)

    @classmethod
    def delete_all(cls):
        cls.riak_client.delete_all(cls)

    def delete(self):
        """Remove from lazy-save set, object cache and the backend."""
        ls = self._c.lazy_save
        try:
            ls.remove(self)
        except KeyError:
            pass
        try:
            del self._c.obj_cache[self.key]
        except KeyError:
            pass
        self._riak_object.delete()
        return self