Merge pull request #370 from pigmej/dblayer_pep8

pep8 + flake for dblayer
This commit is contained in:
Łukasz Oleś 2015-11-25 16:09:47 +01:00
commit 39435de0fb
16 changed files with 435 additions and 434 deletions

View File

@ -2,26 +2,31 @@ from solar.dblayer.model import ModelMeta
from solar.dblayer.riak_client import RiakClient
from solar.config import C
if C.solar_db.mode == 'sqlite':
from solar.dblayer.sql_client import SqlClient
if C.solar_db.backend == 'memory':
client = SqlClient(C.solar_db.location, threadlocals=False, autocommit=False)
client = SqlClient(C.solar_db.location,
threadlocals=False,
autocommit=False)
elif C.solar_db.backend == 'file':
client = SqlClient(C.solar_db.location, threadlocals=True,
autocommit=False, pragmas=(('journal_mode', 'WAL'),
('synchronous', 'NORMAL')))
client = SqlClient(
C.solar_db.location,
threadlocals=True,
autocommit=False,
pragmas=(('journal_mode', 'WAL'), ('synchronous', 'NORMAL')))
else:
raise Exception('Unknown sqlite backend %s', C.solar_db.backend)
elif C.solar_db.mode == 'riak':
from solar.dblayer.riak_client import RiakClient
if C.solar_db.protocol == 'pbc':
client = RiakClient(
protocol=C.solar_db.protocol, host=C.solar_db.host, pb_port=C.solar_db.port)
client = RiakClient(protocol=C.solar_db.protocol,
host=C.solar_db.host,
pb_port=C.solar_db.port)
elif C.solar_db.protocol == 'http':
client = RiakClient(
protocol=C.solar_db.protocol, host=C.solar_db.host, http_port=C.solar_db.port)
client = RiakClient(protocol=C.solar_db.protocol,
host=C.solar_db.host,
http_port=C.solar_db.port)
else:
raise Exception('Unknown riak protocol %s', C.solar_db.protocol)
else:

View File

@ -4,13 +4,16 @@ from collections import Counter
def naive_resolver(riak_object):
# for now we support deleted vs existing object
siblings = riak_object.siblings
siblings_len = map(lambda sibling: (len(sibling._get_encoded_data()), sibling), siblings)
siblings_len = map(
lambda sibling: (len(sibling._get_encoded_data()), sibling), siblings)
siblings_len.sort()
c = Counter((x[0] for x in siblings_len))
if len(c) > 2:
raise RuntimeError("Too many different siblings, not sure what to do with siblings")
if not 0 in c:
raise RuntimeError("No empty object for resolution, not sure what to do with siblings")
raise RuntimeError(
"Too many different siblings, not sure what to do with siblings")
if 0 not in c:
raise RuntimeError("No empty object for resolution"
" not sure what to do with siblings")
selected = max(siblings_len)
# TODO: pass info to obj save_lazy too
riak_object.siblings = [selected[1]]

View File

@ -12,14 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from gevent.pool import Pool
import gevent
from solar.dblayer.solar_models import Resource
class DBLayerPool(Pool):
def __init__(self, *args, **kwargs):
super(DBLayerPool, self).__init__(*args, **kwargs)
self.parent = gevent.getcurrent()

View File

@ -77,6 +77,7 @@ class _localimpl(object):
_local = wrlocal()
if _local is not None:
_local.dicts.pop(idt, None)
wrlocal = ref(self, local_deleted)
wrthread = ref(thread, thread_deleted)
thread.__dict__[key] = wrlocal
@ -87,6 +88,7 @@ class _localimpl(object):
dicts = wrdicts()
if dicts:
dicts.pop(idt, None)
rawlink(clear)
wrthread = None
@ -101,8 +103,10 @@ def _patch(self):
try:
dct = impl.get_dict()
except KeyError:
# it's OK to acquire the lock here and not earlier, because the above code won't switch out
# however, subclassed __init__ might switch, so we do need to acquire the lock here
# it's OK to acquire the lock here and not earlier,
# because the above code won't switch out
# however, subclassed __init__ might switch,
# so we do need to acquire the lock here
dct = impl.create_dict()
args, kw = impl.localargs
with impl.locallock:
@ -118,7 +122,8 @@ class local(object):
def __new__(cls, *args, **kw):
if args or kw:
if (PYPY and cls.__init__ == object.__init__) or (not PYPY and cls.__init__ is object.__init__):
if (PYPY and cls.__init__ == object.__init__) or (
not PYPY and cls.__init__ is object.__init__):
raise TypeError("Initialization arguments are not supported")
self = object.__new__(cls)
impl = _localimpl()
@ -137,17 +142,15 @@ class local(object):
def __setattr__(self, name, value):
if name == '__dict__':
raise AttributeError(
"%r object attribute '__dict__' is read-only"
% self.__class__.__name__)
raise AttributeError("%r object attribute '__dict__' is read-only"
% self.__class__.__name__)
with _patch(self):
return object.__setattr__(self, name, value)
def __delattr__(self, name):
if name == '__dict__':
raise AttributeError(
"%r object attribute '__dict__' is read-only"
% self.__class__.__name__)
raise AttributeError("%r object attribute '__dict__' is read-only"
% self.__class__.__name__)
with _patch(self):
return object.__delattr__(self, name)
@ -159,7 +162,8 @@ class local(object):
duplicate = copy(d)
cls = type(self)
if (PYPY and cls.__init__ != object.__init__) or (not PYPY and cls.__init__ is not object.__init__):
if (PYPY and cls.__init__ != object.__init__) or (
not PYPY and cls.__init__ is not object.__init__):
args, kw = impl.localargs
instance = cls(*args, **kw)
else:

View File

@ -19,20 +19,16 @@ def _patch(obj, name, target):
setattr(obj, name, target)
def patch_all():
from solar.dblayer.model import ModelMeta
if ModelMeta._defined_models:
raise RuntimeError("You should run patch_multi_get before defining models")
raise RuntimeError(
"You should run patch_multi_get before defining models")
from solar.dblayer.model import Model
from solar.dblayer.solar_models import InputsFieldWrp
from solar.dblayer.gevent_helpers import (multi_get,
solar_map,
get_local)
from solar.dblayer.gevent_helpers import (multi_get, solar_map, get_local)
from solar import utils
_patch(Model, 'multi_get', multi_get)
_patch(utils, 'solar_map', solar_map)

View File

@ -2,9 +2,7 @@ from solar.utils import get_local
from random import getrandbits
import uuid
from functools import wraps, total_ordering
from operator import itemgetter
import time
from contextlib import contextmanager
from threading import RLock
from solar.dblayer.conflict_resolution import dblayer_conflict_resolver
@ -27,7 +25,6 @@ class NONE:
class SingleIndexCache(object):
def __init__(self):
self.lock = RLock()
self.cached_vals = []
@ -64,11 +61,9 @@ class SingleIndexCache(object):
self.lock.release()
class SingleClassCache(object):
__slots__ = ['obj_cache', 'db_ch_state',
'lazy_save', 'origin_class']
__slots__ = ['obj_cache', 'db_ch_state', 'lazy_save', 'origin_class']
def __init__(self, origin_class):
self.obj_cache = {}
@ -78,7 +73,6 @@ class SingleClassCache(object):
class ClassCache(object):
def __init__(self, *args, **kwargs):
self._l = RLock()
@ -130,7 +124,9 @@ def changes_state_for(_type):
obj._c.db_ch_state['index'].add(obj.key)
obj.save_lazy()
return f(obj, *args, **kwargs)
return _inner2
return _inner1
@ -143,7 +139,9 @@ def clears_state_for(_type):
except KeyError:
pass
return f(obj, *args, **kwargs)
return _inner2
return _inner1
@ -153,7 +151,9 @@ def requires_clean_state(_type):
def _inner2(obj, *args, **kwargs):
check_state_for(_type, obj)
return f(obj, *args, **kwargs)
return _inner2
return _inner1
@ -162,12 +162,12 @@ def check_state_for(_type, obj):
if state:
if True:
# TODO: solve it
orig_state = state
obj.save_all_lazy()
state = obj._c.db_ch_state.get(_type)
if not state:
return
raise Exception("Dirty state, save all %r objects first" % obj.__class__)
raise Exception("Dirty state, save all %r objects first" %
obj.__class__)
@total_ordering
@ -178,7 +178,6 @@ class StrInt(object):
negative_char = 'n'
format_size = 10 + precision
def __init__(self, val=None):
self._val = self._make_val(val)
@ -231,7 +230,7 @@ class StrInt(object):
def from_hex(cls, value):
v = int(value[1:], 16)
if value[0] == cls.negative_char:
v -= int('9' * self.format_size)
v -= int('9' * cls.format_size)
return v
def int_val(self):
@ -269,7 +268,7 @@ class StrInt(object):
so = other._val[0]
ss = self._val[0]
son = so == other.negative_char
ssn = so == self.negative_char
ssn = ss == self.negative_char
if son != ssn:
return False
return self._val[1:] == other._val[1:]
@ -293,9 +292,7 @@ class StrInt(object):
return other._val[1:] < self._val[1:]
class Replacer(object):
def __init__(self, name, fget, *args):
self.name = name
self.fget = fget
@ -337,13 +334,15 @@ class FieldBase(object):
class Field(FieldBase):
# in from_dict, when you set value to None, then types that are *not* there are set to NONE
# in from_dict, when you set value to None,
# then types that are *not* there are set to NONE
_not_nullable_types = {int, float, long, str, unicode, basestring}
_simple_types = {int, float, long, str, unicode, basestring, list, tuple, dict}
_simple_types = {int, float, long, str, unicode, basestring, list, tuple,
dict}
def __init__(self, _type, fname=None, default=NONE):
if _type == str:
_type = basestring
_type = basestring
self._type = _type
super(Field, self).__init__(fname=fname, default=default)
@ -358,7 +357,8 @@ class Field(FieldBase):
def __set__(self, instance, value):
if not isinstance(value, self._type):
raise Exception("Invalid type %r for %r, expected %r" % (type(value), self.fname, self._type))
raise Exception("Invalid type %r for %r, expected %r" %
(type(value), self.fname, self._type))
if self._type not in self._simple_types:
value = self._type.to_simple(value)
instance._field_changed(self)
@ -372,16 +372,17 @@ class Field(FieldBase):
class IndexedField(Field):
def __set__(self, instance, value):
value = super(IndexedField, self).__set__(instance, value)
instance._set_index('{}_bin'.format(self.fname), value)
return value
def _filter(self, startkey, endkey=None, **kwargs):
if isinstance(startkey, self._type) and self._type not in self._simple_types:
if isinstance(startkey,
self._type) and self._type not in self._simple_types:
startkey = self._type.to_simple(startkey)
if isinstance(endkey, self._type) and self._type not in self._simple_types:
if isinstance(endkey,
self._type) and self._type not in self._simple_types:
endkey = self._type.to_simple(endkey)
kwargs.setdefault('max_results', 1000000)
res = self._declared_in._get_index('{}_bin'.format(self.fname),
@ -397,7 +398,6 @@ class IndexedField(Field):
class IndexFieldWrp(object):
def __init__(self, field_obj, instance):
self._field_obj = field_obj
self._instance = instance
@ -486,7 +486,6 @@ class IndexField(FieldBase):
class CompositeIndexFieldWrp(IndexFieldWrp):
def reset(self):
index = []
for f in self._field_obj.fields:
@ -504,6 +503,7 @@ class CompositeIndexFieldWrp(IndexFieldWrp):
self._instance._add_index('%s_bin' % self.fname, index)
class CompositeIndexField(IndexField):
_wrp_class = CompositeIndexFieldWrp
@ -523,8 +523,10 @@ class ModelMeta(type):
def __new__(mcs, name, bases, attrs):
cls = super(ModelMeta, mcs).__new__(mcs, name, bases, attrs)
model_fields = set((name for (name, attr) in attrs.items()
if isinstance(attr, FieldBase) and not name.startswith('_')))
model_fields = set((name
for (name, attr) in attrs.items()
if isinstance(attr, FieldBase) and
not name.startswith('_')))
for f in model_fields:
field = getattr(cls, f)
if hasattr(field, 'fname') and field.fname is None:
@ -536,14 +538,13 @@ class ModelMeta(type):
for base in bases:
try:
model_fields_base = base._model_fields
model_fields_base = base._model_fields
except AttributeError:
continue
else:
for given in base._model_fields:
for given in model_fields_base:
model_fields.add(given)
cls._model_fields = [getattr(cls, x) for x in model_fields]
if bases == (object, ):
@ -556,14 +557,12 @@ class ModelMeta(type):
mcs._defined_models.add(cls)
return cls
@classmethod
def setup(mcs, riak_client):
if hasattr(mcs, 'riak_client'):
raise DBLayerException("Setup already done")
mcs.riak_client = riak_client
@classmethod
def remove_all(mcs):
for model in mcs._defined_models:
@ -591,7 +590,6 @@ class ModelMeta(type):
class NestedField(FieldBase):
def __init__(self, _class, fname=None, default=NONE, hash_key=None):
self._class = _class
self._hash_key = hash_key
@ -665,9 +663,7 @@ class NestedModel(object):
del self._parent._data_container[self._field.fname]
class NestedModelHash(object):
def __init__(self, field, parent, _class, hash_key):
self._field = field
self._parent = parent
@ -698,7 +694,6 @@ class NestedModelHash(object):
self[hk] = data
class Model(object):
__metaclass__ = ModelMeta
@ -742,7 +737,6 @@ class Model(object):
raise DBLayerException("Already have _riak_object")
self._real_riak_object = value
@property
def _data_container(self):
return self._riak_object.data
@ -791,8 +785,8 @@ class Model(object):
def __str__(self):
if self._riak_object is None:
return "<%s not initialized>" % (self.__class__.__name__)
return "<%s %s:%s>" % (self.__class__.__name__, self._riak_object.bucket.name, self.key)
return "<%s %s:%s>" % (self.__class__.__name__,
self._riak_object.bucket.name, self.key)
@classmethod
def new(cls, key, data):
@ -879,7 +873,6 @@ class Model(object):
continue
cls._c.lazy_save.clear()
@clears_state_for('index')
def save(self, force=False):
if self.changed() or force or self._new:

View File

@ -5,7 +5,6 @@ from solar.dblayer.model import clear_cache
class RiakClient(OrigRiakClient):
def session_start(self):
clear_cache()
@ -16,10 +15,11 @@ class RiakClient(OrigRiakClient):
def delete_all(self, cls):
for _ in xrange(10):
# riak dislikes deletes without dvv
rst = cls.bucket.get_index('$bucket', startkey='_', max_results=100000).results
rst = cls.bucket.get_index('$bucket',
startkey='_',
max_results=100000).results
for key in rst:
cls.bucket.delete(key)
else:
return
time.sleep(0.5)

View File

@ -1,9 +1,7 @@
# -*- coding: utf-8 -*-
from solar.dblayer.model import (Model, Field, IndexField,
IndexFieldWrp,
from solar.dblayer.model import (Model, Field, IndexField, IndexFieldWrp,
DBLayerException,
requires_clean_state, check_state_for,
StrInt, SingleIndexCache,
check_state_for, StrInt, SingleIndexCache,
IndexedField, CompositeIndexField)
from types import NoneType
from operator import itemgetter
@ -13,8 +11,7 @@ from collections import defaultdict
from solar.utils import solar_map
InputTypes = Enum('InputTypes',
'simple list hash list_hash')
InputTypes = Enum('InputTypes', 'simple list hash list_hash')
class DBLayerSolarException(DBLayerException):
@ -57,8 +54,7 @@ class InputsFieldWrp(IndexFieldWrp):
if dlen == 5:
meta = None
elif dlen == 7:
meta = {'destination_key': data[5],
'tag': data[4]}
meta = {'destination_key': data[5], 'tag': data[4]}
else:
raise Exception("Unsupported case")
yield (other_resource, other_input), (my_resource, my_input), meta
@ -104,16 +100,17 @@ class InputsFieldWrp(IndexFieldWrp):
return list(self.__iter__())
def as_dict(self):
items = solar_map(lambda x: (x, self._get_field_val(x)), [x for x in self], concurrency=3)
items = solar_map(lambda x: (x, self._get_field_val(x)),
[x for x in self],
concurrency=3)
return dict(items)
def _connect_my_simple(self, my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type):
def _connect_my_simple(self, my_resource, my_inp_name, other_resource,
other_inp_name, my_type, other_type):
types_mapping = '|{}_{}'.format(my_type.value, other_type.value)
my_ind_name = '{}_recv_bin'.format(self.fname)
my_ind_val = '{}|{}|{}|{}'.format(my_resource.key,
my_inp_name,
other_resource.key,
other_inp_name)
my_ind_val = '{}|{}|{}|{}'.format(my_resource.key, my_inp_name,
other_resource.key, other_inp_name)
my_ind_val += types_mapping
real_my_type = self._input_type(my_resource, my_inp_name)
@ -128,15 +125,15 @@ class InputsFieldWrp(IndexFieldWrp):
my_resource._add_index(my_ind_name, my_ind_val)
return my_inp_name
def _connect_other_simple(self, my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type):
def _connect_other_simple(self, my_resource, my_inp_name, other_resource,
other_inp_name, my_type, other_type):
other_ind_name = '{}_emit_bin'.format(self.fname)
real_my_type = self._input_type(my_resource, my_inp_name)
if real_my_type == InputTypes.simple or ':' not in my_inp_name:
other_ind_val = '{}|{}|{}|{}'.format(other_resource.key,
other_inp_name,
my_resource.key,
my_inp_name)
my_resource.key, my_inp_name)
for ind_name, ind_value in my_resource._riak_object.indexes:
if ind_name == other_ind_name:
try:
@ -150,19 +147,17 @@ class InputsFieldWrp(IndexFieldWrp):
my_resource._remove_index(ind_name, ind_value)
break
elif real_my_type in (InputTypes.list_hash, InputTypes.hash, InputTypes.list):
elif real_my_type in (InputTypes.list_hash, InputTypes.hash,
InputTypes.list):
my_key, my_val = my_inp_name.split(':', 1)
if '|' in my_val:
my_val, my_tag = my_val.split('|', 1)
else:
my_tag = other_resource.name
my_inp_name = my_key
other_ind_val = '{}|{}|{}|{}|{}|{}'.format(other_resource.key,
other_inp_name,
my_resource.key,
my_inp_name,
my_tag,
my_val)
other_ind_val = '{}|{}|{}|{}|{}|{}'.format(
other_resource.key, other_inp_name, my_resource.key,
my_inp_name, my_tag, my_val)
for ind_name, ind_value in my_resource._riak_object.indexes:
if ind_name == other_ind_name:
try:
@ -178,21 +173,30 @@ class InputsFieldWrp(IndexFieldWrp):
break
else:
raise Exception("Unsupported connection type")
my_resource._add_index(other_ind_name,
other_ind_val)
my_resource._add_index(other_ind_name, other_ind_val)
return other_inp_name
def _connect_other_hash(self, my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type):
return self._connect_other_simple(my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type)
def _connect_other_hash(self, my_resource, my_inp_name, other_resource,
other_inp_name, my_type, other_type):
return self._connect_other_simple(
my_resource, my_inp_name, other_resource, other_inp_name, my_type,
other_type)
def _connect_other_list_hash(self, my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type):
return self._connect_other_simple(my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type)
def _connect_other_list_hash(self, my_resource, my_inp_name,
other_resource, other_inp_name, my_type,
other_type):
return self._connect_other_simple(
my_resource, my_inp_name, other_resource, other_inp_name, my_type,
other_type)
def _connect_my_list(self, my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type):
ret = self._connect_my_simple(my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type)
def _connect_my_list(self, my_resource, my_inp_name, other_resource,
other_inp_name, my_type, other_type):
ret = self._connect_my_simple(my_resource, my_inp_name, other_resource,
other_inp_name, my_type, other_type)
return ret
def _connect_my_hash(self, my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type):
def _connect_my_hash(self, my_resource, my_inp_name, other_resource,
other_inp_name, my_type, other_type):
my_key, my_val = my_inp_name.split(':', 1)
if '|' in my_val:
@ -201,27 +205,25 @@ class InputsFieldWrp(IndexFieldWrp):
my_tag = other_resource.name
types_mapping = '|{}_{}'.format(my_type.value, other_type.value)
my_ind_name = '{}_recv_bin'.format(self.fname)
my_ind_val = '{}|{}|{}|{}|{}|{}'.format(my_resource.key,
my_key,
my_ind_val = '{}|{}|{}|{}|{}|{}'.format(my_resource.key, my_key,
other_resource.key,
other_inp_name,
my_tag,
my_val
)
other_inp_name, my_tag, my_val)
my_ind_val += types_mapping
my_resource._add_index(my_ind_name, my_ind_val)
return my_key
def _connect_my_list_hash(self, my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type):
return self._connect_my_hash(my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type)
def _connect_my_list_hash(self, my_resource, my_inp_name, other_resource,
other_inp_name, my_type, other_type):
return self._connect_my_hash(my_resource, my_inp_name, other_resource,
other_inp_name, my_type, other_type)
def connect(self, my_inp_name, other_resource, other_inp_name):
my_resource = self._instance
other_type = self._input_type(other_resource, other_inp_name)
my_type = self._input_type(my_resource, my_inp_name)
if my_type == other_type and not ':' in my_inp_name:
if my_type == other_type and ':' not in my_inp_name:
# if the type is the same map 1:1, and flat
my_type = InputTypes.simple
other_type = InputTypes.simple
@ -235,11 +237,13 @@ class InputsFieldWrp(IndexFieldWrp):
# set my side
my_meth = getattr(self, '_connect_my_{}'.format(my_type.name))
my_affected = my_meth(my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type)
my_affected = my_meth(my_resource, my_inp_name, other_resource,
other_inp_name, my_type, other_type)
# set other side
other_meth = getattr(self, '_connect_other_{}'.format(other_type.name))
other_meth(my_resource, my_inp_name, other_resource, other_inp_name, my_type, other_type)
other_meth(my_resource, my_inp_name, other_resource, other_inp_name,
my_type, other_type)
try:
del self._cache[my_affected]
@ -261,7 +265,7 @@ class InputsFieldWrp(IndexFieldWrp):
# emit_name = '{}|{}'.format(my_tag, my_val)
full_name = '{}|{}|{}'.format(normalized_name, my_tag, my_val)
name = normalized_name
elif '|'in name:
elif '|' in name:
# disconnect everything from given input|resource
my_input, other_resource, other_input = name.split('|', 2)
full_name = my_input
@ -277,20 +281,24 @@ class InputsFieldWrp(IndexFieldWrp):
my_val, my_tag = None, None
indexes = self._instance._riak_object.indexes
to_dels = []
recvs = filter(lambda x: x[0] == '{}_recv_bin'.format(self.fname), indexes)
recvs = filter(lambda x: x[0] == '{}_recv_bin'.format(self.fname),
indexes)
for recv in recvs:
_, ind_value = recv
if ind_value.startswith('{}|{}|'.format(self._instance.key, normalized_name)):
if ind_value.startswith('{}|{}|'.format(self._instance.key,
normalized_name)):
spl = ind_value.split('|')
if len(spl) == 7 and my_tag and my_val:
if spl[-3] == my_tag and spl[-2] == my_val:
to_dels.append(recv)
else:
to_dels.append(recv)
emits = filter(lambda x: x[0] == '{}_emit_bin'.format(self.fname), indexes)
emits = filter(lambda x: x[0] == '{}_emit_bin'.format(self.fname),
indexes)
for emit in emits:
_, ind_value = emit
if ind_value.endswith('|{}|{}'.format(self._instance.key, full_name)):
if ind_value.endswith('|{}|{}'.format(self._instance.key,
full_name)):
if emit_name:
if ind_value.startswith(emit_name):
to_dels.append(emit)
@ -317,7 +325,8 @@ class InputsFieldWrp(IndexFieldWrp):
try:
self._get_raw_field_val(name)
except KeyError:
raise DBLayerSolarException('No input {} for {}'.format(name, my_name))
raise DBLayerSolarException('No input {} for {}'.format(name,
my_name))
else:
return True
@ -360,12 +369,13 @@ class InputsFieldWrp(IndexFieldWrp):
my_meth = getattr(self, '_map_field_val_{}'.format(my_type.name))
return my_meth(recvs, name, my_name, other=other)
def _map_field_val_simple(self, recvs, input_name, name, other=None):
recvs = recvs[0]
index_val, obj_key = recvs
_, inp, emitter_key, emitter_inp, _mapping_type = index_val.split('|', 4)
res = Resource.get(emitter_key).inputs._get_field_val(emitter_inp, other)
_, inp, emitter_key, emitter_inp, _mapping_type = index_val.split('|',
4)
res = Resource.get(emitter_key).inputs._get_field_val(emitter_inp,
other)
self._cache[name] = res
return res
@ -373,16 +383,21 @@ class InputsFieldWrp(IndexFieldWrp):
if len(recvs) == 1:
recv = recvs[0]
index_val, obj_key = recv
_, inp, emitter_key, emitter_inp, mapping_type = index_val.split('|', 4)
res = Resource.get(emitter_key).inputs._get_field_val(emitter_inp, other)
if mapping_type != "{}_{}".format(InputTypes.simple.value, InputTypes.simple.value):
_, inp, emitter_key, emitter_inp, mapping_type = index_val.split(
'|', 4)
res = Resource.get(emitter_key).inputs._get_field_val(emitter_inp,
other)
if mapping_type != "{}_{}".format(InputTypes.simple.value,
InputTypes.simple.value):
res = [res]
else:
res = []
for recv in recvs:
index_val, obj_key = recv
_, _, emitter_key, emitter_inp, mapping_type = index_val.split('|', 4)
cres = Resource.get(emitter_key).inputs._get_field_val(emitter_inp, other)
_, _, emitter_key, emitter_inp, mapping_type = index_val.split(
'|', 4)
cres = Resource.get(emitter_key).inputs._get_field_val(
emitter_inp, other)
res.append(cres)
self._cache[name] = res
return res
@ -392,8 +407,10 @@ class InputsFieldWrp(IndexFieldWrp):
tags = set()
for recv in recvs:
index_val, obj_key = recv
_, _, emitter_key, emitter_inp, my_tag, my_val, mapping_type = index_val.split('|', 6)
cres = Resource.get(emitter_key).inputs._get_field_val(emitter_inp, other)
(_, _, emitter_key, emitter_inp,
my_tag, my_val, mapping_type) = index_val.split('|', 6)
cres = Resource.get(emitter_key).inputs._get_field_val(emitter_inp,
other)
items.append((my_tag, my_val, cres))
tags.add(my_tag)
return items, tags
@ -408,25 +425,32 @@ class InputsFieldWrp(IndexFieldWrp):
if splen == 5:
# 1:1
_, inp, emitter_key, emitter_inp, mapping_type = splitted
if mapping_type != "{}_{}".format(InputTypes.simple.value, InputTypes.simple.value):
if mapping_type != "{}_{}".format(InputTypes.simple.value,
InputTypes.simple.value):
raise NotImplementedError()
res = Resource.get(emitter_key).inputs._get_field_val(emitter_inp, other)
res = Resource.get(emitter_key).inputs._get_field_val(
emitter_inp, other)
elif splen == 7:
# partial
_, _, emitter_key, emitter_inp, my_tag, my_val, mapping_type = splitted
cres = Resource.get(emitter_key).inputs._get_field_val(emitter_inp, other)
(_, _, emitter_key, emitter_inp,
my_tag, my_val, mapping_type) = splitted
cres = Resource.get(emitter_key).inputs._get_field_val(
emitter_inp, other)
res = {my_val: cres}
my_resource = self._instance
my_resource_value = my_resource.inputs._get_raw_field_val(input_name)
my_resource_value = my_resource.inputs._get_raw_field_val(
input_name)
if my_resource_value:
for my_val, cres in my_resource_value.iteritems():
res[my_val] = cres
else:
raise Exception("Not supported splen %s", splen)
else:
items, tags = self._map_field_val_hash_single(recvs, input_name, other)
items, tags = self._map_field_val_hash_single(recvs, input_name,
other)
my_resource = self._instance
my_resource_value = my_resource.inputs._get_raw_field_val(input_name)
my_resource_value = my_resource.inputs._get_raw_field_val(
input_name)
if my_resource_value:
res = my_resource_value
else:
@ -441,19 +465,20 @@ class InputsFieldWrp(IndexFieldWrp):
def _map_field_val_list_hash(self, recvs, input_name, name, other=None):
items = []
tags = set()
for recv in recvs:
index_val, obj_key = recv
splitted_val = index_val.split('|', 6)
if len(splitted_val) == 5:
# it was list hash but with whole dict mapping
_, _, emitter_key, emitter_inp, mapping_type = splitted_val
cres = Resource.get(emitter_key).inputs._get_field_val(emitter_inp, other)
cres = Resource.get(emitter_key).inputs._get_field_val(
emitter_inp, other)
items.append((emitter_key, None, cres))
else:
_, _, emitter_key, emitter_inp, my_tag, my_val, mapping_type = splitted_val
cres = Resource.get(emitter_key).inputs._get_field_val(emitter_inp, other)
mapping_type = splitted_val[-1]
(_, _, emitter_key, emitter_inp,
my_tag, my_val, mapping_type) = splitted_val
cres = Resource.get(emitter_key).inputs._get_field_val(
emitter_inp, other)
items.append((my_tag, my_val, cres))
tmp_res = {}
for first, my_val, value in items:
@ -482,7 +507,8 @@ class InputsFieldWrp(IndexFieldWrp):
except KeyError:
pass
inst = self._instance
inst._riak_object.remove_index('%s_bin' % self.fname, '{}|{}'.format(self._instance.key, name))
inst._riak_object.remove_index('%s_bin' % self.fname, '{}|{}'.format(
self._instance.key, name))
del inst._data_container[self.fname][name]
def __setitem__(self, name, value):
@ -502,19 +528,22 @@ class InputsFieldWrp(IndexFieldWrp):
fname = self.fname
my_name = self._instance.key
ind_name = '{}_recv_bin'.format(fname)
recvs = self._instance._get_index(ind_name,
startkey='{}|{}|'.format(my_name, name),
endkey='{}|{}|~'.format(my_name,name),
max_results=1,
return_terms=True).results
recvs = self._instance._get_index(
ind_name,
startkey='{}|{}|'.format(my_name, name),
endkey='{}|{}|~'.format(my_name, name),
max_results=1,
return_terms=True).results
if recvs:
recvs = recvs[0]
res, inp, emitter_name, emitter_inp = recvs[0].split('|')[:4]
raise Exception("%s:%s is connected with resource %s:%s" % (res, inp, emitter_name, emitter_inp))
raise Exception("%s:%s is connected with resource %s:%s" %
(res, inp, emitter_name, emitter_inp))
# inst = self._instance
robj = self._instance._riak_object
if name not in robj.data[self.fname]:
self._instance._add_index('%s_bin' % self.fname, '{}|{}'.format(my_name, name))
self._instance._add_index('%s_bin' % self.fname, '{}|{}'.format(
my_name, name))
robj.data[self.fname][name] = value
with self.inputs_index_cache as c:
@ -540,7 +569,6 @@ class InputsField(IndexField):
class TagsFieldWrp(IndexFieldWrp):
def __getitem__(self, name):
raise TypeError('You cannot get tags like this')
@ -573,7 +601,8 @@ class TagsFieldWrp(IndexFieldWrp):
if full_value in fld:
return
# indexes = inst._riak_object.indexes.copy() # copy it
inst._add_index('{}_bin'.format(self.fname), '{}~{}'.format(name, value))
inst._add_index('{}_bin'.format(self.fname), '{}~{}'.format(name,
value))
try:
fld.append(full_value)
except KeyError:
@ -582,7 +611,7 @@ class TagsFieldWrp(IndexFieldWrp):
def has_tag(self, name, subval=None):
fld = self._instance._data_container[self.fname]
if not name in fld:
if name not in fld:
return False
if subval is not None:
subvals = fld[name]
@ -598,15 +627,15 @@ class TagsFieldWrp(IndexFieldWrp):
fld = inst._data_container[self.fname]
full_value = '{}={}'.format(name, value)
try:
vals = fld.remove(full_value)
fld.remove(full_value)
except ValueError:
pass
else:
inst._remove_index('{}_bin'.format(self.fname), '{}~{}'.format(name, value))
inst._remove_index('{}_bin'.format(self.fname), '{}~{}'.format(
name, value))
return True
class TagsField(IndexField):
_wrp_class = TagsFieldWrp
@ -626,23 +655,22 @@ class TagsField(IndexField):
subval = str(subval)
# maxresults because of riak bug with small number of results
# https://github.com/basho/riak/issues/608
declared = self._declared_in
if not subval.endswith('*'):
res = self._declared_in._get_index('{}_bin'.format(self.fname),
startkey='{}~{}'.format(name, subval),
endkey='{}~{} '.format(name, subval), # space required
max_results=100000,
return_terms=True).results
res = declared._get_index('{}_bin'.format(self.fname),
startkey='{}~{}'.format(name, subval),
endkey='{}~{} '.format(name, subval),
max_results=100000,
return_terms=True).results
else:
subval = subval.replace('*', '')
res = self._declared_in._get_index('{}_bin'.format(self.fname),
startkey='{}~{}'.format(name, subval),
endkey='{}~{}~'.format(name, subval), # space required
max_results=100000,
return_terms=True).results
res = declared._get_index('{}_bin'.format(self.fname),
startkey='{}~{}'.format(name, subval),
endkey='{}~{}~'.format(name, subval),
max_results=100000,
return_terms=True).results
return set(map(itemgetter(1), res))
# class MetaInput(NestedModel):
# name = Field(str)
@ -670,7 +698,6 @@ class Resource(Model):
inputs = InputsField(default=dict)
tags = TagsField(default=list)
updated = IndexedField(StrInt)
def _connect_single(self, other_inputs, other_name, my_name):
@ -682,14 +709,17 @@ class Resource(Model):
other_inputs.connect(other_name, self, my_name)
def connect(self, other, mapping):
my_inputs = self.inputs
other_inputs = other.inputs
if mapping is None:
return
if self == other:
raise Exception('Trying to connect value-.* to itself')
solar_map(lambda (my_name, other_name): self._connect_single(other_inputs, other_name, my_name),
mapping.iteritems(), concurrency=2)
solar_map(
lambda (my_name, other_name): self._connect_single(other_inputs,
other_name,
my_name),
mapping.iteritems(),
concurrency=2)
def disconnect(self, other, inputs):
def _to_disconnect((emitter, receiver, meta)):
@ -699,7 +729,7 @@ class Resource(Model):
if not emitter[0] == self.key:
return False
key = emitter[1]
if not key in converted:
if key not in converted:
return False
convs = converted[key]
for conv in convs:
@ -718,8 +748,7 @@ class Resource(Model):
# normal input
return input, None
elif spl_len == 3:
return spl[0], {'tag': spl[1],
'destination_key': spl[2]}
return spl[0], {'tag': spl[1], 'destination_key': spl[2]}
else:
raise Exception("Cannot convert input %r" % input)
@ -731,7 +760,6 @@ class Resource(Model):
tag = meta.get('tag', other.name)
return '{}:{}|{}'.format(input, dest_key, tag)
converted = defaultdict(list)
for k, v in map(_convert_input, inputs):
converted[k].append(v)
@ -749,12 +777,11 @@ class Resource(Model):
@classmethod
def childs(cls, parents):
all_indexes = cls.bucket.get_index(
'inputs_recv_bin',
startkey='',
endkey='~',
return_terms=True,
max_results=999999)
all_indexes = cls.bucket.get_index('inputs_recv_bin',
startkey='',
endkey='~',
return_terms=True,
max_results=999999)
tmp = defaultdict(set)
to_visit = parents[:]
@ -774,20 +801,21 @@ class Resource(Model):
return visited
def delete(self):
inputs_index = self.bucket.get_index(
'inputs_emit_bin',
startkey=self.key,
endkey=self.key+'~',
return_terms=True,
max_results=999999)
inputs_index = self.bucket.get_index('inputs_emit_bin',
startkey=self.key,
endkey=self.key + '~',
return_terms=True,
max_results=999999)
to_disconnect_all = defaultdict(list)
for emit_bin in inputs_index.results:
index_vals = emit_bin[0].split('|')
index_vals_len = len(index_vals)
if index_vals_len == 6: # hash
_, my_input, other_res, other_input, my_tag, my_val = index_vals
to_disconnect_all[other_res].append("{}|{}|{}".format(my_input, my_tag, my_val))
(_, my_input, other_res,
other_input, my_tag, my_val) = index_vals
to_disconnect_all[other_res].append("{}|{}|{}".format(
my_input, my_tag, my_val))
elif index_vals_len == 4:
_, my_input, other_res, other_input = index_vals
to_disconnect_all[other_res].append(other_input)
@ -812,11 +840,12 @@ class CommitedResource(Model):
Type of operations:
- load all tasks for execution
- load single task + childs + all parents of childs (and transitions between them)
- load single task + childs + all parents
of childs (and transitions between them)
"""
class TasksFieldWrp(IndexFieldWrp):
class TasksFieldWrp(IndexFieldWrp):
def add(self, task):
return True
@ -857,9 +886,7 @@ class TasksField(IndexField):
return startkey
class ChildFieldWrp(TasksFieldWrp):
def add(self, task):
return self._add(self._instance, task)
@ -870,7 +897,6 @@ class ChildField(TasksField):
class ParentFieldWrp(TasksFieldWrp):
def add(self, task):
return self._add(task, self._instance)
@ -910,6 +936,7 @@ system log
5. keep order of history
"""
class NegativeCounter(Model):
count = Field(int, default=int)
@ -928,11 +955,11 @@ class LogItem(Model):
diff = Field(list)
connections_diff = Field(list)
state = Field(basestring)
base_path = Field(basestring) # remove me
base_path = Field(basestring) # remove me
updated = Field(StrInt)
history = IndexedField(StrInt)
log = Field(basestring) # staged/history
log = Field(basestring) # staged/history
composite = CompositeIndexField(fields=('log', 'resource', 'action'))
@ -942,7 +969,9 @@ class LogItem(Model):
@classmethod
def history_last(cls):
items = cls.history.filter(StrInt.n_max(), StrInt.n_min(), max_results=1)
items = cls.history.filter(StrInt.n_max(),
StrInt.n_min(),
max_results=1)
if not items:
return None
return cls.get(items[0])
@ -952,7 +981,8 @@ class LogItem(Model):
self.composite.reset()
if 'log' in self._modified_fields and self.log == 'history':
self.history = StrInt(next(NegativeCounter.get_or_create('history')))
self.history = StrInt(next(NegativeCounter.get_or_create(
'history')))
return super(LogItem, self).save()
@classmethod

View File

@ -1,33 +1,29 @@
# -*- coding: utf-8 -*-
from collections import deque
import inspect
import os
import uuid
import sys
from peewee import CharField, BlobField, IntegerField, \
ForeignKeyField, Model, BooleanField, TextField, Field, Database
from peewee import CharField, BlobField, \
ForeignKeyField, Model
from solar.dblayer.model import clear_cache
from threading import RLock
# msgpack is way faster but less readable
# using json for easier debug
import json
encoder = json.dumps
def wrapped_loads(data, *args, **kwargs):
if not isinstance(data, basestring):
data = str(data)
return json.loads(data, *args, **kwargs)
decoder = wrapped_loads
class _DataField(BlobField):
def db_value(self, value):
return super(_DataField, self).db_value(encoder(value))
@ -36,7 +32,6 @@ class _DataField(BlobField):
class _LinksField(_DataField):
def db_value(self, value):
return super(_LinksField, self).db_value(list(value))
@ -46,7 +41,6 @@ class _LinksField(_DataField):
class _SqlBucket(Model):
def __init__(self, *args, **kwargs):
self._new = kwargs.pop('_new', False)
ed = kwargs.pop('encoded_data', None)
@ -82,7 +76,6 @@ class _SqlBucket(Model):
class FieldWrp(object):
def __init__(self, name):
self.name = name
@ -138,8 +131,8 @@ class RiakObj(object):
# TODO: possible optimization
# update only what's needed
# don't delete all at first
q = self.bucket._sql_idx.delete().where(
self.bucket._sql_idx.key == self.key)
q = self.bucket._sql_idx.delete().where(self.bucket._sql_idx.key ==
self.key)
q.execute()
for iname, ival in self.indexes:
@ -174,7 +167,8 @@ class RiakObj(object):
# (self.bucket._sql_idx.name == field) &
# (self.bucket._sql_idx.value == value))
# q.execute()
to_rem = set((x for x in self.indexes if x[0] == field and x[1] == value))
to_rem = set((
x for x in self.indexes if x[0] == field and x[1] == value))
self.indexes.difference_update(to_rem)
return self
@ -224,8 +218,8 @@ class RiakObj(object):
class IndexPage(object):
def __init__(self, index, results, return_terms, max_results, continuation):
def __init__(self, index, results, return_terms, max_results,
continuation):
self.max_results = max_results
self.index = index
if not return_terms:
@ -247,7 +241,6 @@ class IndexPage(object):
class Bucket(object):
def __init__(self, name, client):
self.client = client
table_name = "bucket_%s" % name.lower()
@ -258,19 +251,17 @@ class Bucket(object):
db_table = table_name
database = self.client.sql_session
self._sql_model = type(table_name, (_SqlBucket,),
{'Meta': ModelMeta,
'bucket': self})
self._sql_model = type(table_name, (_SqlBucket, ), {'Meta': ModelMeta,
'bucket': self})
_idx_key = ForeignKeyField(self._sql_model, null=False, index=True)
class IdxMeta:
db_table = idx_table_name
database = self.client.sql_session
self._sql_idx = type(idx_table_name, (_SqlIdx,),
{'Meta': IdxMeta,
'bucket': self,
'key': _idx_key})
self._sql_idx = type(idx_table_name, (_SqlIdx, ), {'Meta': IdxMeta,
'bucket': self,
'key': _idx_key})
def search(self, q, rows=10, start=0, sort=''):
raise NotImplementedError()
@ -323,8 +314,15 @@ class Bucket(object):
ret.vclock = "new"
return RiakObj(ret, new)
def get_index(self, index, startkey, endkey=None, return_terms=None,
max_results=None, continuation=None, timeout=None, fmt=None,
def get_index(self,
index,
startkey,
endkey=None,
return_terms=None,
max_results=None,
continuation=None,
timeout=None,
fmt=None,
term_regex=None):
if startkey and endkey is None:
endkey = startkey
@ -333,33 +331,31 @@ class Bucket(object):
if index == '$key':
if return_terms:
q = self._sql_model.select(
self._sql_model.value, self._sql_model.key)
q = self._sql_model.select(self._sql_model.value,
self._sql_model.key)
else:
q = self._sql_model.select(self._sql_model.key)
q = q.where(
self._sql_model.key >= startkey, self._sql_model.key <= endkey
).order_by(self._sql_model.key)
self._sql_model.key >= startkey,
self._sql_model.key <= endkey).order_by(self._sql_model.key)
elif index == '$bucket':
if return_terms:
q = self._sql_model.select(
self._sql_model.value, self._sql_model.key)
q = self._sql_model.select(self._sql_model.value,
self._sql_model.key)
else:
q = self._sql_model.select(self._sql_model.key)
if not startkey == '_' and endkey == '_':
q = q.where(
self._sql_model.key >= startkey, self._sql_model.key <= endkey
)
q = q.where(self._sql_model.key >= startkey,
self._sql_model.key <= endkey)
else:
if return_terms:
q = self._sql_idx.select(
self._sql_idx.value, self._sql_idx.key)
q = self._sql_idx.select(self._sql_idx.value,
self._sql_idx.key)
else:
q = self._sql_idx.select(self._sql_idx.key)
q = q.where(
self._sql_idx.name == index,
self._sql_idx.value >= startkey, self._sql_idx.value <= endkey
).order_by(self._sql_idx.value)
self._sql_idx.name == index, self._sql_idx.value >= startkey,
self._sql_idx.value <= endkey).order_by(self._sql_idx.value)
max_results = int(max_results or 0)
continuation = int(continuation or 0)
@ -376,7 +372,8 @@ class Bucket(object):
if not keys:
return []
else:
q = self._sql_model.select().where(self._sql_model.key << list(keys))
q = self._sql_model.select().where(self._sql_model.key << list(
keys))
print q
return map(RiakObj, list(q))
@ -431,6 +428,8 @@ class SqlClient(object):
def delete_all(self, cls):
# naive way for SQL, we could delete whole table contents
rst = cls.bucket.get_index('$bucket', startkey='_', max_results=100000).results
rst = cls.bucket.get_index('$bucket',
startkey='_',
max_results=100000).results
for key in rst:
cls.bucket.delete(key)

View File

@ -14,6 +14,7 @@ else:
from solar.dblayer.gevent_patches import patch_all
patch_all()
def create_all():
import sys

View File

@ -4,13 +4,15 @@ import time
import string
import random
def patched_get_bucket_name(cls):
return cls.__name__ + str(time.time())
class RndObj(object):
class RndObj(object):
def __init__(self, name):
self.rnd = name + ''.join((random.choice(string.ascii_lowercase) for x in xrange(8)))
self.rnd = name + ''.join((random.choice(string.ascii_lowercase)
for x in xrange(8)))
self.calls = 0
def next(self):
@ -21,6 +23,7 @@ class RndObj(object):
def __iter__(self):
return self
@pytest.fixture(scope='function')
def rk(request):
@ -30,6 +33,7 @@ def rk(request):
return obj
@pytest.fixture(scope='function')
def rt(request):
@ -51,6 +55,7 @@ def pytest_runtest_teardown(item, nextitem):
ModelMeta.session_end(result=True)
return nextitem
def pytest_runtest_call(item):
ModelMeta.session_start()

View File

@ -1,11 +1,9 @@
import pytest
from solar.dblayer.model import (Field, IndexField,
clear_cache, Model,
StrInt,
DBLayerNotFound,
DBLayerNoRiakObj,
from solar.dblayer.model import (Field, IndexField, clear_cache, Model, StrInt,
DBLayerNotFound, DBLayerNoRiakObj,
DBLayerException)
class M1(Model):
f1 = Field(str)
@ -27,7 +25,6 @@ class M3(Model):
ind = IndexField(default=dict)
def test_from_dict(rk):
key = next(rk)
@ -98,7 +95,7 @@ def test_cache_logic(rk):
M1.session_start()
assert M1._c.obj_cache == {}
m12 = M1.get(k)
M1.get(k)
aid = id(M1._c)
assert pid != aid
@ -108,19 +105,21 @@ def test_normal_index(rk):
key = next(rk)
key2 = next(rk)
m1 = M1.from_dict(key, {'f1': 'blah', 'f2': 150,
m1 = M1.from_dict(key, {'f1': 'blah',
'f2': 150,
'ind': {'blah': 'something'}})
m1.save()
m2 = M1.from_dict(key2, {'f1': 'blah', 'f2': 150,
'ind': {'blah': 'something2'}})
m2 = M1.from_dict(key2, {'f1': 'blah',
'f2': 150,
'ind': {'blah': 'something2'}})
m2.save()
assert set(M1.ind.filter('blah=somethi*')) == set([key, key2])
assert set(M1.ind.filter('blah=something')) == set([key])
assert set(M1.ind.filter('blah=something2')) == set([key2])
def test_update(rk):
def test_update_behaviour(rk):
key = next(rk)
m1 = M1.from_dict(key, {'f1': 'blah', 'f2': 150})
@ -232,7 +231,7 @@ def test_delete_cache_behaviour(rk):
M1.get(key1).delete()
with pytest.raises(DBLayerNotFound):
m12 = M1.get(key1)
M1.get(key1)
def test_fast_delete(rk):

View File

@ -1,4 +1,3 @@
import pytest
from solar.dblayer.solar_models import Task
@ -7,16 +6,12 @@ def test_tasks_selected_by_execution_id(rk):
execution = next(rk)
for i in range(2):
t = Task.new(
{'name': str(i),
'execution': execution})
t = Task.new({'name': str(i), 'execution': execution})
t.save()
another_execution = next(rk)
for i in range(2):
t = Task.new(
{'name': str(i),
'execution': another_execution})
t = Task.new({'name': str(i), 'execution': another_execution})
t.save()
assert len(Task.execution.filter(execution)) == 2
@ -26,13 +21,9 @@ def test_tasks_selected_by_execution_id(rk):
def test_parent_child(rk):
execution = next(rk)
t1 = Task.new(
{'name': '1',
'execution': execution})
t1 = Task.new({'name': '1', 'execution': execution})
t2 = Task.new(
{'name': '2',
'execution': execution})
t2 = Task.new({'name': '2', 'execution': execution})
t1.childs.add(t2)
t1.save()
t2.save()
@ -41,5 +32,3 @@ def test_parent_child(rk):
assert Task.parents.filter(t2.key) == [t1.key]
assert t1.childs.all_tasks() == [t2]
assert t2.parents.all_names() == [t1.name]

View File

@ -1,4 +1,3 @@
import pytest
from solar.dblayer.solar_models import LogItem, NegativeCounter
from solar.dblayer.model import StrInt
@ -30,8 +29,10 @@ def test_multiple_filter():
l1.save()
l2.save()
assert LogItem.composite.filter({'log': 'history', 'resource': 'a'}) == [l1.key]
assert LogItem.composite.filter({'log': 'history', 'resource': 'b'}) == [l2.key]
assert LogItem.composite.filter({'log': 'history',
'resource': 'a'}) == [l1.key]
assert LogItem.composite.filter({'log': 'history',
'resource': 'b'}) == [l2.key]
def test_changed_index():
@ -60,8 +61,9 @@ def test_reversed_order_is_preserved():
li.save()
added.append(li.key)
added.reverse()
assert list(LogItem.history.filter(
StrInt.n_max(), StrInt.n_min(), max_results=2)) == added[:2]
assert list(LogItem.history.filter(StrInt.n_max(),
StrInt.n_min(),
max_results=2)) == added[:2]
def test_staged_not_indexed():
@ -75,8 +77,10 @@ def test_staged_not_indexed():
li.log = 'history'
li.save()
assert set(LogItem.history.filter(
StrInt.n_max(), StrInt.n_min())) == {li.key for li in added[:2]}
assert set(LogItem.history.filter(StrInt.n_max(), StrInt.n_min())) == {
li.key
for li in added[:2]
}
def test_history_last_filter():
@ -89,5 +93,4 @@ def test_history_last_filter():
def test_history_last_returns_none():
assert LogItem.history_last() == None
assert LogItem.history_last() is None

View File

@ -1,11 +1,5 @@
import pytest
from solar.dblayer.model import (Field, IndexField,
clear_cache, Model,
NestedField,
NestedModel,
DBLayerNotFound,
DBLayerNoRiakObj,
DBLayerException)
from solar.dblayer.model import (Field, Model,
NestedField, NestedModel)
class N1(NestedModel):
@ -14,7 +8,6 @@ class N1(NestedModel):
f_nested2 = Field(int, default=150)
class M1(Model):
f1 = Field(str)
@ -22,20 +15,20 @@ class M1(Model):
f3 = NestedField(N1, hash_key='f_nested1')
def test_nested_simple(rk):
key = next(rk)
m1 = M1.from_dict(key, {'f1': 'blah',
'f2': {'f_nested1': 'foo'}})
m1 = M1.from_dict(key, {'f1': 'blah', 'f2': {'f_nested1': 'foo'}})
assert m1.f2.f_nested1 == 'foo'
assert m1.f2.f_nested2 == 150
assert m1._modified_fields == set(['f1', 'f2'])
assert m1._data_container == {'f1': 'blah', 'f2': {'f_nested1': 'foo', 'f_nested2': 150}}
assert m1._data_container == {'f1': 'blah',
'f2': {'f_nested1': 'foo',
'f_nested2': 150}}
del m1.f2
assert m1._data_container == {'f1': 'blah'}
assert m1._data_container == {'f1': 'blah'}
def test_nested(rk):
@ -43,7 +36,8 @@ def test_nested(rk):
m1 = M1.from_dict(key, {'f1': 'blah',
'f2': {'f_nested1': 'foo'},
'f3': {'f_nested1': 'foo', 'f_nested2': 150}})
'f3': {'f_nested1': 'foo',
'f_nested2': 150}})
assert m1.f2.f_nested1 == 'foo'
assert m1.f2.f_nested2 == 150
@ -55,7 +49,12 @@ def test_nested(rk):
assert m1.f3['blah'].f_nested2 == 250
assert m1._modified_fields == set(['f1', 'f2', 'f3'])
exp = {'f1': 'blah', 'f2': {'f_nested1': 'foo', 'f_nested2': 150}, 'f3': {'blah': {'f_nested2': 250}, 'foo': {'f_nested1': 'foo', 'f_nested2': 150}}}
exp = {'f1': 'blah',
'f2': {'f_nested1': 'foo',
'f_nested2': 150},
'f3': {'blah': {'f_nested2': 250},
'foo': {'f_nested1': 'foo',
'f_nested2': 150}}}
assert m1._data_container == exp
del m1.f2
@ -63,4 +62,3 @@ def test_nested(rk):
assert m1._data_container == exp
assert m1._modified_fields == set(['f1', 'f2', 'f3'])

View File

@ -1,7 +1,7 @@
import pytest
import random
from solar.dblayer.model import Model, Field, IndexField, clear_cache, check_state_for, StrInt
from solar.dblayer.model import StrInt
from solar.dblayer.model import check_state_for
from solar.dblayer.solar_models import Resource, DBLayerSolarException
@ -21,6 +21,7 @@ def create_resource(key, data):
data['meta_inputs'] = mi
return Resource.from_dict(key, data)
@pytest.mark.xfail(reason="Not YET decided how it should work")
def test_changes_state(rk):
key = next(rk)
@ -28,7 +29,7 @@ def test_changes_state(rk):
r.inputs['a'] = 1
with pytest.raises(Exception):
# raise exception when something is changed
val = r.inputs['a']
r.inputs['a']
r.save()
check_state_for('index', r)
@ -50,14 +51,14 @@ def test_basic_input(rk):
def test_input_in_dict(rk):
key = next(rk)
r = create_resource(key, {'name': 'a name',
'inputs': {'input1': 15,
'input2': None}})
'inputs': {'input1': 15,
'input2': None}})
r.save()
assert r._riak_object.data['inputs']['input1'] == 15
assert r.inputs['input1'] == 15
assert r._riak_object.data['inputs']['input2'] == None
assert r.inputs['input2'] == None
assert r._riak_object.data['inputs']['input2'] is None
assert r.inputs['input2'] is None
def test_basic_connect(rk):
@ -65,11 +66,11 @@ def test_basic_connect(rk):
k2 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
'inputs': {'input1': 10,
'input2': 15}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {'input1': None,
'input2': None}})
'inputs': {'input1': None,
'input2': None}})
r1.connect(r2, {'input1': 'input1', 'input2': 'input2'})
r1.save()
@ -78,13 +79,13 @@ def test_basic_connect(rk):
assert r1._riak_object.data['inputs']['input1'] == 10
assert r1.inputs['input1'] == 10
assert r2._riak_object.data['inputs']['input1'] == None
assert r2._riak_object.data['inputs']['input1'] is None
assert r2.inputs['input1'] == 10
assert r1._riak_object.data['inputs']['input2'] == 15
assert r1.inputs['input2'] == 15
assert r2._riak_object.data['inputs']['input2'] == None
assert r2._riak_object.data['inputs']['input2'] is None
assert r2.inputs['input2'] == 15
@ -94,12 +95,12 @@ def test_adv_connect(rk, depth):
k2 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
'inputs': {'input1': 10,
'input2': 15}})
prev = create_resource(k2, {'name': 'second',
'inputs': {'input1': None,
'input2': None,
'input3': 0}})
'inputs': {'input1': None,
'input2': None,
'input3': 0}})
conn = {'input1': 'input1', 'input2': 'input2'}
r1.save()
r1.connect(prev, conn)
@ -109,9 +110,9 @@ def test_adv_connect(rk, depth):
for x in xrange(depth - 1):
k = next(rk)
res = create_resource(k, {'name': 'next %d' % (x + 1),
'inputs': {'input1': None,
'input2': None,
'input3': x + 1}})
'inputs': {'input1': None,
'input2': None,
'input3': x + 1}})
created.append(res)
prev.connect(res, conn)
res.save()
@ -126,15 +127,14 @@ def test_adv_connect(rk, depth):
@pytest.mark.parametrize('depth', (1, 3, 5, 10, 50, 100))
def test_perf_inputs(rk, depth):
k1 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 'target'}})
r1 = create_resource(k1, {'name': 'first', 'inputs': {'input1': 'target'}})
r1.save()
prev = r1
for x in xrange(depth):
k = next(rk)
res = create_resource(k, {'name': 'next %d' % (x + 1),
'inputs': {'input1': None}})
'inputs': {'input1': None}})
prev.connect(res, {'input1': 'input1'})
res.save()
prev = res
@ -152,15 +152,15 @@ def test_change_connect(rk):
k3 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
'inputs': {'input1': 10,
'input2': 15}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {'input1': None,
'input2': None,
'input3': 0}})
'inputs': {'input1': None,
'input2': None,
'input3': 0}})
r3 = create_resource(k3, {'name': 'first',
'inputs': {'input1': 30,
'input2': 35}})
'inputs': {'input1': 30,
'input2': 35}})
r1.connect(r2, {'input1': 'input1', 'input2': 'input2'})
r3.connect(r2, {'input1': 'input1'})
@ -173,13 +173,12 @@ def test_change_connect(rk):
assert r2.inputs['input2'] == 15
def test_simple_tag(rk, rt):
k1 = next(rk)
tag = next(rt)
r1 = create_resource(k1, {'name': 'first',
'tags': ['%s' % tag, '%s=10' % tag]})
'tags': ['%s' % tag, '%s=10' % tag]})
r1.save()
assert list(r1.tags) == ['%s=' % tag, '%s=10' % tag]
@ -190,12 +189,10 @@ def test_list_by_tag(rk, rt):
k2 = next(rk)
tag1 = next(rt)
tag2 = next(rt)
r1 = create_resource(k1, {'name': 'first',
'tags': [tag1, '%s=10' % tag1]})
r1 = create_resource(k1, {'name': 'first', 'tags': [tag1, '%s=10' % tag1]})
r1.save()
r2 = create_resource(k2, {'name': 'first',
'tags': [tag1, '%s=10' % tag2]})
r2 = create_resource(k2, {'name': 'first', 'tags': [tag1, '%s=10' % tag2]})
r2.save()
assert len(Resource.tags.filter(tag1)) == 2
@ -234,10 +231,9 @@ def test_list_inputs(rk):
k2 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {'input': []}})
'inputs': {'input1': 10,
'input2': 15}})
r2 = create_resource(k2, {'name': 'second', 'inputs': {'input': []}})
r1.connect(r2, {'input1': 'input'})
r1.connect(r2, {'input2': 'input'})
@ -252,15 +248,13 @@ def test_dict_to_dict_inputs(rk):
k1 = next(rk)
k2 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input': {'input1': 10,
'input2': 15}
}})
'inputs': {'input': {'input1': 10,
'input2': 15}}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {'input': {'input1': None,
'input2': None,
'input3': None}}})
'inputs': {'input': {'input1': None,
'input2': None,
'input3': None}}})
r1.connect(r2, {'input': 'input'})
r1.save()
@ -275,10 +269,8 @@ def test_list_to_list_inputs(rk):
k1 = next(rk)
k2 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input': [10, 15]}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {'input': []}})
r1 = create_resource(k1, {'name': 'first', 'inputs': {'input': [10, 15]}})
r2 = create_resource(k2, {'name': 'second', 'inputs': {'input': []}})
r1.connect(r2, {'input': 'input'})
@ -293,15 +285,13 @@ def test_simple_to_dict_inputs(rk):
k2 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
'inputs': {'input1': 10,
'input2': 15}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {'input': {'input1': None,
'input2': None}}})
'inputs': {'input': {'input1': None,
'input2': None}}})
r1.connect(r2, {'input1': 'input:input1',
'input2': 'input:input2'})
r1.connect(r2, {'input1': 'input:input1', 'input2': 'input:input2'})
r1.save()
r2.save()
@ -316,15 +306,14 @@ def test_simple_to_dict_inputs_with_tag(rk):
k3 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
'inputs': {'input1': 10,
'input2': 15}})
r3 = create_resource(k3, {'name': 'first',
'inputs': {'input1': 110,
'input2': 115}})
'inputs': {'input1': 110,
'input2': 115}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {'input': {'input1': None,
'input2': None}}})
'inputs': {'input': {'input1': None,
'input2': None}}})
r1.connect(r2, {'input1': 'input:input1|tag'})
r3.connect(r2, {'input2': 'input:input2|tag'})
@ -345,21 +334,19 @@ def test_simple_to_listdict_inputs(rk):
k4 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
'inputs': {'input1': 10,
'input2': 15}})
r3 = create_resource(k3, {'name': 'first',
'inputs': {'input1': 110,
'input2': 115}})
'inputs': {'input1': 110,
'input2': 115}})
r4 = create_resource(k4, {'name': 'first',
'inputs': {'input1': 1110,
'input2': 1115}})
'inputs': {'input1': 1110,
'input2': 1115}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {'input': [{'input1': None,
'input2': None}]}})
'inputs': {'input': [{'input1': None,
'input2': None}]}})
r1.connect(r2, {'input1': 'input:input1',
'input2': 'input:input2'})
r1.connect(r2, {'input1': 'input:input1', 'input2': 'input:input2'})
r3.connect(r2, {'input2': 'input:input2|tag2',
'input1': 'input:input1|tag1'})
r4.connect(r2, {'input2': 'input:input2|tag1',
@ -370,9 +357,11 @@ def test_simple_to_listdict_inputs(rk):
r3.save()
r4.save()
assert r2.inputs['input'] == [{u'input2': 1115, u'input1': 110},
{u'input2': 115, u'input1': 1110},
{u'input2': 15, u'input1': 10}]
assert r2.inputs['input'] == [{u'input2': 1115,
u'input1': 110}, {u'input2': 115,
u'input1': 1110},
{u'input2': 15,
u'input1': 10}]
def test_dict_to_list_inputs(rk):
@ -381,8 +370,7 @@ def test_dict_to_list_inputs(rk):
k2 = next(rk)
k3 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'modules': [{}]}})
r1 = create_resource(k1, {'name': 'first', 'inputs': {'modules': [{}]}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {'module': {'name': 'blah2'}}})
r3 = create_resource(k3, {'name': 'third',
@ -394,9 +382,8 @@ def test_dict_to_list_inputs(rk):
r2.save()
r3.save()
assert sorted(r1.inputs['modules']) == sorted([{'name': 'blah2'}, {'name': 'blah3'}])
assert sorted(r1.inputs['modules']) == sorted([{'name': 'blah2'},
{'name': 'blah3'}])
def test_passthrough_inputs(rk):
@ -406,19 +393,17 @@ def test_passthrough_inputs(rk):
k3 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
'inputs': {'input1': 10,
'input2': 15}})
r2 = create_resource(k2, {'name': 'first',
'inputs': {'input1': None,
'input2': None}})
'inputs': {'input1': None,
'input2': None}})
r3 = create_resource(k3, {'name': 'first',
'inputs': {'input1': None,
'input2': None}})
'inputs': {'input1': None,
'input2': None}})
r2.connect(r3, {'input1': 'input1',
'input2': 'input2'})
r1.connect(r2, {'input1': 'input1',
'input2': 'input2'})
r2.connect(r3, {'input1': 'input1', 'input2': 'input2'})
r1.connect(r2, {'input1': 'input1', 'input2': 'input2'})
r1.save()
r2.save()
@ -434,19 +419,17 @@ def test_disconnect_by_input(rk):
k3 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
'inputs': {'input1': 10,
'input2': 15}})
r2 = create_resource(k2, {'name': 'first',
'inputs': {'input1': None,
'input2': None}})
'inputs': {'input1': None,
'input2': None}})
r3 = create_resource(k3, {'name': 'first',
'inputs': {'input1': None,
'input2': None}})
'inputs': {'input1': None,
'input2': None}})
r2.connect(r3, {'input1': 'input1',
'input2': 'input2'})
r1.connect(r2, {'input1': 'input1',
'input2': 'input2'})
r2.connect(r3, {'input1': 'input1', 'input2': 'input2'})
r1.connect(r2, {'input1': 'input1', 'input2': 'input2'})
r1.save()
r2.save()
@ -477,14 +460,14 @@ def test_resource_childs(rk):
k3 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
'inputs': {'input1': 10,
'input2': 15}})
r2 = create_resource(k2, {'name': 'first',
'inputs': {'input1': None,
'input2': None}})
'inputs': {'input1': None,
'input2': None}})
r3 = create_resource(k3, {'name': 'first',
'inputs': {'input1': None,
'input2': None}})
'inputs': {'input1': None,
'input2': None}})
r2.connect(r3, {'input1': 'input1'})
r1.connect(r2, {'input1': 'input1'})
@ -511,11 +494,11 @@ def test_delete(rk):
k2 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
'inputs': {'input1': 10,
'input2': 15}})
r2 = create_resource(k2, {'name': 'first',
'inputs': {'input1': None,
'input2': None}})
'inputs': {'input1': None,
'input2': None}})
r1.connect(r2, {'input1': 'input1'})
r1.save()
@ -535,15 +518,13 @@ def test_delete_hash(rk):
k2 = next(rk)
r1 = create_resource(k1, {'name': 'first',
'inputs': {'input1': 10,
'input2': 15}})
'inputs': {'input1': 10,
'input2': 15}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {'input': {'input1': None,
'input2': None}}})
'inputs': {'input': {'input1': None,
'input2': None}}})
r1.connect(r2, {'input1': 'input:input1',
'input2': 'input:input2'})
r1.connect(r2, {'input1': 'input:input1', 'input2': 'input:input2'})
r1.save()
r2.save()
@ -566,10 +547,8 @@ def test_nested_simple_listdict(rk):
r1 = create_resource(k1, {'name': 'first',
'inputs': {'config': [{"backends": [{}],
'listen_port': 1}]}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {'backend': {}}})
r3 = create_resource(k3, {'name': 'third',
'inputs': {'backend': {}}})
r2 = create_resource(k2, {'name': 'second', 'inputs': {'backend': {}}})
r3 = create_resource(k3, {'name': 'third', 'inputs': {'backend': {}}})
r5 = create_resource(k5, {'name': 'fifth',
'inputs': {"port": 5,
"host": "fifth_host"}})
@ -577,11 +556,8 @@ def test_nested_simple_listdict(rk):
'inputs': {"port": 4,
"host": "fourth_host"}})
r4.connect(r2, {'port': "backend:port",
'host': 'backend:host'})
r5.connect(r3, {'port': "backend:port",
'host': 'backend:host'})
r4.connect(r2, {'port': "backend:port", 'host': 'backend:host'})
r5.connect(r3, {'port': "backend:port", 'host': 'backend:host'})
assert r2.inputs['backend'] == {'host': 'fourth_host', 'port': 4}
assert r3.inputs['backend'] == {'host': 'fifth_host', 'port': 5}
@ -591,7 +567,8 @@ def test_nested_simple_listdict(rk):
Resource.save_all_lazy()
backends = next(x['backends'] for x in r1.inputs['config'] if 'backends' in x)
backends = next(x['backends'] for x in r1.inputs['config']
if 'backends' in x)
assert len(backends) == 2
@ -604,10 +581,12 @@ def test_nested_two_listdict(rk):
'inputs': {'config': [{"backends": [{}],
'something': 0}]}})
r2 = create_resource(k2, {'name': 'second',
'inputs': {"backends": [{"host": "second_host", "port": 2}],
'inputs': {"backends": [{"host": "second_host",
"port": 2}],
'something': 1}})
r3 = create_resource(k3, {'name': 'third',
'inputs': {"backends": [{"host": "third_host", "port": 3}],
'inputs': {"backends": [{"host": "third_host",
"port": 3}],
'something': 2}})
r2.connect(r1, {'backends': 'config:backends',