
When you register a handler, the type for text or data can be a string or a callback that provides a string. By the time you get to create_response this callback should already have been called, so only a string can be passed. This check originally would have been in place for when a callback was run and the return value was not the expected type; however, create_response is a public function that can be used on its own. Correct the exception to clarify that in create_response you must pass a string or bytes and not a callback. Change-Id: I3e700afddc1f40454f69a564066495bfc77d91c8 Closes-Bug: #1627506
248 lines
9.7 KiB
Python
248 lines
9.7 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import json as jsonutils
|
|
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.cookies import MockRequest, MockResponse
|
|
from requests.cookies import RequestsCookieJar
|
|
from requests.cookies import merge_cookies, cookiejar_from_dict
|
|
from requests.packages.urllib3.response import HTTPResponse
|
|
import six
|
|
|
|
from requests_mock import compat
|
|
from requests_mock import exceptions
|
|
|
|
# Keyword arguments that each describe a response body. Only one of them may
# carry a value for any single response.
_BODY_ARGS = frozenset(('raw', 'body', 'content', 'text', 'json'))

# Keyword arguments describing the non-body parts of a response.
_HTTP_ARGS = frozenset(('status_code', 'reason', 'headers', 'cookies'))

# Status code used when the caller does not supply one.
_DEFAULT_STATUS = 200

# Module-level adapter instance whose build_response() turns a raw urllib3
# response into a requests response.
_http_adapter = HTTPAdapter()
|
|
|
|
|
|
class CookieJar(RequestsCookieJar):
    """A requests cookie jar whose ``set`` method documents its arguments."""

    def set(self, name, value, **kwargs):
        """Store a cookie in the jar.

        :param str name: the key (name) of the cookie.
        :param str value: the value of the cookie.
        :param int version: Integer or None. Version 0 means a Netscape
            cookie; RFC 2965 and RFC 2109 cookies carry a version
            cookie-attribute of 1. Be aware that cookielib may 'downgrade'
            an RFC 2109 cookie to a Netscape cookie, and then the version
            is 0.
        :param str port: A string naming one port or several ports
            (eg. '80', or '80,8080'),
        :param str domain: The domain that the cookie applies to.
        :param str path: The cookie path as a string
            (eg. '/acme/rocket_launchers').
        :param bool secure: True when the cookie must only be sent back
            over a secure connection.
        :param int expires: Expiry date as integer seconds since the epoch,
            or None.
        :param bool discard: True when this is a session cookie.
        :param str comment: A comment string from the server describing
            what this cookie is for.
        :param str comment_url: A URL that links to a comment from the
            server describing what this cookie is for.
        """
        # Nothing is changed here: the override exists purely so that the
        # parameters above can be documented.
        parent = super(CookieJar, self)
        return parent.set(name, value, **kwargs)
|
|
|
|
|
|
def _check_body_arguments(**kwargs):
    """Validate the keyword arguments that describe a response.

    :raises RuntimeError: when more than one of the body arguments has a
        value other than None.
    :raises TypeError: when an argument is given that is neither a body
        argument nor one of the HTTP arguments.
    """
    # Body arguments are mutually exclusive. Each one is popped so that only
    # the non-body arguments are left for the second check.
    supplied = []
    for body_arg in _BODY_ARGS:
        if kwargs.pop(body_arg, None) is not None:
            supplied.append(body_arg)

    if len(supplied) > 1:
        raise RuntimeError('You may only supply one body element. You '
                           'supplied %s' % ', '.join(supplied))

    unexpected = [key for key in kwargs if key not in _HTTP_ARGS]

    if unexpected:
        raise TypeError('Too many arguments provided. Unexpected '
                        'arguments %s.' % ', '.join(unexpected))
|
|
|
|
|
|
class _FakeConnection(object):
    """Stand-in that mocks the parts of a socket interface that are needed."""

    def send(self, request, **kwargs):
        """Refuse to send: always raises ``exceptions.InvalidRequest``."""
        raise exceptions.InvalidRequest(
            'This response was created without a connection. You are '
            'therefore unable to make a request directly on that connection.')

    def close(self):
        """Do nothing and return None."""
        return None
|
|
|
|
|
|
def _extract_cookies(request, response, cookies):
    """Populate the cookies of ``response``.

    requests extracts cookies from the headers of the original_response
    httplib.HTTPMessage. That message is not created here, so the extraction
    step has to be carried out by hand.

    :param request: the request that is wrapped in a MockRequest for the
        extraction.
    :param response: the response whose headers are read and whose cookie
        jar is updated in place.
    :param cookies: additional cookies to merge into the response cookies;
        skipped when falsy.
    """
    # Picks up cookies that were set by hand through a Set-Cookie or
    # Set-Cookie2 header, which only allows a single cookie to be set.
    fake_message = compat._FakeHTTPMessage(response.headers)
    mock_response = MockResponse(fake_message)
    mock_request = MockRequest(request)
    response.cookies.extract_cookies(mock_response, mock_request)

    if not cookies:
        return

    # Accepting a CookieJar or a dictionary (passed to request_uri or
    # straight to create_response) makes it possible to set more than one
    # cookie.
    merge_cookies(response.cookies, cookies)
|
|
|
|
|
|
class _IOReader(six.BytesIO):
    """Make a BytesIO behave like a HTTPResponse when it is read.

    Reading a HTTPResponse after its socket has been closed returns an empty
    string, whereas a closed BytesIO raises a ValueError. To stay compatible
    this reader does what a HTTPResponse does.
    """

    def read(self, *args, **kwargs):
        """Return the data read, or empty bytes once the reader is closed."""
        if not self.closed:
            # Explicit base-class call rather than super(): not a new style
            # object in python 2.
            return six.BytesIO.read(self, *args, **kwargs)

        return six.b('')
|
|
|
|
|
|
def create_response(request, **kwargs):
    """Create a response object for the given request.

    At most one of ``raw``, ``body``, ``content``, ``text`` and ``json`` may
    be supplied. Callbacks are not accepted here: ``content`` must already be
    bytes and ``text`` must already be a string.

    :param request: The request the response is built for. It is handed to
        ``HTTPAdapter.build_response`` and used for cookie extraction.
    :param int status_code: The status code to return upon a successful
        match. Defaults to 200.
    :param str reason: The reason passed to the raw HTTPResponse.
    :param HTTPResponse raw: A HTTPResponse object to return upon a
        successful match.
    :param io.IOBase body: An IO object with a read() method that can
        return a body on successful match.
    :param bytes content: A byte string to return upon a successful match.
    :param unicode text: A text string to return upon a successful match.
    :param object json: A python object to be converted to a JSON string
        and returned upon a successful match.
    :param dict headers: A dictionary object containing headers that are
        returned upon a successful match.
    :param CookieJar cookies: A cookie jar with cookies to set on the
        response.
    :param connection: The object stored as ``response.connection``.
        Defaults to a new ``_FakeConnection``.
    :returns: The response produced by ``HTTPAdapter.build_response``.
    :raises RuntimeError: If more than one body argument is supplied.
    :raises TypeError: If an unexpected argument is supplied, if ``content``
        is not binary data, or if ``text`` is not string data.
    """
    # 'connection' has to be removed before validation because it is neither
    # a body argument nor an HTTP argument.
    connection = kwargs.pop('connection', _FakeConnection())

    _check_body_arguments(**kwargs)

    raw = kwargs.pop('raw', None)
    body = kwargs.pop('body', None)
    content = kwargs.pop('content', None)
    text = kwargs.pop('text', None)
    json = kwargs.pop('json', None)
    encoding = None

    # Compare against None instead of relying on truthiness. Otherwise falsy
    # values of the wrong type (0, False, [] ...) would skip validation and
    # fail further down with a much less helpful error.
    if content is not None and not isinstance(content, six.binary_type):
        raise TypeError('Content should be binary data')
    if text is not None and not isinstance(text, six.string_types):
        raise TypeError('Text should be string data')

    # Each richer body form is reduced to the next simpler one:
    # json -> text -> content -> body.
    if json is not None:
        text = jsonutils.dumps(json)
    if text is not None:
        encoding = 'utf-8'
        content = text.encode(encoding)
    if content is not None:
        body = _IOReader(content)
    if not raw:
        raw = HTTPResponse(status=kwargs.get('status_code', _DEFAULT_STATUS),
                           headers=kwargs.get('headers', {}),
                           reason=kwargs.get('reason'),
                           body=body or _IOReader(six.b('')),
                           decode_content=False,
                           preload_content=False,
                           original_response=compat._fake_http_response)

    response = _http_adapter.build_response(request, raw)
    response.connection = connection
    # The encoding is only known (utf-8) when the body came from text or
    # json; otherwise it is set to None.
    response.encoding = encoding

    _extract_cookies(request, response, kwargs.get('cookies'))

    return response
|
|
|
|
|
|
class _Context(object):
    """Holds the data in use while a current URL match is processed."""

    def __init__(self, headers, status_code, reason, cookies):
        """Store the four response details as same-named attributes."""
        (self.headers,
         self.status_code,
         self.reason,
         self.cookies) = (headers, status_code, reason, cookies)
|
|
|
|
|
|
class _MatcherResponse(object):
    """Stores response parameters and turns them into a response on demand.

    The body parameters ``json``, ``text``, ``content`` and ``body`` may each
    be either a value or a callable. Callables are invoked with the request
    and a ``_Context`` when the response is built.
    """

    def __init__(self, **kwargs):
        """Validate and store the response parameters.

        :param exc: An exception to raise from ``get_response`` instead of
            building a response. It cannot be combined with other arguments.
        :raises RuntimeError: If more than one body argument is supplied.
        :raises TypeError: If ``exc`` is combined with other arguments, if
            an unexpected argument is supplied, or if ``content`` / ``text``
            is neither a callback nor data of the right type.
        """
        self._exc = kwargs.pop('exc', None)

        # If the user is asking for an exception to be thrown then prevent them
        # specifying any sort of body or status response as it won't be used.
        # This may be protecting the user too much but can be removed later.
        if self._exc and kwargs:
            raise TypeError('Cannot provide other arguments with exc.')

        _check_body_arguments(**kwargs)
        self._params = kwargs

        # whilst in general you shouldn't do type checking in python this
        # makes sure we don't end up with differences between the way types
        # are handled between python 2 and 3.
        content = self._params.get('content')
        text = self._params.get('text')

        # Test against None rather than truthiness so that falsy values of
        # the wrong type (0, False, [] ...) are rejected here instead of
        # failing later when the response is built.
        if content is not None and not (
                callable(content) or isinstance(content, six.binary_type)):
            raise TypeError('Content should be a callback or binary data')

        if text is not None and not (
                callable(text) or isinstance(text, six.string_types)):
            raise TypeError('Text should be a callback or string data')

    def get_response(self, request):
        """Build the response for ``request`` from the stored parameters.

        :param request: The request passed to any body callbacks and to
            ``create_response``.
        :returns: The value returned by ``create_response``.
        :raises: The stored ``exc`` if one was supplied.
        """
        # if an error was requested then raise that instead of doing response
        if self._exc:
            raise self._exc

        # If a cookie dict is passed convert it into a CookieJar so that the
        # cookies object available in a callback context is always a jar.
        # An explicit cookies=None is treated the same as no cookies at all.
        cookies = self._params.get('cookies')
        if cookies is None:
            cookies = CookieJar()
        elif isinstance(cookies, dict):
            cookies = cookiejar_from_dict(cookies, CookieJar())

        # Work on a copy so that callbacks changing context.headers do not
        # modify the stored parameters. An explicit headers=None is treated
        # the same as no headers instead of failing on None.copy().
        headers = self._params.get('headers')
        headers = {} if headers is None else headers.copy()

        context = _Context(headers,
                           self._params.get('status_code', _DEFAULT_STATUS),
                           self._params.get('reason'),
                           cookies)

        # if a body element is a callback then execute it
        def _call(f, *args, **kwargs):
            return f(request, context, *args, **kwargs) if callable(f) else f

        # The status, reason, headers and cookies are read from the context
        # after the callbacks have had the chance to modify them. 'raw' is
        # passed through as given and is never called.
        return create_response(request,
                               json=_call(self._params.get('json')),
                               text=_call(self._params.get('text')),
                               content=_call(self._params.get('content')),
                               body=_call(self._params.get('body')),
                               raw=self._params.get('raw'),
                               status_code=context.status_code,
                               reason=context.reason,
                               headers=context.headers,
                               cookies=context.cookies)
|