Source code for webtest.app

# (c) 2005 Ian Bicking and contributors; written for Paste
# (http://pythonpaste.org)
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""
Routines for testing WSGI applications.

Most interesting is TestApp
"""

import os
import re
import json
import random
import fnmatch
import mimetypes

from base64 import b64encode
from http import cookiejar as http_cookiejar
from io import BytesIO, StringIO

from webtest.compat import urlparse
from webtest.compat import to_bytes
from webtest.compat import escape_cookie_value
from webtest.response import TestResponse
from webtest import forms
from webtest import lint
from webtest import utils

import webob


__all__ = ['TestApp', 'TestRequest']


class AppError(Exception):

    def __init__(self, message, *args):
        if isinstance(message, bytes):
            message = message.decode('utf8')
        str_args = ()
        for arg in args:
            if isinstance(arg, webob.Response):
                body = arg.body
                if isinstance(body, bytes):
                    if arg.charset:
                        arg = body.decode(arg.charset)
                    else:
                        arg = repr(body)
            elif isinstance(arg, bytes):
                try:
                    arg = arg.decode('utf8')
                except UnicodeDecodeError:
                    arg = repr(arg)
            str_args += (arg,)
        message = message % str_args
        Exception.__init__(self, message)


class CookiePolicy(http_cookiejar.DefaultCookiePolicy):
    """A subclass of DefaultCookiePolicy to allow cookie set for
    Domain=localhost."""

    def return_ok_domain(self, cookie, request):
        if cookie.domain == '.localhost':
            return True
        return http_cookiejar.DefaultCookiePolicy.return_ok_domain(
            self, cookie, request)

    def set_ok_domain(self, cookie, request):
        if cookie.domain == '.localhost':
            return True
        return http_cookiejar.DefaultCookiePolicy.set_ok_domain(
            self, cookie, request)


[docs]class TestRequest(webob.BaseRequest): """A subclass of webob.Request""" ResponseClass = TestResponse
[docs]class TestApp: """ Wraps a WSGI application in a more convenient interface for testing. It uses extended version of :class:`webob.BaseRequest` and :class:`webob.Response`. :param app: May be an WSGI application or Paste Deploy app, like ``'config:filename.ini#test'``. .. versionadded:: 2.0 It can also be an actual full URL to an http server and webtest will proxy requests with `WSGIProxy2 <https://pypi.org/project/WSGIProxy2/>`_. :type app: WSGI application :param extra_environ: A dictionary of values that should go into the environment for each request. These can provide a communication channel with the application. :type extra_environ: dict :param relative_to: A directory used for file uploads are calculated relative to this. Also ``config:`` URIs that aren't absolute. :type relative_to: string :param cookiejar: :class:`cookielib.CookieJar` alike API that keeps cookies across requests. :type cookiejar: CookieJar instance .. attribute:: cookies A convenient shortcut for a dict of all cookies in ``cookiejar``. :param parser_features: Passed to BeautifulSoup when parsing responses. :type parser_features: string or list :param json_encoder: Passed to json.dumps when encoding json :type json_encoder: A subclass of json.JSONEncoder :param lint: If True (default) then check that the application is WSGI compliant :type lint: A boolean """ RequestClass = TestRequest # Tell pytest not to collect this class as tests __test__ = False def __init__(self, app, extra_environ=None, relative_to=None, use_unicode=True, cookiejar=None, parser_features=None, json_encoder=None, lint=True): if 'WEBTEST_TARGET_URL' in os.environ: app = os.environ['WEBTEST_TARGET_URL'] if isinstance(app, str): if app.startswith('http'): try: from wsgiproxy import HostProxy except ImportError: # pragma: no cover raise ImportError( 'Using webtest with a real url requires WSGIProxy2. ' 'Please install it with: ' 'pip install WSGIProxy2') if '#' not in app: app += '#httplib' url, client = app.split('#', 1) app = HostProxy(url, client=client) else: from paste.deploy import loadapp # @@: Should pick up relative_to from calling module's # __file__ app = loadapp(app, relative_to=relative_to) self.app = app self.lint = lint self.relative_to = relative_to if extra_environ is None: extra_environ = {} self.extra_environ = extra_environ self.use_unicode = use_unicode if cookiejar is None: cookiejar = http_cookiejar.CookieJar(policy=CookiePolicy()) self.cookiejar = cookiejar if parser_features is None: parser_features = 'html.parser' self.RequestClass.ResponseClass.parser_features = parser_features if json_encoder is None: json_encoder = json.JSONEncoder self.JSONEncoder = json_encoder
[docs] def get_authorization(self): """Allow to set the HTTP_AUTHORIZATION environ key. Value should look like one of the following: * ``('Basic', ('user', 'password'))`` * ``('Bearer', 'mytoken')`` * ``('JWT', 'myjwt')`` If value is None the the HTTP_AUTHORIZATION is removed """ return self.authorization_value
def set_authorization(self, value): self.authorization_value = value if value is not None: invalid_value = ( "You should use a value like ('Basic', ('user', 'password'))" " OR ('Bearer', 'token') OR ('JWT', 'token')" ) if isinstance(value, (list, tuple)) and len(value) == 2: authtype, val = value if authtype == 'Basic' and val and \ isinstance(val, (list, tuple)): val = ':'.join(list(val)) val = b64encode(to_bytes(val)).strip() val = val.decode('latin1') elif authtype in ('Bearer', 'JWT') and val and \ isinstance(val, (str, str)): val = val.strip() else: raise ValueError(invalid_value) value = str(f'{authtype} {val}') else: raise ValueError(invalid_value) self.extra_environ.update({ 'HTTP_AUTHORIZATION': value, }) else: if 'HTTP_AUTHORIZATION' in self.extra_environ: del self.extra_environ['HTTP_AUTHORIZATION'] authorization = property(get_authorization, set_authorization) @property def cookies(self): return {cookie.name: cookie.value for cookie in self.cookiejar}
[docs] def reset(self): """ Resets the state of the application; currently just clears saved cookies. """ self.cookiejar.clear()
[docs] def set_parser_features(self, parser_features): """ Changes the parser used by BeautifulSoup. See its documentation to know the supported parsers. """ self.RequestClass.ResponseClass.parser_features = parser_features
[docs] def get(self, url, params=None, headers=None, extra_environ=None, status=None, expect_errors=False, xhr=False): """ Do a GET request given the url path. :param params: A query string, or a dictionary that will be encoded into a query string. You may also include a URL query string on the ``url``. :param headers: Extra headers to send. :type headers: dictionary :param extra_environ: Environmental variables that should be added to the request. :type extra_environ: dictionary :param status: The HTTP status code you expect in response (if not 200 or 3xx). You can also use a wildcard, like ``'3*'`` or ``'*'``. :type status: integer or string :param expect_errors: If this is False, then if anything is written to environ ``wsgi.errors`` it will be an error. If it is True, then non-200/3xx responses are also okay. :type expect_errors: boolean :param xhr: If this is true, then marks response as ajax. The same as headers={'X-REQUESTED-WITH': 'XMLHttpRequest', } :type xhr: boolean :returns: :class:`webtest.TestResponse` instance. """ environ = self._make_environ(extra_environ) url = str(url) url = self._remove_fragment(url) if params: url = utils.build_params(url, params) if '?' in url: url, environ['QUERY_STRING'] = url.split('?', 1) else: environ['QUERY_STRING'] = '' req = self.RequestClass.blank(url, environ) if xhr: headers = self._add_xhr_header(headers) if headers: req.headers.update(headers) return self.do_request(req, status=status, expect_errors=expect_errors)
[docs] def post(self, url, params='', headers=None, extra_environ=None, status=None, upload_files=None, expect_errors=False, content_type=None, xhr=False): """ Do a POST request. Similar to :meth:`~webtest.TestApp.get`. :param params: Are put in the body of the request. If params is an iterator, it will be urlencoded. If it is a string, it will not be encoded, but placed in the body directly. Can be a :class:`python:collections.OrderedDict` with :class:`webtest.forms.Upload` fields included:: app.post('/myurl', collections.OrderedDict([ ('textfield1', 'value1'), ('uploadfield', webapp.Upload('filename.txt', 'contents'), ('textfield2', 'value2')]))) :param upload_files: It should be a list of ``(fieldname, filename, file_content)``. You can also use just ``(fieldname, filename)`` and the file contents will be read from disk. :type upload_files: list :param content_type: HTTP content type, for example `application/json`. :type content_type: string :param xhr: If this is true, then marks response as ajax. The same as headers={'X-REQUESTED-WITH': 'XMLHttpRequest', } :type xhr: boolean :returns: :class:`webtest.TestResponse` instance. """ if xhr: headers = self._add_xhr_header(headers) return self._gen_request('POST', url, params=params, headers=headers, extra_environ=extra_environ, status=status, upload_files=upload_files, expect_errors=expect_errors, content_type=content_type)
[docs] def put(self, url, params='', headers=None, extra_environ=None, status=None, upload_files=None, expect_errors=False, content_type=None, xhr=False): """ Do a PUT request. Similar to :meth:`~webtest.TestApp.post`. :returns: :class:`webtest.TestResponse` instance. """ if xhr: headers = self._add_xhr_header(headers) return self._gen_request('PUT', url, params=params, headers=headers, extra_environ=extra_environ, status=status, upload_files=upload_files, expect_errors=expect_errors, content_type=content_type, )
[docs] def patch(self, url, params='', headers=None, extra_environ=None, status=None, upload_files=None, expect_errors=False, content_type=None, xhr=False): """ Do a PATCH request. Similar to :meth:`~webtest.TestApp.post`. :returns: :class:`webtest.TestResponse` instance. """ if xhr: headers = self._add_xhr_header(headers) return self._gen_request('PATCH', url, params=params, headers=headers, extra_environ=extra_environ, status=status, upload_files=upload_files, expect_errors=expect_errors, content_type=content_type)
[docs] def delete(self, url, params='', headers=None, extra_environ=None, status=None, expect_errors=False, content_type=None, xhr=False): """ Do a DELETE request. Similar to :meth:`~webtest.TestApp.get`. :returns: :class:`webtest.TestResponse` instance. """ if xhr: headers = self._add_xhr_header(headers) return self._gen_request('DELETE', url, params=params, headers=headers, extra_environ=extra_environ, status=status, upload_files=None, expect_errors=expect_errors, content_type=content_type)
[docs] def options(self, url, headers=None, extra_environ=None, status=None, expect_errors=False, xhr=False): """ Do a OPTIONS request. Similar to :meth:`~webtest.TestApp.get`. :returns: :class:`webtest.TestResponse` instance. """ if xhr: headers = self._add_xhr_header(headers) return self._gen_request('OPTIONS', url, headers=headers, extra_environ=extra_environ, status=status, upload_files=None, expect_errors=expect_errors)
[docs] def head(self, url, params=None, headers=None, extra_environ=None, status=None, expect_errors=False, xhr=False): """ Do a HEAD request. Similar to :meth:`~webtest.TestApp.get`. :returns: :class:`webtest.TestResponse` instance. """ if params: url = utils.build_params(url, params) if xhr: headers = self._add_xhr_header(headers) return self._gen_request('HEAD', url, headers=headers, extra_environ=extra_environ, status=status, upload_files=None, expect_errors=expect_errors)
post_json = utils.json_method('POST') put_json = utils.json_method('PUT') patch_json = utils.json_method('PATCH') delete_json = utils.json_method('DELETE')
[docs] def encode_multipart(self, params, files): """ Encodes a set of parameters (typically a name/value list) and a set of files (a list of (name, filename, file_body, mimetype)) into a typical POST body, returning the (content_type, body). """ boundary = to_bytes(str(random.random()))[2:] boundary = b'----------a_BoUnDaRy' + boundary + b'$' lines = [] def _append_file(file_info): key, filename, value, fcontent = self._get_file_info(file_info) if isinstance(key, str): try: key = key.encode('ascii') except: # pragma: no cover raise # file name must be ascii if isinstance(filename, str): try: filename = filename.encode('utf8') except: # pragma: no cover raise # file name must be ascii or utf8 if not fcontent: fcontent = mimetypes.guess_type(filename.decode('utf8'))[0] fcontent = to_bytes(fcontent) fcontent = fcontent or b'application/octet-stream' lines.extend([ b'--' + boundary, b'Content-Disposition: form-data; ' + b'name="' + key + b'"; filename="' + filename + b'"', b'Content-Type: ' + fcontent, b'', value]) for key, value in params: if isinstance(key, str): try: key = key.encode('ascii') except: # pragma: no cover raise # field name are always ascii if isinstance(value, forms.File): if "multiple" in value.attrs: for file in value.value: _append_file([key] + list(file)) elif value.value: _append_file([key] + list(value.value)) else: # If no file was uploaded simulate an empty file with no # name like real browsers do: _append_file([key, b'', b'']) elif isinstance(value, forms.Upload): file_info = [key, value.filename] if value.content is not None: file_info.append(value.content) if value.content_type is not None: file_info.append(value.content_type) _append_file(file_info) else: if isinstance(value, int): value = str(value).encode('utf8') elif isinstance(value, str): value = value.encode('utf8') elif not isinstance(value, (bytes, str)): raise ValueError(( 'Value for field {} is a {} ({}). ' 'It must be str, bytes or an int' ).format(key, type(value), value)) lines.extend([ b'--' + boundary, b'Content-Disposition: form-data; name="' + key + b'"', b'', value]) for file_info in files: _append_file(file_info) lines.extend([b'--' + boundary + b'--', b'']) body = b'\r\n'.join(lines) boundary = boundary.decode('ascii') content_type = 'multipart/form-data; boundary=%s' % boundary return content_type, body
[docs] def request(self, url_or_req, status=None, expect_errors=False, **req_params): """ Creates and executes a request. You may either pass in an instantiated :class:`TestRequest` object, or you may pass in a URL and keyword arguments to be passed to :meth:`TestRequest.blank`. You can use this to run a request without the intermediary functioning of :meth:`TestApp.get` etc. For instance, to test a WebDAV method:: resp = app.request('/new-col', method='MKCOL') Note that the request won't have a body unless you specify it, like:: resp = app.request('/test.txt', method='PUT', body='test') You can use :class:`webtest.TestRequest`:: req = webtest.TestRequest.blank('/url/', method='GET') resp = app.do_request(req) """ if isinstance(url_or_req, str): url_or_req = str(url_or_req) for (k, v) in req_params.items(): if isinstance(v, str): req_params[k] = str(v) if isinstance(url_or_req, str): req = self.RequestClass.blank(url_or_req, **req_params) else: req = url_or_req.copy() for name, value in req_params.items(): setattr(req, name, value) req.environ['paste.throw_errors'] = True for name, value in self.extra_environ.items(): req.environ.setdefault(name, value) return self.do_request(req, status=status, expect_errors=expect_errors, )
[docs] def do_request(self, req, status=None, expect_errors=None): """ Executes the given webob Request (``req``), with the expected ``status``. Generally :meth:`~webtest.TestApp.get` and :meth:`~webtest.TestApp.post` are used instead. To use this:: req = webtest.TestRequest.blank('url', ...args...) resp = app.do_request(req) .. note:: You can pass any keyword arguments to ``TestRequest.blank()``, which will be set on the request. These can be arguments like ``content_type``, ``accept``, etc. """ errors = StringIO() req.environ['wsgi.errors'] = errors script_name = req.environ.get('SCRIPT_NAME', '') if script_name and req.path_info.startswith(script_name): req.path_info = req.path_info[len(script_name):] # set framework hooks req.environ['paste.testing'] = True req.environ['paste.testing_variables'] = {} # set request cookies self.cookiejar.add_cookie_header(utils._RequestCookieAdapter(req)) # verify wsgi compatibility app = lint.middleware(self.app) if self.lint else self.app # FIXME: should it be an option to not catch exc_info? res = req.get_response(app, catch_exc_info=True) # be sure to decode the content res.decode_content() # set a few handy attributes res._use_unicode = self.use_unicode res.request = req res.app = app res.test_app = self # We do this to make sure the app_iter is exhausted: try: res.body except TypeError: # pragma: no cover pass res.errors = errors.getvalue() for name, value in req.environ['paste.testing_variables'].items(): if hasattr(res, name): raise ValueError( "paste.testing_variables contains the variable %r, but " "the response object already has an attribute by that " "name" % name) setattr(res, name, value) if not expect_errors: self._check_status(status, res) self._check_errors(res) # merge cookies back in self.cookiejar.extract_cookies(utils._ResponseCookieAdapter(res), utils._RequestCookieAdapter(req)) return res
def _check_status(self, status, res): if status == '*': return res_status = res.status if (isinstance(status, str) and '*' in status): if re.match(fnmatch.translate(status), res_status, re.I): return if isinstance(status, str): if status == res_status: return if isinstance(status, (list, tuple)): if res.status_int not in status: raise AppError( "Bad response: %s (not one of %s for %s)\n%s", res_status, ', '.join(map(str, status)), res.request.url, res) return if status is None: if res.status_int >= 200 and res.status_int < 400: return raise AppError( "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s", res_status, res.request.url, res) if status != res.status_int: raise AppError( "Bad response: %s (not %s)\n%s", res_status, status, res) def _check_errors(self, res): errors = res.errors if errors: raise AppError( "Application had errors logged:\n%s", errors) def _make_environ(self, extra_environ=None): environ = self.extra_environ.copy() environ['paste.throw_errors'] = True if extra_environ: environ.update(extra_environ) return environ def _remove_fragment(self, url): scheme, netloc, path, query, fragment = urlparse.urlsplit(url) return urlparse.urlunsplit((scheme, netloc, path, query, "")) def _gen_request(self, method, url, params=utils.NoDefault, headers=None, extra_environ=None, status=None, upload_files=None, expect_errors=False, content_type=None): """ Do a generic request. """ environ = self._make_environ(extra_environ) inline_uploads = [] # this supports OrderedDict if isinstance(params, dict) or hasattr(params, 'items'): params = list(params.items()) if isinstance(params, (list, tuple)): inline_uploads = [v for (k, v) in params if isinstance(v, (forms.File, forms.Upload))] if len(inline_uploads) > 0: content_type, params = self.encode_multipart( params, upload_files or ()) environ['CONTENT_TYPE'] = content_type else: params = utils.encode_params(params, content_type) if upload_files or \ (content_type and to_bytes(content_type).startswith(b'multipart')): params = urlparse.parse_qsl(params, keep_blank_values=True) content_type, params = self.encode_multipart( params, upload_files or ()) environ['CONTENT_TYPE'] = content_type elif params: environ.setdefault('CONTENT_TYPE', 'application/x-www-form-urlencoded') if content_type is not None: environ['CONTENT_TYPE'] = content_type environ['REQUEST_METHOD'] = str(method) url = str(url) url = self._remove_fragment(url) req = self.RequestClass.blank(url, environ) if isinstance(params, str): params = params.encode(req.charset or 'utf8') req.environ['wsgi.input'] = BytesIO(params) req.content_length = len(params) if headers: req.headers.update(headers) return self.do_request(req, status=status, expect_errors=expect_errors) def _get_file_info(self, file_info): if len(file_info) == 2: # It only has a filename filename = file_info[1] if self.relative_to: filename = os.path.join(self.relative_to, filename) f = open(filename, 'rb') content = f.read() f.close() return (file_info[0], filename, content, None) elif 3 <= len(file_info) <= 4: content = file_info[2] if not isinstance(content, bytes): raise ValueError('File content must be %s not %s' % (bytes, type(content))) if len(file_info) == 3: return tuple(file_info) + (None,) else: return file_info else: raise ValueError( "upload_files need to be a list of tuples of (fieldname, " "filename, filecontent, mimetype) or (fieldname, " "filename, filecontent) or (fieldname, filename); " "you gave: %r" % repr(file_info)[:100]) @staticmethod def _add_xhr_header(headers): headers = headers or {} # if remove str we will be have an error in lint.middleware headers.update({'X-REQUESTED-WITH': 'XMLHttpRequest'}) return headers