# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php """ Routines for testing WSGI applications. Most interesting is the `TestApp `_ for testing WSGI applications, and the `TestFileEnvironment `_ class for testing the effects of command-line scripts. """ from __future__ import print_function import sys import random import mimetypes import time import os import shutil import smtplib import shlex import re import six import subprocess from six.moves import cStringIO as StringIO from six.moves.urllib.parse import urlencode from six.moves.urllib import parse as urlparse try: # Python 3 from http.cookies import BaseCookie from urllib.parse import splittype, splithost except ImportError: # Python 2 from Cookie import BaseCookie from urllib import splittype, splithost from paste import wsgilib from paste import lint from paste.response import HeaderDict def tempnam_no_warning(*args): """ An os.tempnam with the warning turned off, because sometimes you just need to use this and don't care about the stupid security warning. """ return os.tempnam(*args) class NoDefault(object): pass def sorted(l): l = list(l) l.sort() return l class Dummy_smtplib(object): existing = None def __init__(self, server): import warnings warnings.warn( 'Dummy_smtplib is not maintained and is deprecated', DeprecationWarning, 2) assert not self.existing, ( "smtplib.SMTP() called again before Dummy_smtplib.existing.reset() " "called.") self.server = server self.open = True self.__class__.existing = self def quit(self): assert self.open, ( "Called %s.quit() twice" % self) self.open = False def sendmail(self, from_address, to_addresses, msg): self.from_address = from_address self.to_addresses = to_addresses self.message = msg def install(cls): smtplib.SMTP = cls install = classmethod(install) def reset(self): assert not self.open, ( "SMTP connection not quit") self.__class__.existing = None class AppError(Exception): pass class TestApp(object): # for py.test disabled = True def __init__(self, app, namespace=None, relative_to=None, extra_environ=None, pre_request_hook=None, post_request_hook=None): """ Wraps a WSGI application in a more convenient interface for testing. ``app`` may be an application, or a Paste Deploy app URI, like ``'config:filename.ini#test'``. ``namespace`` is a dictionary that will be written to (if provided). This can be used with doctest or some other system, and the variable ``res`` will be assigned everytime you make a request (instead of returning the request). ``relative_to`` is a directory, and filenames used for file uploads are calculated relative to this. Also ``config:`` URIs that aren't absolute. ``extra_environ`` is a dictionary of values that should go into the environment for each request. These can provide a communication channel with the application. ``pre_request_hook`` is a function to be called prior to making requests (such as ``post`` or ``get``). This function must take one argument (the instance of the TestApp). ``post_request_hook`` is a function, similar to ``pre_request_hook``, to be called after requests are made. """ if isinstance(app, (six.binary_type, six.text_type)): from paste.deploy import loadapp # @@: Should pick up relative_to from calling module's # __file__ app = loadapp(app, relative_to=relative_to) self.app = app self.namespace = namespace self.relative_to = relative_to if extra_environ is None: extra_environ = {} self.extra_environ = extra_environ self.pre_request_hook = pre_request_hook self.post_request_hook = post_request_hook self.reset() def reset(self): """ Resets the state of the application; currently just clears saved cookies. """ self.cookies = {} def _make_environ(self): environ = self.extra_environ.copy() environ['paste.throw_errors'] = True return environ def get(self, url, params=None, headers=None, extra_environ=None, status=None, expect_errors=False): """ Get the given url (well, actually a path like ``'/page.html'``). ``params``: A query string, or a dictionary that will be encoded into a query string. You may also include a query string on the ``url``. ``headers``: A dictionary of extra headers to send. ``extra_environ``: A dictionary of environmental variables that should be added to the request. ``status``: The integer status code you expect (if not 200 or 3xx). If you expect a 404 response, for instance, you must give ``status=404`` or it will be an error. You can also give a wildcard, like ``'3*'`` or ``'*'``. ``expect_errors``: If this is not true, then if anything is written to ``wsgi.errors`` it will be an error. If it is true, then non-200/3xx responses are also okay. Returns a `response object `_ """ if extra_environ is None: extra_environ = {} # Hide from py.test: __tracebackhide__ = True if params: if not isinstance(params, (six.binary_type, six.text_type)): params = urlencode(params, doseq=True) if '?' in url: url += '&' else: url += '?' url += params environ = self._make_environ() url = str(url) if '?' in url: url, environ['QUERY_STRING'] = url.split('?', 1) else: environ['QUERY_STRING'] = '' self._set_headers(headers, environ) environ.update(extra_environ) req = TestRequest(url, environ, expect_errors) return self.do_request(req, status=status) def _gen_request(self, method, url, params=b'', headers=None, extra_environ=None, status=None, upload_files=None, expect_errors=False): """ Do a generic request. """ if headers is None: headers = {} if extra_environ is None: extra_environ = {} environ = self._make_environ() # @@: Should this be all non-strings? if isinstance(params, (list, tuple, dict)): params = urlencode(params) if hasattr(params, 'items'): # Some other multi-dict like format params = urlencode(params.items()) if six.PY3: params = params.encode('utf8') if upload_files: params = urlparse.parse_qsl(params, keep_blank_values=True) content_type, params = self.encode_multipart( params, upload_files) environ['CONTENT_TYPE'] = content_type elif params: environ.setdefault('CONTENT_TYPE', 'application/x-www-form-urlencoded') if '?' in url: url, environ['QUERY_STRING'] = url.split('?', 1) else: environ['QUERY_STRING'] = '' environ['CONTENT_LENGTH'] = str(len(params)) environ['REQUEST_METHOD'] = method environ['wsgi.input'] = six.BytesIO(params) self._set_headers(headers, environ) environ.update(extra_environ) req = TestRequest(url, environ, expect_errors) return self.do_request(req, status=status) def post(self, url, params=b'', headers=None, extra_environ=None, status=None, upload_files=None, expect_errors=False): """ Do a POST request. Very like the ``.get()`` method. ``params`` are put in the body of the request. ``upload_files`` is for file uploads. It should be a list of ``[(fieldname, filename, file_content)]``. You can also use just ``[(fieldname, filename)]`` and the file content will be read from disk. Returns a `response object `_ """ return self._gen_request('POST', url, params=params, headers=headers, extra_environ=extra_environ,status=status, upload_files=upload_files, expect_errors=expect_errors) def put(self, url, params=b'', headers=None, extra_environ=None, status=None, upload_files=None, expect_errors=False): """ Do a PUT request. Very like the ``.get()`` method. ``params`` are put in the body of the request. ``upload_files`` is for file uploads. It should be a list of ``[(fieldname, filename, file_content)]``. You can also use just ``[(fieldname, filename)]`` and the file content will be read from disk. Returns a `response object `_ """ return self._gen_request('PUT', url, params=params, headers=headers, extra_environ=extra_environ,status=status, upload_files=upload_files, expect_errors=expect_errors) def delete(self, url, params=b'', headers=None, extra_environ=None, status=None, expect_errors=False): """ Do a DELETE request. Very like the ``.get()`` method. ``params`` are put in the body of the request. Returns a `response object `_ """ return self._gen_request('DELETE', url, params=params, headers=headers, extra_environ=extra_environ,status=status, upload_files=None, expect_errors=expect_errors) def _set_headers(self, headers, environ): """ Turn any headers into environ variables """ if not headers: return for header, value in headers.items(): if header.lower() == 'content-type': var = 'CONTENT_TYPE' elif header.lower() == 'content-length': var = 'CONTENT_LENGTH' else: var = 'HTTP_%s' % header.replace('-', '_').upper() environ[var] = value def encode_multipart(self, params, files): """ Encodes a set of parameters (typically a name/value list) and a set of files (a list of (name, filename, file_body)) into a typical POST body, returning the (content_type, body). """ boundary = '----------a_BoUnDaRy%s$' % random.random() content_type = 'multipart/form-data; boundary=%s' % boundary if six.PY3: boundary = boundary.encode('ascii') lines = [] for key, value in params: lines.append(b'--'+boundary) line = 'Content-Disposition: form-data; name="%s"' % key if six.PY3: line = line.encode('utf8') lines.append(line) lines.append(b'') line = value if six.PY3 and isinstance(line, six.text_type): line = line.encode('utf8') lines.append(line) for file_info in files: key, filename, value = self._get_file_info(file_info) lines.append(b'--'+boundary) line = ('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename)) if six.PY3: line = line.encode('utf8') lines.append(line) fcontent = mimetypes.guess_type(filename)[0] line = ('Content-Type: %s' % (fcontent or 'application/octet-stream')) if six.PY3: line = line.encode('utf8') lines.append(line) lines.append(b'') lines.append(value) lines.append(b'--' + boundary + b'--') lines.append(b'') body = b'\r\n'.join(lines) return content_type, body def _get_file_info(self, file_info): if len(file_info) == 2: # It only has a filename filename = file_info[1] if self.relative_to: filename = os.path.join(self.relative_to, filename) f = open(filename, 'rb') content = f.read() f.close() return (file_info[0], filename, content) elif len(file_info) == 3: return file_info else: raise ValueError( "upload_files need to be a list of tuples of (fieldname, " "filename, filecontent) or (fieldname, filename); " "you gave: %r" % repr(file_info)[:100]) def do_request(self, req, status): """ Executes the given request (``req``), with the expected ``status``. Generally ``.get()`` and ``.post()`` are used instead. """ if self.pre_request_hook: self.pre_request_hook(self) __tracebackhide__ = True if self.cookies: c = BaseCookie() for name, value in self.cookies.items(): c[name] = value hc = '; '.join(['='.join([m.key, m.value]) for m in c.values()]) req.environ['HTTP_COOKIE'] = hc req.environ['paste.testing'] = True req.environ['paste.testing_variables'] = {} app = lint.middleware(self.app) old_stdout = sys.stdout out = CaptureStdout(old_stdout) try: sys.stdout = out start_time = time.time() raise_on_wsgi_error = not req.expect_errors raw_res = wsgilib.raw_interactive( app, req.url, raise_on_wsgi_error=raise_on_wsgi_error, **req.environ) end_time = time.time() finally: sys.stdout = old_stdout sys.stderr.write(out.getvalue()) res = self._make_response(raw_res, end_time - start_time) res.request = req for name, value in req.environ['paste.testing_variables'].items(): if hasattr(res, name): raise ValueError( "paste.testing_variables contains the variable %r, but " "the response object already has an attribute by that " "name" % name) setattr(res, name, value) if self.namespace is not None: self.namespace['res'] = res if not req.expect_errors: self._check_status(status, res) self._check_errors(res) res.cookies_set = {} for header in res.all_headers('set-cookie'): c = BaseCookie(header) for key, morsel in c.items(): self.cookies[key] = morsel.value res.cookies_set[key] = morsel.value if self.post_request_hook: self.post_request_hook(self) if self.namespace is None: # It's annoying to return the response in doctests, as it'll # be printed, so we only return it is we couldn't assign # it anywhere return res def _check_status(self, status, res): __tracebackhide__ = True if status == '*': return if isinstance(status, (list, tuple)): if res.status not in status: raise AppError( "Bad response: %s (not one of %s for %s)\n%s" % (res.full_status, ', '.join(map(str, status)), res.request.url, res.body)) return if status is None: if res.status >= 200 and res.status < 400: return body = res.body if six.PY3: body = body.decode('utf8', 'xmlcharrefreplace') raise AppError( "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s" % (res.full_status, res.request.url, body)) if status != res.status: raise AppError( "Bad response: %s (not %s)" % (res.full_status, status)) def _check_errors(self, res): if res.errors: raise AppError( "Application had errors logged:\n%s" % res.errors) def _make_response(self, resp, total_time): status, headers, body, errors = resp return TestResponse(self, status, headers, body, errors, total_time) class CaptureStdout(object): def __init__(self, actual): self.captured = StringIO() self.actual = actual def write(self, s): self.captured.write(s) self.actual.write(s) def flush(self): self.actual.flush() def writelines(self, lines): for item in lines: self.write(item) def getvalue(self): return self.captured.getvalue() class TestResponse(object): # for py.test disabled = True """ Instances of this class are return by `TestApp `_ """ def __init__(self, test_app, status, headers, body, errors, total_time): self.test_app = test_app self.status = int(status.split()[0]) self.full_status = status self.headers = headers self.header_dict = HeaderDict.fromlist(self.headers) self.body = body self.errors = errors self._normal_body = None self.time = total_time self._forms_indexed = None def forms__get(self): """ Returns a dictionary of ``Form`` objects. Indexes are both in order (from zero) and by form id (if the form is given an id). """ if self._forms_indexed is None: self._parse_forms() return self._forms_indexed forms = property(forms__get, doc=""" A list of
s found on the page (instances of `Form `_) """) def form__get(self): forms = self.forms if not forms: raise TypeError( "You used response.form, but no forms exist") if 1 in forms: # There is more than one form raise TypeError( "You used response.form, but more than one form exists") return forms[0] form = property(form__get, doc=""" Returns a single `Form `_ instance; it is an error if there are multiple forms on the page. """) _tag_re = re.compile(r'<(/?)([:a-z0-9_\-]*)(.*?)>', re.S|re.I) def _parse_forms(self): forms = self._forms_indexed = {} form_texts = [] started = None for match in self._tag_re.finditer(self.body): end = match.group(1) == '/' tag = match.group(2).lower() if tag != 'form': continue if end: assert started, ( " unexpected at %s" % match.start()) form_texts.append(self.body[started:match.end()]) started = None else: assert not started, ( "Nested form tags at %s" % match.start()) started = match.start() assert not started, ( "Danging form: %r" % self.body[started:]) for i, text in enumerate(form_texts): form = Form(self, text) forms[i] = form if form.id: forms[form.id] = form def header(self, name, default=NoDefault): """ Returns the named header; an error if there is not exactly one matching header (unless you give a default -- always an error if there is more than one header) """ found = None for cur_name, value in self.headers: if cur_name.lower() == name.lower(): assert not found, ( "Ambiguous header: %s matches %r and %r" % (name, found, value)) found = value if found is None: if default is NoDefault: raise KeyError( "No header found: %r (from %s)" % (name, ', '.join([n for n, v in self.headers]))) else: return default return found def all_headers(self, name): """ Gets all headers by the ``name``, returns as a list """ found = [] for cur_name, value in self.headers: if cur_name.lower() == name.lower(): found.append(value) return found def follow(self, **kw): """ If this request is a redirect, follow that redirect. It is an error if this is not a redirect response. Returns another response object. """ assert self.status >= 300 and self.status < 400, ( "You can only follow redirect responses (not %s)" % self.full_status) location = self.header('location') type, rest = splittype(location) host, path = splithost(rest) # @@: We should test that it's not a remote redirect return self.test_app.get(location, **kw) def click(self, description=None, linkid=None, href=None, anchor=None, index=None, verbose=False): """ Click the link as described. Each of ``description``, ``linkid``, and ``url`` are *patterns*, meaning that they are either strings (regular expressions), compiled regular expressions (objects with a ``search`` method), or callables returning true or false. All the given patterns are ANDed together: * ``description`` is a pattern that matches the contents of the anchor (HTML and all -- everything between ```` and ````) * ``linkid`` is a pattern that matches the ``id`` attribute of the anchor. It will receive the empty string if no id is given. * ``href`` is a pattern that matches the ``href`` of the anchor; the literal content of that attribute, not the fully qualified attribute. * ``anchor`` is a pattern that matches the entire anchor, with its contents. If more than one link matches, then the ``index`` link is followed. If ``index`` is not given and more than one link matches, or if no link matches, then ``IndexError`` will be raised. If you give ``verbose`` then messages will be printed about each link, and why it does or doesn't match. If you use ``app.click(verbose=True)`` you'll see a list of all the links. You can use multiple criteria to essentially assert multiple aspects about the link, e.g., where the link's destination is. """ __tracebackhide__ = True found_html, found_desc, found_attrs = self._find_element( tag='a', href_attr='href', href_extract=None, content=description, id=linkid, href_pattern=href, html_pattern=anchor, index=index, verbose=verbose) return self.goto(found_attrs['uri']) def clickbutton(self, description=None, buttonid=None, href=None, button=None, index=None, verbose=False): """ Like ``.click()``, except looks for link-like buttons. This kind of button should look like ``