wsme/wsme/types.py
2013-04-14 20:37:36 +02:00

637 lines
19 KiB
Python

import base64
import datetime
import decimal
import inspect
import logging
import six
import sys
import weakref
log = logging.getLogger(__name__)
#: The 'str' (python 2) or 'bytes' (python 3) type.
#: Its use should be restricted to
#: pure ascii strings as the protocols will generally not be
#: be able to send non-unicode strings.
#: To transmit binary strings, use the :class:`binary` type
bytes = six.binary_type
#: Unicode string.
text = six.text_type
class ArrayType(object):
def __init__(self, item_type):
if iscomplex(item_type):
self._item_type = weakref.ref(item_type)
else:
self._item_type = item_type
def __hash__(self):
return hash(self.item_type)
def __eq__(self, other):
return isinstance(other, ArrayType) \
and self.item_type == other.item_type
def sample(self):
return [getattr(self.item_type, 'sample', self.item_type)()]
@property
def item_type(self):
if isinstance(self._item_type, weakref.ref):
return self._item_type()
else:
return self._item_type
def validate(self, value):
if value is None:
return
if not isinstance(value, list):
raise ValueError("Wrong type. Expected '[%s]', got '%s'" % (
self.item_type, type(value)
))
return [
validate_value(self.item_type, item)
for item in value
]
class DictType(object):
def __init__(self, key_type, value_type):
if key_type not in pod_types:
raise ValueError("Dictionnaries key can only be a pod type")
self.key_type = key_type
if iscomplex(value_type):
self._value_type = weakref.ref(value_type)
else:
self._value_type = value_type
def __hash__(self):
return hash((self.key_type, self.value_type))
def sample(self):
key = getattr(self.key_type, 'sample', self.key_type)()
value = getattr(self.value_type, 'sample', self.value_type)()
return {key: value}
@property
def value_type(self):
if isinstance(self._value_type, weakref.ref):
return self._value_type()
else:
return self._value_type
def validate(self, value):
if not isinstance(value, dict):
raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % (
self.key_type, self.value_type, type(value)
))
return dict((
(
validate_value(self.key_type, key),
validate_value(self.value_type, v)
) for key, v in value.items()
))
class UserType(object):
basetype = None
name = None
def validate(self, value):
return value
def tobasetype(self, value):
return value
def frombasetype(self, value):
return value
def isusertype(class_):
return isinstance(class_, UserType)
class BinaryType(UserType):
"""
A user type that use base64 strings to carry binary data.
"""
basetype = bytes
name = 'binary'
def tobasetype(self, value):
if value is None:
return None
return base64.encodestring(value)
def frombasetype(self, value):
if value is None:
return None
return base64.decodestring(value)
#: The binary almost-native type
binary = BinaryType()
class Enum(UserType):
"""
A simple enumeration type. Can be based on any non-complex type.
:param basetype: The actual data type
:param values: A set of possible values
If nullable, 'None' should be added the values set.
Example::
Gender = Enum(str, 'male', 'female')
Specie = Enum(str, 'cat', 'dog')
"""
def __init__(self, basetype, *values, **kw):
self.basetype = basetype
self.values = set(values)
name = kw.pop('name', None)
if name is None:
name = "Enum(%s)" % ', '.join((str(v) for v in values))
self.name = name
def validate(self, value):
if value not in self.values:
raise ValueError("Value '%s' is invalid (should be one of: %s)" % (
value, ', '.join(self.values)))
return value
def tobasetype(self, value):
return value
def frombasetype(self, value):
return value
class UnsetType(object):
if sys.version < '3':
def __nonzero__(self):
return False
else:
def __bool__(self):
return False
def __repr__(self):
return 'Unset'
Unset = UnsetType()
#: A special type that corresponds to the host framework request object.
#: It can only be used in the function parameters, and if so the request object
#: of the host framework will be passed to the function.
HostRequest = object()
pod_types = six.integer_types + (
bytes, text, float, bool)
dt_types = (datetime.date, datetime.time, datetime.datetime)
extra_types = (binary, decimal.Decimal)
native_types = pod_types + dt_types + extra_types
# The types for which we allow promotion to certain numbers.
_promotable_types = six.integer_types + (text, bytes)
def iscomplex(datatype):
return inspect.isclass(datatype) \
and '_wsme_attributes' in datatype.__dict__
def isarray(datatype):
return isinstance(datatype, ArrayType)
def isdict(datatype):
return isinstance(datatype, DictType)
def validate_value(datatype, value):
if value in (Unset, None):
return value
# Try to promote the data type to one of our complex types.
if isinstance(datatype, list):
datatype = ArrayType(datatype[0])
elif isinstance(datatype, dict):
datatype = DictType(*list(datatype.items())[0])
# If the datatype has its own validator, use that.
if hasattr(datatype, 'validate'):
return datatype.validate(value)
# Do type promotion/conversion and data validation for builtin
# types.
v_type = type(value)
if datatype in six.integer_types:
if v_type in _promotable_types:
try:
# Try to turn the value into an int
value = datatype(value)
except ValueError:
# An error is raised at the end of the function
# when the types don't match.
pass
elif datatype is float and v_type in _promotable_types:
try:
value = float(value)
except ValueError:
# An error is raised at the end of the function
# when the types don't match.
pass
elif datatype is text and isinstance(value, bytes):
value = value.decode()
elif datatype is bytes and isinstance(value, text):
value = value.encode()
if not isinstance(value, datatype):
raise ValueError(
"Wrong type. Expected '%s', got '%s'" % (
datatype, v_type
))
return value
class wsproperty(property):
"""
A specialised :class:`property` to define typed-property on complex types.
Example::
class MyComplexType(wsme.types.Base):
def get_aint(self):
return self._aint
def set_aint(self, value):
assert avalue < 10 # Dummy input validation
self._aint = value
aint = wsproperty(int, get_aint, set_aint, mandatory=True)
"""
def __init__(self, datatype, fget, fset=None,
mandatory=False, doc=None, name=None):
property.__init__(self, fget, fset)
#: The property name in the parent python class
self.key = None
#: The attribute name on the public of the api.
#: Defaults to :attr:`key`
self.name = name
#: property data type
self.datatype = datatype
#: True if the property is mandatory
self.mandatory = mandatory
class wsattr(object):
"""
Complex type attribute definition.
Example::
class MyComplexType(wsme.types.Base):
optionalvalue = int
mandatoryvalue = wsattr(int, mandatory=True)
named_value = wsattr(int, name='named.value')
After inspection, the non-wsattr attributes will be replaced, and
the above class will be equivalent to::
class MyComplexType(wsme.types.Base):
optionalvalue = wsattr(int)
mandatoryvalue = wsattr(int, mandatory=True)
"""
def __init__(self, datatype, mandatory=False, name=None, default=Unset):
#: The attribute name in the parent python class.
#: Set by :func:`inspect_class`
self.key = None # will be set by class inspection
#: The attribute name on the public of the api.
#: Defaults to :attr:`key`
self.name = name
self._datatype = (datatype,)
#: True if the attribute is mandatory
self.mandatory = mandatory
#: Default value. The attribute will return this instead
#: of :data:`Unset` if no value has been set.
self.default = default
self.complextype = None
def _get_dataholder(self, instance):
dataholder = getattr(instance, '_wsme_dataholder', None)
if dataholder is None:
dataholder = instance._wsme_DataHolderClass()
instance._wsme_dataholder = dataholder
return dataholder
def __get__(self, instance, owner):
if instance is None:
return self
return getattr(
self._get_dataholder(instance),
self.key,
self.default
)
def __set__(self, instance, value):
try:
value = validate_value(self.datatype, value)
except ValueError:
e = sys.exc_info()[1]
raise ValueError("%s: %s" % (self.name, e))
dataholder = self._get_dataholder(instance)
if value is Unset:
if hasattr(dataholder, self.key):
delattr(dataholder, self.key)
else:
setattr(dataholder, self.key, value)
def __delete__(self, instance):
self.__set__(instance, Unset)
def _get_datatype(self):
if isinstance(self._datatype, tuple):
self._datatype = \
self.complextype().__registry__.resolve_type(self._datatype[0])
if isinstance(self._datatype, weakref.ref):
return self._datatype()
if isinstance(self._datatype, list):
return [
item() if isinstance(item, weakref.ref) else item
for item in self._datatype
]
return self._datatype
def _set_datatype(self, datatype):
self._datatype = datatype
#: attribute data type. Can be either an actual type,
#: or a type name, in which case the actual type will be
#: determined when needed (generaly just before scaning the api).
datatype = property(_get_datatype, _set_datatype)
def iswsattr(attr):
if inspect.isfunction(attr) or inspect.ismethod(attr):
return False
if isinstance(attr, property) and not isinstance(attr, wsproperty):
return False
return True
def sort_attributes(class_, attributes):
"""Sort a class attributes list.
3 mechanisms are attempted :
#. Look for a _wsme_attr_order attribute on the class_. This allow
to define an arbitrary order of the attributes (usefull for
generated types).
#. Access the object source code to find the declaration order.
#. Sort by alphabetically"""
if not len(attributes):
return
attrs = dict((a.key, a) for a in attributes)
if hasattr(class_, '_wsme_attr_order'):
names_order = class_._wsme_attr_order
else:
names = attrs.keys()
names_order = []
try:
lines = []
for cls in inspect.getmro(class_):
if cls is object:
continue
lines[len(lines):] = inspect.getsourcelines(cls)[0]
for line in lines:
line = line.strip().replace(" ", "")
if '=' in line:
aname = line[:line.index('=')]
if aname in names and aname not in names_order:
names_order.append(aname)
if len(names_order) < len(names):
names_order.extend((
name for name in names if name not in names_order))
assert len(names_order) == len(names)
except (TypeError, IOError):
names_order = list(names)
names_order.sort()
attributes[:] = [attrs[name] for name in names_order]
def inspect_class(class_):
"""Extract a list of (name, wsattr|wsproperty) for the given class_"""
attributes = []
for name, attr in inspect.getmembers(class_, iswsattr):
if name.startswith('_'):
continue
if inspect.isroutine(attr):
continue
if isinstance(attr, wsattr):
attrdef = attr
elif isinstance(attr, wsproperty):
attrdef = attr
else:
if attr not in native_types and (
inspect.isclass(attr)
or isinstance(attr, list)
or isinstance(attr, dict)):
register_type(attr)
attrdef = getattr(class_, '__wsattrclass__', wsattr)(attr)
attrdef.key = name
if attrdef.name is None:
attrdef.name = name
attrdef.complextype = weakref.ref(class_)
attributes.append(attrdef)
setattr(class_, name, attrdef)
sort_attributes(class_, attributes)
return attributes
def list_attributes(class_):
"""
Returns a list of a complex type attributes.
"""
if not iscomplex(class_):
raise TypeError("%s is not a registered type")
return class_._wsme_attributes
def make_dataholder(class_):
# the slots are computed outside the class scope to avoid
# 'attr' to pullute the class namespace, which leads to weird
# things if one of the slots is named 'attr'.
slots = [attr.key for attr in class_._wsme_attributes]
class DataHolder(object):
__slots__ = slots
DataHolder.__name__ = class_.__name__ + 'DataHolder'
return DataHolder
class Registry(object):
def __init__(self):
self._complex_types = []
self.array_types = set()
self.dict_types = set()
@property
def complex_types(self):
return [t() for t in self._complex_types if t()]
def register(self, class_):
"""
Make sure a type is registered.
It is automatically called by :class:`expose() <wsme.expose>`
and :class:`validate() <wsme.validate>`.
Unless you want to control when the class inspection is done there
is no need to call it.
"""
if class_ is None or \
class_ in native_types or \
isusertype(class_) or iscomplex(class_) or \
isarray(class_) or isdict(class_):
return class_
if isinstance(class_, list):
if len(class_) != 1:
raise ValueError("Cannot register type %s" % repr(class_))
dt = ArrayType(class_[0])
self.register(dt.item_type)
self.array_types.add(dt)
return dt
if isinstance(class_, dict):
if len(class_) != 1:
raise ValueError("Cannot register type %s" % repr(class_))
dt = DictType(*list(class_.items())[0])
self.register(dt.value_type)
self.dict_types.add(dt)
return dt
class_._wsme_attributes = None
class_._wsme_attributes = inspect_class(class_)
class_._wsme_DataHolderClass = make_dataholder(class_)
class_.__registry__ = self
self._complex_types.append(weakref.ref(class_))
return class_
def lookup(self, typename):
log.debug('Lookup %s' % typename)
modname = None
if '.' in typename:
modname, typename = typename.rsplit('.', 1)
for ct in self._complex_types:
ct = ct()
if ct is not None and typename == ct.__name__ and (
modname is None or modname == ct.__module__):
return ct
def resolve_type(self, type_):
if isinstance(type_, six.string_types):
return self.lookup(type_)
if isinstance(type_, list):
type_ = ArrayType(type_[0])
if isinstance(type_, dict):
type_ = DictType(list(type_.keys())[0], list(type_.values())[0])
if isinstance(type_, ArrayType):
type_ = ArrayType(self.resolve_type(type_.item_type))
self.array_types.add(type_)
elif isinstance(type_, DictType):
type_ = DictType(
type_.key_type,
self.resolve_type(type_.value_type)
)
self.dict_types.add(type_)
else:
type_ = self.register(type_)
return type_
# Default type registry
registry = Registry()
def register_type(class_):
return registry.register(class_)
class BaseMeta(type):
def __new__(cls, name, bases, dct):
if bases[0] is not object and '__registry__' not in dct:
dct['__registry__'] = registry
return type.__new__(cls, name, bases, dct)
def __init__(cls, name, bases, dct):
if bases[0] is not object:
cls.__registry__.register(cls)
class Base(six.with_metaclass(BaseMeta)):
"""Base type for complex types"""
def __init__(self, **kw):
for key, value in kw.items():
if hasattr(self, key):
setattr(self, key, value)
class File(Base):
"""A complex type that represents a file.
In the particular case of protocol accepting form encoded data as
input, File can be loaded from a form file field.
"""
#: The file name
filename = wsattr(text)
#: Mime type of the content
contenttype = wsattr(text)
def _get_content(self):
if self._content is None and self._file:
self._content = self._file.read()
return self._content
def _set_content(self, value):
self._content = value
self._file = None
#: File content
content = wsproperty(binary, _get_content, _set_content)
def __init__(self, filename=None, file=None, content=None,
contenttype=None, fieldstorage=None):
self.filename = filename
self.contenttype = contenttype
self._file = file
self._content = content
if fieldstorage is not None:
if fieldstorage.file:
self._file = fieldstorage.file
self.filename = fieldstorage.filename
self.contenttype = text(fieldstorage.type)
else:
self._content = fieldstorage.value
@property
def file(self):
if self._file is None and self._content:
self._file = six.BytesIO(self._content)
return self._file