wsme/wsmeext/tests/test_soap.py
Stéphane Bisinger 9a0d3c1461 Convert built-in types when passed as strings
If on a service exposed with some arguments with a built-in type among
int, long, bool or float a request is made passing a JSON with string
values instead of the intended type, the called function would have
strings as parameters instead of the expected types. This also means
that invalid strings would still be passed without error, leading to
unexpected failures. This patch tries to convert the string to the
intended type before failing with an InvalidInput exception if the
string can't be converted. This is to try and be as nice as possible
with whatever input is thrown at wsme.

Closes-Bug: 1450544
Change-Id: I705c183bb68457d539074b78ce81339b9464e1e0
2015-05-06 23:04:18 +02:00

420 lines
12 KiB
Python

import decimal
import datetime
import base64
import six
import wsme.tests.protocol
try:
import xml.etree.ElementTree as et
except:
import cElementTree as et # noqa
import suds.cache
import suds.client
import suds.transport
import wsme.utils
class XDecimal(suds.xsd.sxbuiltin.XBuiltin):
def translate(self, value, topython=True):
if topython:
if isinstance(value, six.string_types) and len(value):
return decimal.Decimal(value)
else:
if isinstance(value, (decimal.Decimal, int, float)):
return str(value)
return value
suds.xsd.sxbuiltin.Factory.tags['decimal'] = XDecimal
class WebtestSudsTransport(suds.transport.Transport):
def __init__(self, app):
suds.transport.Transport.__init__(self)
self.app = app
def open(self, request):
res = self.app.get(request.url, headers=request.headers)
return six.BytesIO(res.body)
def send(self, request):
res = self.app.post(
request.url,
request.message,
headers=dict((
(key, str(value)) for key, value in request.headers.items()
)),
expect_errors=True
)
return suds.transport.Reply(
res.status_int,
dict(res.headers),
res.body
)
class SudsCache(suds.cache.Cache):
def __init__(self):
self.d = {}
def get(self, id):
return self.d.get(id)
def getf(self, id):
b = self.get(id)
if b is not None:
return six.StringIO(self.get(id))
def put(self, id, bfr):
self.d[id] = bfr
def putf(self, id, fp):
self.put(id, fp.read())
def purge(self, id):
try:
del self.d[id]
except:
pass
def clear(self, id):
self.d = {}
sudscache = SudsCache()
tns = "http://foo.bar.baz/soap/"
typenamespace = "http://foo.bar.baz/types/"
soapenv_ns = 'http://schemas.xmlsoap.org/soap/envelope/'
xsi_ns = 'http://www.w3.org/2001/XMLSchema-instance'
body_qn = '{%s}Body' % soapenv_ns
fault_qn = '{%s}Fault' % soapenv_ns
faultcode_qn = '{%s}faultcode' % soapenv_ns
faultstring_qn = '{%s}faultstring' % soapenv_ns
faultdetail_qn = '{%s}detail' % soapenv_ns
type_qn = '{%s}type' % xsi_ns
nil_qn = '{%s}nil' % xsi_ns
def build_soap_message(method, params=""):
message = """<?xml version="1.0"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<soap:Body xmlns="%(typenamespace)s">
<%(method)s>
%(params)s
</%(method)s>
</soap:Body>
</soap:Envelope>
""" % dict(method=method, params=params, typenamespace=typenamespace)
return message
python_types = {
int: ('xs:int', str),
float: ('xs:float', str),
bool: ('xs:boolean', str),
wsme.types.bytes: (
'xs:string',
lambda x: x.decode('ascii') if isinstance(x, wsme.types.bytes) else x
),
wsme.types.text: ('xs:string', wsme.types.text),
wsme.types.binary: (
'xs:base64Binary',
lambda x: base64.encodestring(x).decode('ascii')
),
decimal.Decimal: ('xs:decimal', str),
datetime.date: ('xs:date', datetime.date.isoformat),
datetime.time: ('xs:time', datetime.time.isoformat),
datetime.datetime: ('xs:dateTime', datetime.datetime.isoformat),
}
array_types = {
wsme.types.bytes: "String_Array",
wsme.types.text: "String_Array",
int: "Int_Array",
float: "Float_Array",
bool: "Boolean_Array",
datetime.datetime: "dateTime_Array"
}
if not six.PY3:
array_types[long] = "Long_Array"
def tosoap(tag, value):
el = et.Element(tag)
if isinstance(value, tuple):
value, datatype = value
else:
datatype = type(value)
if value is None:
el.set('xsi:nil', 'true')
return el
if datatype in python_types:
stype, conv = python_types[datatype]
el.text = conv(value)
el.set('xsi:type', stype)
el.text = str(value)
return el
def tosuds(client, value):
if value is None:
return None
if isinstance(value, tuple):
value, datatype = value
else:
datatype = type(value)
if value is None:
return None
if isinstance(datatype, list):
if datatype[0] in array_types:
tname = array_types[datatype[0]]
else:
tname = datatype[0].__name__ + '_Array'
o = client.factory.create('types:' + tname)
o.item = [tosuds(client, (item, datatype[0])) for item in value]
return o
elif datatype in python_types:
return python_types[datatype][1](value)
else:
o = client.factory.create('types:' + datatype.__name__)
for attr in datatype._wsme_attributes:
if attr.name in value:
setattr(
o, attr.name,
tosuds(client, (value[attr.name], attr.datatype))
)
return o
def read_bool(value):
return value == 'true'
soap_types = {
'xs:string': wsme.types.text,
'xs:int': int,
'xs:long': int if six.PY3 else long,
'xs:float': float,
'xs:decimal': decimal.Decimal,
'xs:boolean': read_bool,
'xs:date': wsme.utils.parse_isodate,
'xs:time': wsme.utils.parse_isotime,
'xs:dateTime': wsme.utils.parse_isodatetime,
'xs:base64Binary': base64.decodestring,
}
def fromsoap(el):
if el.get(nil_qn) == 'true':
return None
t = el.get(type_qn)
if t == 'xs:string':
return wsme.types.text(el.text if el.text else '')
if t in soap_types:
return soap_types[t](el.text)
elif t and t.endswith('_Array'):
return [fromsoap(i) for i in el]
else:
d = {}
for child in el:
name = child.tag
assert name.startswith('{%s}' % typenamespace), name
name = name[len(typenamespace) + 2:]
d[name] = fromsoap(child)
return d
def tobytes(value):
if isinstance(value, wsme.types.text):
value = value.encode()
return value
def tobin(value):
value = base64.decodestring(value.encode())
return value
fromsuds_types = {
wsme.types.binary: tobin,
wsme.types.bytes: tobytes,
decimal.Decimal: decimal.Decimal,
}
def fromsuds(dt, value):
if value is None:
return None
if isinstance(dt, list):
return [fromsuds(dt[0], item) for item in value.item]
if wsme.types.isarray(dt):
return [fromsuds(dt.item_type, item) for item in value.item]
if wsme.types.isusertype(dt) and dt not in fromsuds_types:
dt = dt.basetype
if dt in fromsuds_types:
print(dt, value)
value = fromsuds_types[dt](value)
print(value)
return value
if wsme.types.iscomplex(dt):
d = {}
for attrdef in dt._wsme_attributes:
if not hasattr(value, attrdef.name):
continue
d[attrdef.name] = fromsuds(
attrdef.datatype, getattr(value, attrdef.name)
)
return d
return value
class TestSOAP(wsme.tests.protocol.ProtocolTestCase):
protocol = 'soap'
protocol_options = dict(tns=tns, typenamespace=typenamespace)
ws_path = '/'
_sudsclient = None
def setUp(self):
wsme.tests.protocol.ProtocolTestCase.setUp(self)
def test_simple_call(self):
message = build_soap_message('touch')
print(message)
res = self.app.post(
self.ws_path,
message,
headers={"Content-Type": "application/soap+xml; charset=utf-8"},
expect_errors=True
)
print(res.body)
assert res.status.startswith('200')
def call(self, fpath, _rt=None, _accept=None, _no_result_decode=False,
**kw):
if _no_result_decode or _accept or self._testMethodName in (
'test_missing_argument', 'test_invalid_path', 'test_settext_empty',
'test_settext_none'
):
return self.raw_call(fpath, _rt, _accept, _no_result_decode, **kw)
path = fpath.strip('/').split('/')
methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]])
m = getattr(self.sudsclient.service, methodname)
kw = dict((
(key, tosuds(self.sudsclient, value)) for key, value in kw.items()
))
print(kw)
try:
return fromsuds(_rt, m(**kw))
except suds.WebFault as exc:
raise wsme.tests.protocol.CallException(
exc.fault.faultcode,
exc.fault.faultstring,
getattr(exc.fault, 'detail', None) or None
)
def raw_call(self, fpath, _rt=None, _accept=None, _no_result_decode=False,
**kw):
path = fpath.strip('/').split('/')
methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]])
# get the actual definition so we can build the adequate request
if kw:
el = et.Element('parameters')
for key, value in kw.items():
el.append(tosoap(key, value))
params = six.b("\n").join(et.tostring(el) for el in el)
else:
params = ""
methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]])
message = build_soap_message(methodname, params)
print(message)
headers = {"Content-Type": "application/soap+xml; charset=utf-8"}
if _accept is not None:
headers['Accept'] = _accept
res = self.app.post(
self.ws_path,
message,
headers=headers,
expect_errors=True
)
print("Status: ", res.status, "Received:", res.body)
if _no_result_decode:
return res
el = et.fromstring(res.body)
body = el.find(body_qn)
print(body)
if res.status_int == 200:
response_tag = '{%s}%sResponse' % (typenamespace, methodname)
r = body.find(response_tag)
result = r.find('{%s}result' % typenamespace)
print("Result element: ", result)
return fromsoap(result)
elif res.status_int == 400:
fault = body.find(fault_qn)
raise wsme.tests.protocol.CallException(
fault.find(faultcode_qn).text,
fault.find(faultstring_qn).text,
"")
elif res.status_int == 500:
fault = body.find(fault_qn)
raise wsme.tests.protocol.CallException(
fault.find(faultcode_qn).text,
fault.find(faultstring_qn).text,
fault.find(faultdetail_qn) is not None and
fault.find(faultdetail_qn).text or None)
@property
def sudsclient(self):
if self._sudsclient is None:
self._sudsclient = suds.client.Client(
self.ws_path + 'api.wsdl',
transport=WebtestSudsTransport(self.app),
cache=sudscache
)
return self._sudsclient
def test_wsdl(self):
c = self.sudsclient
assert c.wsdl.tns[1] == tns, c.wsdl.tns
sd = c.sd[0]
assert len(sd.ports) == 1
port, methods = sd.ports[0]
self.assertEquals(len(methods), 50)
methods = dict(methods)
assert 'returntypesGettext' in methods
print(methods)
assert methods['argtypesSettime'][0][0] == 'value'
def test_return_nesteddict(self):
pass
def test_setnesteddict(self):
pass
def test_return_objectdictattribute(self):
pass
def test_setnested_nullobj(self):
pass # TODO write a soap adapted version of this test.