Fork me on GitHub

Source code for webtest.app

# (c) 2005 Ian Bicking and contributors; written for Paste
# (http://pythonpaste.org)
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""
Routines for testing WSGI applications.

Most interesting is TestApp
"""
from __future__ import unicode_literals

import cgi
import fnmatch
import mimetypes
import os
import random
import re
import warnings

from six import StringIO
from six import BytesIO
from six import string_types
from six import binary_type
from six import text_type
from six.moves import http_cookiejar

from webtest.compat import urlparse
from webtest.compat import urlencode
from webtest.compat import to_bytes
from webtest.response import TestResponse
from webtest import forms
from webtest import lint
from webtest import utils

import webob


__all__ = ['TestApp', 'TestRequest']


class AppError(Exception):
    """Error raised when a response does not satisfy the test's expectations.

    ``message`` is a ``%``-style format string and ``args`` are its
    arguments.  Before formatting, arguments are converted to something
    printable:

    * a :class:`webob.Response` whose body is bytes is replaced by the
      body decoded with the response charset, or by ``repr(body)`` when
      there is no charset or the body cannot be decoded with it;
    * a bytes argument is decoded as UTF-8, falling back to ``repr()``
      when it is not valid UTF-8;
    * anything else is passed through unchanged.

    :param message: format string (bytes are decoded as UTF-8).
    :param args: values interpolated into ``message``.
    """

    def __init__(self, message, *args):
        if isinstance(message, binary_type):
            message = message.decode('utf8')
        str_args = ()
        for arg in args:
            if isinstance(arg, webob.Response):
                body = arg.body
                if isinstance(body, binary_type):
                    if arg.charset:
                        try:
                            arg = body.decode(arg.charset)
                        except (UnicodeDecodeError, LookupError):
                            # A mislabelled body or unknown charset must
                            # not mask the failure being reported; fall
                            # back to repr() like the raw-bytes branch.
                            arg = repr(body)
                    else:
                        arg = repr(body)
            elif isinstance(arg, binary_type):
                try:
                    arg = arg.decode('utf8')
                except UnicodeDecodeError:
                    arg = repr(arg)
            str_args += (arg,)
        message = message % str_args
        Exception.__init__(self, message)


class TestRequest(webob.BaseRequest):
    """A subclass of :class:`webob.BaseRequest` that sets
    ``ResponseClass`` to :class:`TestResponse`."""

    ResponseClass = TestResponse
class TestApp(object):
    """
    Wraps a WSGI application in a more convenient interface for
    testing. It uses extended version of :class:`webob.BaseRequest`
    and :class:`webob.Response`.

    :param app:
        May be an WSGI application or Paste Deploy app,
        like ``'config:filename.ini#test'``.

        .. versionadded:: 2.0

        It can also be an actual full URL to an http server and webtest
        will proxy requests with `wsgiproxy`.

        If the ``WEBTEST_TARGET_URL`` environment variable is set, its
        value replaces whatever was passed as ``app``.
    :type app:
        WSGI application
    :param extra_environ:
        A dictionary of values that should go
        into the environment for each request. These can provide a
        communication channel with the application.
    :type extra_environ:
        dict
    :param relative_to:
        A directory used for file
        uploads are calculated relative to this.  Also ``config:``
        URIs that aren't absolute.
    :type relative_to:
        string
    :param use_unicode:
        Stored on the instance and copied to every response as
        ``_use_unicode``.
    :type use_unicode:
        boolean
    :param cookiejar:
        :class:`cookielib.CookieJar` alike API that keeps cookies
        across requests.  A new ``CookieJar`` is created when omitted.
    :type cookiejar:
        CookieJar instance

    .. attribute:: cookies

        A convenient shortcut for a dict of all cookies in
        ``cookiejar``.
    """

    RequestClass = TestRequest

    def __init__(self, app, extra_environ=None, relative_to=None,
                 use_unicode=True, cookiejar=None):
        if 'WEBTEST_TARGET_URL' in os.environ:
            app = os.environ['WEBTEST_TARGET_URL']
        if isinstance(app, string_types):
            if app.startswith('http'):
                # Full URL: proxy requests to a live server.  An optional
                # ``#client`` suffix selects the proxy client.
                from wsgiproxy import HostProxy
                if '#' not in app:
                    app += '#httplib'
                url, client = app.split('#', 1)
                app = HostProxy(url, client=client)
            else:
                from paste.deploy import loadapp
                # @@: Should pick up relative_to from calling module's
                # __file__
                app = loadapp(app, relative_to=relative_to)
        self.app = app
        self.relative_to = relative_to
        if extra_environ is None:
            extra_environ = {}
        self.extra_environ = extra_environ
        self.use_unicode = use_unicode
        # Compare against None rather than relying on truthiness: a
        # CookieJar defines __len__, so an *empty* jar supplied by the
        # caller is falsy and would otherwise be silently replaced.
        if cookiejar is None:
            cookiejar = http_cookiejar.CookieJar()
        self.cookiejar = cookiejar

    @property
    def cookies(self):
        """A ``{name: value}`` dict of all cookies in ``cookiejar``."""
        return dict((cookie.name, cookie.value)
                    for cookie in self.cookiejar)

    def reset(self):
        """
        Resets the state of the application; currently just clears
        saved cookies.
        """
        self.cookiejar.clear()

    def get(self, url, params=None, headers=None, extra_environ=None,
            status=None, expect_errors=False):
        """
        Do a GET request given the url path.

        :param params:
            A query string, or a dictionary that will be encoded
            into a query string.  You may also include a URL query
            string on the ``url``.
        :param headers:
            Extra headers to send.
        :type headers:
            dictionary
        :param extra_environ:
            Environmental variables that should be added to the request.
        :type extra_environ:
            dictionary
        :param status:
            The HTTP status code you expect in response (if not 200 or 3xx).
            You can also use a wildcard, like ``'3*'`` or ``'*'``.
        :type status:
            integer or string
        :param expect_errors:
            If this is False, then if anything is written to
            environ ``wsgi.errors`` it will be an error.
            If it is True, then non-200/3xx responses are also okay.
        :type expect_errors:
            boolean

        :returns: :class:`webtest.TestResponse` instance.

        """
        environ = self._make_environ(extra_environ)
        url = str(url)
        url = self._remove_fragment(url)
        if params:
            if not isinstance(params, string_types):
                params = urlencode(params, doseq=True)
            # Append to an existing query string if the URL has one.
            if str('?') in url:
                url += str('&')
            else:
                url += str('?')
            url += params
        if str('?') in url:
            url, environ['QUERY_STRING'] = url.split(str('?'), 1)
        else:
            environ['QUERY_STRING'] = str('')
        req = self.RequestClass.blank(url, environ)
        if headers:
            req.headers.update(headers)
        return self.do_request(req, status=status,
                               expect_errors=expect_errors)

    def post(self, url, params='', headers=None, extra_environ=None,
             status=None, upload_files=None, expect_errors=False,
             content_type=None):
        """
        Do a POST request. Similar to :meth:`~webtest.TestApp.get`.

        :param params:
            Are put in the body of the request. If params is an
            iterator it will be urlencoded, if it is string it will not
            be encoded, but placed in the body directly.

            Can be a collections.OrderedDict with
            :class:`webtest.forms.Upload` fields included::

                app.post('/myurl', collections.OrderedDict([
                    ('textfield1', 'value1'),
                    ('uploadfield',
                     webtest.forms.Upload('filename.txt', b'contents')),
                    ('textfield2', 'value2')]))

        :param upload_files:
            It should be a list of ``(fieldname, filename, file_content)``.
            You can also use just ``(fieldname, filename)`` and the file
            contents will be read from disk.
        :type upload_files:
            list
        :param content_type:
            HTTP content type, for example `application/json`.
        :type content_type:
            string

        :returns: :class:`webtest.TestResponse` instance.

        """
        return self._gen_request('POST', url, params=params, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=upload_files,
                                 expect_errors=expect_errors,
                                 content_type=content_type,
                                 )

    def put(self, url, params='', headers=None, extra_environ=None,
            status=None, upload_files=None, expect_errors=False,
            content_type=None):
        """
        Do a PUT request. Similar to :meth:`~webtest.TestApp.post`.

        :returns: :class:`webtest.TestResponse` instance.

        """
        return self._gen_request('PUT', url, params=params, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=upload_files,
                                 expect_errors=expect_errors,
                                 content_type=content_type,
                                 )

    def patch(self, url, params='', headers=None, extra_environ=None,
              status=None, upload_files=None, expect_errors=False,
              content_type=None):
        """
        Do a PATCH request. Similar to :meth:`~webtest.TestApp.post`.

        :returns: :class:`webtest.TestResponse` instance.

        """
        return self._gen_request('PATCH', url, params=params,
                                 headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=upload_files,
                                 expect_errors=expect_errors,
                                 content_type=content_type,
                                 )

    def delete(self, url, params='', headers=None, extra_environ=None,
               status=None, expect_errors=False, content_type=None):
        """
        Do a DELETE request. Similar to :meth:`~webtest.TestApp.get`.

        A warning is issued when a non-empty ``params`` body is given.

        :returns: :class:`webtest.TestResponse` instance.

        """
        return self._gen_request('DELETE', url, params=params,
                                 headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=None,
                                 expect_errors=expect_errors,
                                 content_type=content_type,
                                 )

    def options(self, url, headers=None, extra_environ=None,
                status=None, expect_errors=False):
        """
        Do an OPTIONS request. Similar to :meth:`~webtest.TestApp.get`.

        :returns: :class:`webtest.TestResponse` instance.

        """
        return self._gen_request('OPTIONS', url, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=None,
                                 expect_errors=expect_errors,
                                 )

    def head(self, url, headers=None, extra_environ=None,
             status=None, expect_errors=False):
        """
        Do a HEAD request. Similar to :meth:`~webtest.TestApp.get`.

        :returns: :class:`webtest.TestResponse` instance.

        """
        return self._gen_request('HEAD', url, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=None,
                                 expect_errors=expect_errors,
                                 )

    # JSON variants of the body-carrying verbs, built by
    # ``utils.json_method``.
    post_json = utils.json_method('POST')
    put_json = utils.json_method('PUT')
    patch_json = utils.json_method('PATCH')
    delete_json = utils.json_method('DELETE')

    def encode_multipart(self, params, files):
        """
        Encodes a set of parameters (typically a name/value list) and
        a set of files (a list of (name, filename, file_body)) into a
        typical POST body, returning the (content_type, body).

        :param params:
            iterable of ``(name, value)`` pairs; a value may also be a
            ``forms.File`` or ``forms.Upload`` instance.
        :param files:
            iterable of ``(fieldname, filename)`` or
            ``(fieldname, filename, content)`` entries.
        :returns: ``(content_type, body)``; ``body`` is bytes.
        :raises UnicodeEncodeError:
            if a text field name is not ASCII.
        """
        # Random suffix so the boundary is unlikely to occur in the body.
        boundary = to_bytes(str(random.random()))[2:]
        boundary = b'----------a_BoUnDaRy' + boundary + b'$'
        lines = []

        def _append_file(file_info):
            # Add one file part (headers + content) to ``lines``.
            key, filename, value = self._get_file_info(file_info)
            if isinstance(key, text_type):
                # field name must be ascii
                key = key.encode('ascii')
            if isinstance(filename, text_type):
                fcontent = mimetypes.guess_type(filename)[0]
                # file name must be ascii or utf8
                filename = filename.encode('utf8')
            else:
                fcontent = mimetypes.guess_type(filename.decode('ascii'))[0]
            fcontent = to_bytes(fcontent)
            # Unknown type (guess_type gave None): use the generic default.
            fcontent = fcontent or b'application/octet-stream'
            lines.extend([
                b'--' + boundary,
                b'Content-Disposition: form-data; ' +
                b'name="' + key + b'"; filename="' + filename + b'"',
                b'Content-Type: ' + fcontent,
                b'', value])

        for key, value in params:
            if isinstance(key, text_type):
                # field name are always ascii
                key = key.encode('ascii')
            if isinstance(value, forms.File):
                # A File field with no value contributes nothing.
                if value.value:
                    _append_file([key] + list(value.value))
            elif isinstance(value, forms.Upload):
                file_info = [key, value.filename]
                # Without content, _get_file_info reads the file from disk.
                if value.content is not None:
                    file_info.append(value.content)
                _append_file(file_info)
            else:
                if isinstance(value, text_type):
                    value = value.encode('utf8')
                lines.extend([
                    b'--' + boundary,
                    b'Content-Disposition: form-data; name="' + key + b'"',
                    b'', value])

        for file_info in files:
            _append_file(file_info)

        lines.extend([b'--' + boundary + b'--', b''])
        body = b'\r\n'.join(lines)
        boundary = boundary.decode('ascii')
        content_type = 'multipart/form-data; boundary=%s' % boundary
        return content_type, body

    def request(self, url_or_req, status=None, expect_errors=False,
                **req_params):
        """
        Creates and executes a request. You may either pass in an
        instantiated :class:`TestRequest` object, or you may pass in a
        URL and keyword arguments to be passed to
        :meth:`TestRequest.blank`.

        You can use this to run a request without the intermediary
        functioning of :meth:`TestApp.get` etc.  For instance, to
        test a WebDAV method::

            resp = app.request('/new-col', method='MKCOL')

        Note that the request won't have a body unless you specify it,
        like::

            resp = app.request('/test.txt', method='PUT', body='test')

        You can use :class:`webtest.TestRequest`::

            req = webtest.TestRequest.blank('/url/', method='GET')
            resp = app.do_request(req)

        """
        # Coerce text to the native ``str`` type.
        if isinstance(url_or_req, text_type):
            url_or_req = str(url_or_req)
        for (k, v) in req_params.items():
            if isinstance(v, text_type):
                req_params[k] = str(v)
        if isinstance(url_or_req, string_types):
            req = self.RequestClass.blank(url_or_req, **req_params)
        else:
            # Work on a copy so the caller's request is not modified.
            req = url_or_req.copy()
            for name, value in req_params.items():
                setattr(req, name, value)
        req.environ['paste.throw_errors'] = True
        # Instance-wide environ only fills in keys the request lacks.
        for name, value in self.extra_environ.items():
            req.environ.setdefault(name, value)
        return self.do_request(req,
                               status=status,
                               expect_errors=expect_errors,
                               )

    def do_request(self, req, status, expect_errors):
        """
        Executes the given webob Request (``req``), with the expected
        ``status``.  Generally :meth:`~webtest.TestApp.get` and
        :meth:`~webtest.TestApp.post` are used instead.

        To use this::

            req = webtest.TestRequest.blank('url', ...args...)
            resp = app.do_request(req)

        .. note::

            You can pass any keyword arguments to
            ``TestRequest.blank()``, which will be set on the request.
            These can be arguments like ``content_type``, ``accept``, etc.

        :raises AppError:
            when ``expect_errors`` is false and the status check fails
            or the application wrote to ``wsgi.errors``.
        :raises ValueError:
            when a ``paste.testing_variables`` key collides with an
            existing response attribute.
        """
        errors = StringIO()
        req.environ['wsgi.errors'] = errors
        script_name = req.environ.get('SCRIPT_NAME', '')
        if script_name and req.path_info.startswith(script_name):
            req.path_info = req.path_info[len(script_name):]

        # set framework hooks
        req.environ['paste.testing'] = True
        req.environ['paste.testing_variables'] = {}

        # set request cookies
        self.cookiejar.add_cookie_header(utils._RequestCookieAdapter(req))

        # verify wsgi compatibility
        app = lint.middleware(self.app)

        ## FIXME: should it be an option to not catch exc_info?
        res = req.get_response(app, catch_exc_info=True)

        # set a few handy attributes
        res._use_unicode = self.use_unicode
        res.request = req
        res.app = app
        res.test_app = self

        # We do this to make sure the app_iter is exhausted:
        try:
            res.body
        except TypeError:  # pragma: no cover
            pass
        res.errors = errors.getvalue()

        # Expose variables the app published for the test as attributes
        # of the response.
        for name, value in req.environ['paste.testing_variables'].items():
            if hasattr(res, name):
                raise ValueError(
                    "paste.testing_variables contains the variable %r, but "
                    "the response object already has an attribute by that "
                    "name" % name)
            setattr(res, name, value)
        if not expect_errors:
            self._check_status(status, res)
            self._check_errors(res)

        # merge cookies back in
        self.cookiejar.extract_cookies(utils._ResponseCookieAdapter(res),
                                       utils._RequestCookieAdapter(req))

        return res

    def _check_status(self, status, res):
        """Raise :class:`AppError` unless ``res`` matches ``status``.

        ``status`` may be ``'*'`` (anything), a wildcard string such as
        ``'3*'`` matched against the full status line, a list/tuple of
        accepted integer codes, ``None`` (any 2xx or 3xx), or a single
        value compared against ``res.status_int``.
        """
        if status == '*':
            return
        res_status = res.status
        if (isinstance(status, string_types) and '*' in status):
            if re.match(fnmatch.translate(status), res_status, re.I):
                return
            # A non-matching pattern falls through to the checks below.
        if isinstance(status, (list, tuple)):
            if res.status_int not in status:
                raise AppError(
                    "Bad response: %s (not one of %s for %s)\n%s",
                    res_status, ', '.join(map(str, status)),
                    res.request.url, res)
            return
        if status is None:
            if res.status_int >= 200 and res.status_int < 400:
                return
            raise AppError(
                "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s",
                res_status, res.request.url,
                res)
        if status != res.status_int:
            raise AppError(
                "Bad response: %s (not %s)", res_status, status)

    def _check_errors(self, res):
        """Raise :class:`AppError` if anything was written to
        ``wsgi.errors`` during the request."""
        errors = res.errors
        if errors:
            raise AppError(
                "Application had errors logged:\n%s", errors)

    def _make_environ(self, extra_environ=None):
        """Return a fresh environ dict: a copy of the instance-wide
        ``extra_environ`` overlaid with the per-request one."""
        environ = self.extra_environ.copy()
        environ['paste.throw_errors'] = True
        if extra_environ:
            environ.update(extra_environ)
        return environ

    def _remove_fragment(self, url):
        """Return ``url`` with any ``#fragment`` part removed."""
        scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
        return urlparse.urlunsplit((scheme, netloc, path, query, ""))

    def _gen_request(self, method, url, params=utils.NoDefault,
                     headers=None, extra_environ=None, status=None,
                     upload_files=None, expect_errors=False,
                     content_type=None):
        """
        Do a generic request.

        Builds the request body from ``params`` (multipart when uploads
        are involved or a multipart ``content_type`` is requested),
        sets ``CONTENT_TYPE`` / ``REQUEST_METHOD`` in the environ and
        hands the request to :meth:`do_request`.

        :returns: :class:`webtest.TestResponse` instance.
        """
        # Only warn when a body is really being sent: ``delete()``
        # defaults ``params`` to '' which is not NoDefault, so the
        # identity check alone would warn on every DELETE.
        if (method == 'DELETE' and params is not utils.NoDefault
                and params):
            warnings.warn(('You are not supposed to send a body in a '
                           'DELETE request. Most web servers will ignore it'),
                          lint.WSGIWarning)

        environ = self._make_environ(extra_environ)

        inline_uploads = []

        # this supports OrderedDict
        if isinstance(params, dict) or hasattr(params, 'items'):
            params = list(params.items())

        if isinstance(params, (list, tuple)):
            inline_uploads = [v for (k, v) in params
                              if isinstance(v, (forms.File, forms.Upload))]

        if len(inline_uploads) > 0:
            content_type, params = self.encode_multipart(
                params, upload_files or ())
            environ['CONTENT_TYPE'] = content_type
        else:
            params = utils.encode_params(params, content_type)
            if upload_files or \
                (content_type and
                 to_bytes(content_type).startswith(b'multipart')):
                # Split the encoded params back into pairs for the
                # multipart encoder.  ``cgi.parse_qsl`` was removed in
                # Python 3.8; the urlparse module provides the same
                # function.
                params = urlparse.parse_qsl(params, keep_blank_values=True)
                content_type, params = self.encode_multipart(
                    params, upload_files or ())
                environ['CONTENT_TYPE'] = content_type
            elif params:
                environ.setdefault('CONTENT_TYPE',
                                   str('application/x-www-form-urlencoded'))

        # An explicit (or multipart-generated) content type always wins.
        if content_type is not None:
            environ['CONTENT_TYPE'] = content_type
        environ['REQUEST_METHOD'] = str(method)
        url = str(url)
        url = self._remove_fragment(url)
        req = self.RequestClass.blank(url, environ)
        if isinstance(params, text_type):
            params = params.encode(req.charset or 'utf8')
        req.environ['wsgi.input'] = BytesIO(params)
        req.content_length = len(params)
        if headers:
            req.headers.update(headers)
        return self.do_request(req, status=status,
                               expect_errors=expect_errors)

    def _get_file_info(self, file_info):
        """Normalise an upload entry to ``(fieldname, filename, content)``.

        :param file_info:
            ``(fieldname, filename)`` -- the content is read from disk,
            with ``filename`` joined onto ``relative_to`` when that is
            set -- or ``(fieldname, filename, content)`` with bytes
            content.
        :raises ValueError:
            if the entry has another length or the content is not bytes.
        """
        if len(file_info) == 2:
            # It only has a filename
            filename = file_info[1]
            if self.relative_to:
                filename = os.path.join(self.relative_to, filename)
            # ``with`` closes the file even if read() raises.
            with open(filename, 'rb') as f:
                content = f.read()
            return (file_info[0], filename, content)
        elif len(file_info) == 3:
            content = file_info[2]
            if not isinstance(content, binary_type):
                raise ValueError('File content must be %s not %s'
                                 % (binary_type, type(content)))
            return file_info
        else:
            raise ValueError(
                "upload_files need to be a list of tuples of (fieldname, "
                "filename, filecontent) or (fieldname, filename); "
                "you gave: %r"
                % repr(file_info)[:100])

This Page