Source code for _pytest.fixtures

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import functools
import inspect
import sys
import warnings
from collections import defaultdict
from collections import deque
from collections import OrderedDict

import attr
import py
import six
from more_itertools import flatten
from py._code.code import FormattedExcinfo

import _pytest
from _pytest import nodes
from _pytest._code.code import TerminalRepr
from _pytest.compat import _format_args
from _pytest.compat import _PytestWrapper
from _pytest.compat import exc_clear
from _pytest.compat import FuncargnamesCompatAttr
from _pytest.compat import get_real_func
from _pytest.compat import get_real_method
from _pytest.compat import getfslineno
from _pytest.compat import getfuncargnames
from _pytest.compat import getimfunc
from _pytest.compat import getlocation
from _pytest.compat import is_generator
from _pytest.compat import isclass
from _pytest.compat import NOTSET
from _pytest.compat import safe_getattr
from _pytest.deprecated import FIXTURE_FUNCTION_CALL
from _pytest.deprecated import FIXTURE_NAMED_REQUEST
from _pytest.outcomes import fail
from _pytest.outcomes import TEST_OUTCOME

FIXTURE_MSG = 'fixtures cannot have "pytest_funcarg__" prefix and be decorated with @pytest.fixture:\n{}'


@attr.s(frozen=True)
class PseudoFixtureDef(object):
    cached_result = attr.ib()
    scope = attr.ib()


def pytest_sessionstart(session):
    import _pytest.python
    import _pytest.nodes

    scopename2class.update(
        {
            "package": _pytest.python.Package,
            "class": _pytest.python.Class,
            "module": _pytest.python.Module,
            "function": _pytest.nodes.Item,
            "session": _pytest.main.Session,
        }
    )
    session._fixturemanager = FixtureManager(session)


scopename2class = {}


scope2props = dict(session=())
scope2props["package"] = ("fspath",)
scope2props["module"] = ("fspath", "module")
scope2props["class"] = scope2props["module"] + ("cls",)
scope2props["instance"] = scope2props["class"] + ("instance",)
scope2props["function"] = scope2props["instance"] + ("function", "keywords")


def scopeproperty(name=None, doc=None):
    def decoratescope(func):
        scopename = name or func.__name__

        def provide(self):
            if func.__name__ in scope2props[self.scope]:
                return func(self)
            raise AttributeError(
                "%s not available in %s-scoped context" % (scopename, self.scope)
            )

        return property(provide, None, None, func.__doc__)

    return decoratescope


def get_scope_package(node, fixturedef):
    import pytest

    cls = pytest.Package
    current = node
    fixture_package_name = "%s/%s" % (fixturedef.baseid, "__init__.py")
    while current and (
        type(current) is not cls or fixture_package_name != current.nodeid
    ):
        current = current.parent
    if current is None:
        return node.session
    return current


def get_scope_node(node, scope):
    cls = scopename2class.get(scope)
    if cls is None:
        raise ValueError("unknown scope")
    return node.getparent(cls)


def add_funcarg_pseudo_fixture_def(collector, metafunc, fixturemanager):
    # this function will transform all collected calls to a functions
    # if they use direct funcargs (i.e. direct parametrization)
    # because we want later test execution to be able to rely on
    # an existing FixtureDef structure for all arguments.
    # XXX we can probably avoid this algorithm  if we modify CallSpec2
    # to directly care for creating the fixturedefs within its methods.
    if not metafunc._calls[0].funcargs:
        return  # this function call does not have direct parametrization
    # collect funcargs of all callspecs into a list of values
    arg2params = {}
    arg2scope = {}
    for callspec in metafunc._calls:
        for argname, argvalue in callspec.funcargs.items():
            assert argname not in callspec.params
            callspec.params[argname] = argvalue
            arg2params_list = arg2params.setdefault(argname, [])
            callspec.indices[argname] = len(arg2params_list)
            arg2params_list.append(argvalue)
            if argname not in arg2scope:
                scopenum = callspec._arg2scopenum.get(argname, scopenum_function)
                arg2scope[argname] = scopes[scopenum]
        callspec.funcargs.clear()

    # register artificial FixtureDef's so that later at test execution
    # time we can rely on a proper FixtureDef to exist for fixture setup.
    arg2fixturedefs = metafunc._arg2fixturedefs
    for argname, valuelist in arg2params.items():
        # if we have a scope that is higher than function we need
        # to make sure we only ever create an according fixturedef on
        # a per-scope basis. We thus store and cache the fixturedef on the
        # node related to the scope.
        scope = arg2scope[argname]
        node = None
        if scope != "function":
            node = get_scope_node(collector, scope)
            if node is None:
                assert scope == "class" and isinstance(collector, _pytest.python.Module)
                # use module-level collector for class-scope (for now)
                node = collector
        if node and argname in node._name2pseudofixturedef:
            arg2fixturedefs[argname] = [node._name2pseudofixturedef[argname]]
        else:
            fixturedef = FixtureDef(
                fixturemanager,
                "",
                argname,
                get_direct_param_fixture_func,
                arg2scope[argname],
                valuelist,
                False,
                False,
            )
            arg2fixturedefs[argname] = [fixturedef]
            if node is not None:
                node._name2pseudofixturedef[argname] = fixturedef


def getfixturemarker(obj):
    """ return fixturemarker or None if it doesn't exist or raised
    exceptions."""
    try:
        return getattr(obj, "_pytestfixturefunction", None)
    except TEST_OUTCOME:
        # some objects raise errors like request (from flask import request)
        # we don't expect them to be fixture functions
        return None


def get_parametrized_fixture_keys(item, scopenum):
    """ return list of keys for all parametrized arguments which match
    the specified scope. """
    assert scopenum < scopenum_function  # function
    try:
        cs = item.callspec
    except AttributeError:
        pass
    else:
        # cs.indices.items() is random order of argnames.  Need to
        # sort this so that different calls to
        # get_parametrized_fixture_keys will be deterministic.
        for argname, param_index in sorted(cs.indices.items()):
            if cs._arg2scopenum[argname] != scopenum:
                continue
            if scopenum == 0:  # session
                key = (argname, param_index)
            elif scopenum == 1:  # package
                key = (argname, param_index, item.fspath.dirpath())
            elif scopenum == 2:  # module
                key = (argname, param_index, item.fspath)
            elif scopenum == 3:  # class
                key = (argname, param_index, item.fspath, item.cls)
            yield key


# algorithm for sorting on a per-parametrized resource setup basis
# it is called for scopenum==0 (session) first and performs sorting
# down to the lower scopes such as to minimize number of "high scope"
# setups and teardowns


def reorder_items(items):
    argkeys_cache = {}
    items_by_argkey = {}
    for scopenum in range(0, scopenum_function):
        argkeys_cache[scopenum] = d = {}
        items_by_argkey[scopenum] = item_d = defaultdict(deque)
        for item in items:
            keys = OrderedDict.fromkeys(get_parametrized_fixture_keys(item, scopenum))
            if keys:
                d[item] = keys
                for key in keys:
                    item_d[key].append(item)
    items = OrderedDict.fromkeys(items)
    return list(reorder_items_atscope(items, argkeys_cache, items_by_argkey, 0))


def fix_cache_order(item, argkeys_cache, items_by_argkey):
    for scopenum in range(0, scopenum_function):
        for key in argkeys_cache[scopenum].get(item, []):
            items_by_argkey[scopenum][key].appendleft(item)


def reorder_items_atscope(items, argkeys_cache, items_by_argkey, scopenum):
    if scopenum >= scopenum_function or len(items) < 3:
        return items
    ignore = set()
    items_deque = deque(items)
    items_done = OrderedDict()
    scoped_items_by_argkey = items_by_argkey[scopenum]
    scoped_argkeys_cache = argkeys_cache[scopenum]
    while items_deque:
        no_argkey_group = OrderedDict()
        slicing_argkey = None
        while items_deque:
            item = items_deque.popleft()
            if item in items_done or item in no_argkey_group:
                continue
            argkeys = OrderedDict.fromkeys(
                k for k in scoped_argkeys_cache.get(item, []) if k not in ignore
            )
            if not argkeys:
                no_argkey_group[item] = None
            else:
                slicing_argkey, _ = argkeys.popitem()
                # we don't have to remove relevant items from later in the deque because they'll just be ignored
                matching_items = [
                    i for i in scoped_items_by_argkey[slicing_argkey] if i in items
                ]
                for i in reversed(matching_items):
                    fix_cache_order(i, argkeys_cache, items_by_argkey)
                    items_deque.appendleft(i)
                break
        if no_argkey_group:
            no_argkey_group = reorder_items_atscope(
                no_argkey_group, argkeys_cache, items_by_argkey, scopenum + 1
            )
            for item in no_argkey_group:
                items_done[item] = None
        ignore.add(slicing_argkey)
    return items_done


def fillfixtures(function):
    """ fill missing funcargs for a test function. """
    try:
        request = function._request
    except AttributeError:
        # XXX this special code path is only expected to execute
        # with the oejskit plugin.  It uses classes with funcargs
        # and we thus have to work a bit to allow this.
        fm = function.session._fixturemanager
        fi = fm.getfixtureinfo(function.parent, function.obj, None)
        function._fixtureinfo = fi
        request = function._request = FixtureRequest(function)
        request._fillfixtures()
        # prune out funcargs for jstests
        newfuncargs = {}
        for name in fi.argnames:
            newfuncargs[name] = function.funcargs[name]
        function.funcargs = newfuncargs
    else:
        request._fillfixtures()


def get_direct_param_fixture_func(request):
    return request.param


@attr.s(slots=True)
class FuncFixtureInfo(object):
    # original function argument names
    argnames = attr.ib(type=tuple)
    # argnames that function immediately requires. These include argnames +
    # fixture names specified via usefixtures and via autouse=True in fixture
    # definitions.
    initialnames = attr.ib(type=tuple)
    names_closure = attr.ib()  # type: List[str]
    name2fixturedefs = attr.ib()  # type: List[str, List[FixtureDef]]

    def prune_dependency_tree(self):
        """Recompute names_closure from initialnames and name2fixturedefs

        Can only reduce names_closure, which means that the new closure will
        always be a subset of the old one. The order is preserved.

        This method is needed because direct parametrization may shadow some
        of the fixtures that were included in the originally built dependency
        tree. In this way the dependency tree can get pruned, and the closure
        of argnames may get reduced.
        """
        closure = set()
        working_set = set(self.initialnames)
        while working_set:
            argname = working_set.pop()
            # argname may be smth not included in the original names_closure,
            # in which case we ignore it. This currently happens with pseudo
            # FixtureDefs which wrap 'get_direct_param_fixture_func(request)'.
            # So they introduce the new dependency 'request' which might have
            # been missing in the original tree (closure).
            if argname not in closure and argname in self.names_closure:
                closure.add(argname)
                if argname in self.name2fixturedefs:
                    working_set.update(self.name2fixturedefs[argname][-1].argnames)

        self.names_closure[:] = sorted(closure, key=self.names_closure.index)


[docs]class FixtureRequest(FuncargnamesCompatAttr): """ A request for a fixture from a test or fixture function. A request object gives access to the requesting test context and has an optional ``param`` attribute in case the fixture is parametrized indirectly. """ def __init__(self, pyfuncitem): self._pyfuncitem = pyfuncitem #: fixture for which this request is being performed self.fixturename = None #: Scope string, one of "function", "class", "module", "session" self.scope = "function" self._fixture_defs = {} # argname -> FixtureDef fixtureinfo = pyfuncitem._fixtureinfo self._arg2fixturedefs = fixtureinfo.name2fixturedefs.copy() self._arg2index = {} self._fixturemanager = pyfuncitem.session._fixturemanager @property def fixturenames(self): """names of all active fixtures in this request""" result = list(self._pyfuncitem._fixtureinfo.names_closure) result.extend(set(self._fixture_defs).difference(result)) return result @property def node(self): """ underlying collection node (depends on current request scope)""" return self._getscopeitem(self.scope) def _getnextfixturedef(self, argname): fixturedefs = self._arg2fixturedefs.get(argname, None) if fixturedefs is None: # we arrive here because of a dynamic call to # getfixturevalue(argname) usage which was naturally # not known at parsing/collection time parentid = self._pyfuncitem.parent.nodeid fixturedefs = self._fixturemanager.getfixturedefs(argname, parentid) self._arg2fixturedefs[argname] = fixturedefs # fixturedefs list is immutable so we maintain a decreasing index index = self._arg2index.get(argname, 0) - 1 if fixturedefs is None or (-index > len(fixturedefs)): raise FixtureLookupError(argname, self) self._arg2index[argname] = index return fixturedefs[index] @property def config(self): """ the pytest config object associated with this request. """ return self._pyfuncitem.config @scopeproperty() def function(self): """ test function object if the request has a per-function scope. """ return self._pyfuncitem.obj @scopeproperty("class") def cls(self): """ class (can be None) where the test function was collected. """ clscol = self._pyfuncitem.getparent(_pytest.python.Class) if clscol: return clscol.obj @property def instance(self): """ instance (can be None) on which test function was collected. """ # unittest support hack, see _pytest.unittest.TestCaseFunction try: return self._pyfuncitem._testcase except AttributeError: function = getattr(self, "function", None) return getattr(function, "__self__", None) @scopeproperty() def module(self): """ python module object where the test function was collected. """ return self._pyfuncitem.getparent(_pytest.python.Module).obj @scopeproperty() def fspath(self): """ the file system path of the test module which collected this test. """ return self._pyfuncitem.fspath @property def keywords(self): """ keywords/markers dictionary for the underlying node. """ return self.node.keywords @property def session(self): """ pytest session object. """ return self._pyfuncitem.session
[docs] def addfinalizer(self, finalizer): """ add finalizer/teardown function to be called after the last test within the requesting test context finished execution. """ # XXX usually this method is shadowed by fixturedef specific ones self._addfinalizer(finalizer, scope=self.scope)
def _addfinalizer(self, finalizer, scope): colitem = self._getscopeitem(scope) self._pyfuncitem.session._setupstate.addfinalizer( finalizer=finalizer, colitem=colitem )
[docs] def applymarker(self, marker): """ Apply a marker to a single test function invocation. This method is useful if you don't want to have a keyword/marker on all function invocations. :arg marker: a :py:class:`_pytest.mark.MarkDecorator` object created by a call to ``pytest.mark.NAME(...)``. """ self.node.add_marker(marker)
[docs] def raiseerror(self, msg): """ raise a FixtureLookupError with the given message. """ raise self._fixturemanager.FixtureLookupError(None, self, msg)
def _fillfixtures(self): item = self._pyfuncitem fixturenames = getattr(item, "fixturenames", self.fixturenames) for argname in fixturenames: if argname not in item.funcargs: item.funcargs[argname] = self.getfixturevalue(argname)
[docs] def cached_setup(self, setup, teardown=None, scope="module", extrakey=None): """ (deprecated) Return a testing resource managed by ``setup`` & ``teardown`` calls. ``scope`` and ``extrakey`` determine when the ``teardown`` function will be called so that subsequent calls to ``setup`` would recreate the resource. With pytest-2.3 you often do not need ``cached_setup()`` as you can directly declare a scope on a fixture function and register a finalizer through ``request.addfinalizer()``. :arg teardown: function receiving a previously setup resource. :arg setup: a no-argument function creating a resource. :arg scope: a string value out of ``function``, ``class``, ``module`` or ``session`` indicating the caching lifecycle of the resource. :arg extrakey: added to internal caching key of (funcargname, scope). """ from _pytest.deprecated import CACHED_SETUP warnings.warn(CACHED_SETUP, stacklevel=2) if not hasattr(self.config, "_setupcache"): self.config._setupcache = {} # XXX weakref? cachekey = (self.fixturename, self._getscopeitem(scope), extrakey) cache = self.config._setupcache try: val = cache[cachekey] except KeyError: self._check_scope(self.fixturename, self.scope, scope) val = setup() cache[cachekey] = val if teardown is not None: def finalizer(): del cache[cachekey] teardown(val) self._addfinalizer(finalizer, scope=scope) return val
[docs] def getfixturevalue(self, argname): """ Dynamically run a named fixture function. Declaring fixtures via function argument is recommended where possible. But if you can only decide whether to use another fixture at test setup time, you may use this function to retrieve it inside a fixture or test function body. """ return self._get_active_fixturedef(argname).cached_result[0]
[docs] def getfuncargvalue(self, argname): """ Deprecated, use getfixturevalue. """ from _pytest import deprecated warnings.warn(deprecated.GETFUNCARGVALUE, stacklevel=2) return self.getfixturevalue(argname)
def _get_active_fixturedef(self, argname): try: return self._fixture_defs[argname] except KeyError: try: fixturedef = self._getnextfixturedef(argname) except FixtureLookupError: if argname == "request": cached_result = (self, [0], None) scope = "function" return PseudoFixtureDef(cached_result, scope) raise # remove indent to prevent the python3 exception # from leaking into the call self._compute_fixture_value(fixturedef) self._fixture_defs[argname] = fixturedef return fixturedef def _get_fixturestack(self): current = self values = [] while 1: fixturedef = getattr(current, "_fixturedef", None) if fixturedef is None: values.reverse() return values values.append(fixturedef) current = current._parent_request def _compute_fixture_value(self, fixturedef): """ Creates a SubRequest based on "self" and calls the execute method of the given fixturedef object. This will force the FixtureDef object to throw away any previous results and compute a new fixture value, which will be stored into the FixtureDef object itself. :param FixtureDef fixturedef: """ # prepare a subrequest object before calling fixture function # (latter managed by fixturedef) argname = fixturedef.argname funcitem = self._pyfuncitem scope = fixturedef.scope try: param = funcitem.callspec.getparam(argname) except (AttributeError, ValueError): param = NOTSET param_index = 0 has_params = fixturedef.params is not None fixtures_not_supported = getattr(funcitem, "nofuncargs", False) if has_params and fixtures_not_supported: msg = ( "{name} does not support fixtures, maybe unittest.TestCase subclass?\n" "Node id: {nodeid}\n" "Function type: {typename}" ).format( name=funcitem.name, nodeid=funcitem.nodeid, typename=type(funcitem).__name__, ) fail(msg, pytrace=False) if has_params: frame = inspect.stack()[3] frameinfo = inspect.getframeinfo(frame[0]) source_path = frameinfo.filename source_lineno = frameinfo.lineno source_path = py.path.local(source_path) if source_path.relto(funcitem.config.rootdir): source_path = source_path.relto(funcitem.config.rootdir) msg = ( "The requested fixture has no parameter defined for test:\n" " {}\n\n" "Requested fixture '{}' defined in:\n{}" "\n\nRequested here:\n{}:{}".format( funcitem.nodeid, fixturedef.argname, getlocation(fixturedef.func, funcitem.config.rootdir), source_path, source_lineno, ) ) fail(msg, pytrace=False) else: # indices might not be set if old-style metafunc.addcall() was used param_index = funcitem.callspec.indices.get(argname, 0) # if a parametrize invocation set a scope it will override # the static scope defined with the fixture function paramscopenum = funcitem.callspec._arg2scopenum.get(argname) if paramscopenum is not None: scope = scopes[paramscopenum] subrequest = SubRequest(self, scope, param, param_index, fixturedef) # check if a higher-level scoped fixture accesses a lower level one subrequest._check_scope(argname, self.scope, scope) # clear sys.exc_info before invoking the fixture (python bug?) # if it's not explicitly cleared it will leak into the call exc_clear() try: # call the fixture function fixturedef.execute(request=subrequest) finally: # if fixture function failed it might have registered finalizers self.session._setupstate.addfinalizer( functools.partial(fixturedef.finish, request=subrequest), subrequest.node, ) def _check_scope(self, argname, invoking_scope, requested_scope): if argname == "request": return if scopemismatch(invoking_scope, requested_scope): # try to report something helpful lines = self._factorytraceback() fail( "ScopeMismatch: You tried to access the %r scoped " "fixture %r with a %r scoped request object, " "involved factories\n%s" % ((requested_scope, argname, invoking_scope, "\n".join(lines))), pytrace=False, ) def _factorytraceback(self): lines = [] for fixturedef in self._get_fixturestack(): factory = fixturedef.func fs, lineno = getfslineno(factory) p = self._pyfuncitem.session.fspath.bestrelpath(fs) args = _format_args(factory) lines.append("%s:%d: def %s%s" % (p, lineno, factory.__name__, args)) return lines def _getscopeitem(self, scope): if scope == "function": # this might also be a non-function Item despite its attribute name return self._pyfuncitem if scope == "package": node = get_scope_package(self._pyfuncitem, self._fixturedef) else: node = get_scope_node(self._pyfuncitem, scope) if node is None and scope == "class": # fallback to function item itself node = self._pyfuncitem assert node, 'Could not obtain a node for scope "{}" for function {!r}'.format( scope, self._pyfuncitem ) return node def __repr__(self): return "<FixtureRequest for %r>" % (self.node)
class SubRequest(FixtureRequest): """ a sub request for handling getting a fixture from a test function/fixture. """ def __init__(self, request, scope, param, param_index, fixturedef): self._parent_request = request self.fixturename = fixturedef.argname if param is not NOTSET: self.param = param self.param_index = param_index self.scope = scope self._fixturedef = fixturedef self._pyfuncitem = request._pyfuncitem self._fixture_defs = request._fixture_defs self._arg2fixturedefs = request._arg2fixturedefs self._arg2index = request._arg2index self._fixturemanager = request._fixturemanager def __repr__(self): return "<SubRequest %r for %r>" % (self.fixturename, self._pyfuncitem) def addfinalizer(self, finalizer): self._fixturedef.addfinalizer(finalizer) class ScopeMismatchError(Exception): """ A fixture function tries to use a different fixture function which which has a lower scope (e.g. a Session one calls a function one) """ scopes = "session package module class function".split() scopenum_function = scopes.index("function") def scopemismatch(currentscope, newscope): return scopes.index(newscope) > scopes.index(currentscope) def scope2index(scope, descr, where=None): """Look up the index of ``scope`` and raise a descriptive value error if not defined. """ try: return scopes.index(scope) except ValueError: fail( "{} {}got an unexpected scope value '{}'".format( descr, "from {} ".format(where) if where else "", scope ), pytrace=False, ) class FixtureLookupError(LookupError): """ could not return a requested Fixture (missing or invalid). """ def __init__(self, argname, request, msg=None): self.argname = argname self.request = request self.fixturestack = request._get_fixturestack() self.msg = msg def formatrepr(self): tblines = [] addline = tblines.append stack = [self.request._pyfuncitem.obj] stack.extend(map(lambda x: x.func, self.fixturestack)) msg = self.msg if msg is not None: # the last fixture raise an error, let's present # it at the requesting side stack = stack[:-1] for function in stack: fspath, lineno = getfslineno(function) try: lines, _ = inspect.getsourcelines(get_real_func(function)) except (IOError, IndexError, TypeError): error_msg = "file %s, line %s: source code not available" addline(error_msg % (fspath, lineno + 1)) else: addline("file %s, line %s" % (fspath, lineno + 1)) for i, line in enumerate(lines): line = line.rstrip() addline(" " + line) if line.lstrip().startswith("def"): break if msg is None: fm = self.request._fixturemanager available = set() parentid = self.request._pyfuncitem.parent.nodeid for name, fixturedefs in fm._arg2fixturedefs.items(): faclist = list(fm._matchfactories(fixturedefs, parentid)) if faclist: available.add(name) if self.argname in available: msg = " recursive dependency involving fixture '{}' detected".format( self.argname ) else: msg = "fixture '{}' not found".format(self.argname) msg += "\n available fixtures: {}".format(", ".join(sorted(available))) msg += "\n use 'pytest --fixtures [testpath]' for help on them." return FixtureLookupErrorRepr(fspath, lineno, tblines, msg, self.argname) class FixtureLookupErrorRepr(TerminalRepr): def __init__(self, filename, firstlineno, tblines, errorstring, argname): self.tblines = tblines self.errorstring = errorstring self.filename = filename self.firstlineno = firstlineno self.argname = argname def toterminal(self, tw): # tw.line("FixtureLookupError: %s" %(self.argname), red=True) for tbline in self.tblines: tw.line(tbline.rstrip()) lines = self.errorstring.split("\n") if lines: tw.line( "{} {}".format(FormattedExcinfo.fail_marker, lines[0].strip()), red=True, ) for line in lines[1:]: tw.line( "{} {}".format(FormattedExcinfo.flow_marker, line.strip()), red=True, ) tw.line() tw.line("%s:%d" % (self.filename, self.firstlineno + 1)) def fail_fixturefunc(fixturefunc, msg): fs, lineno = getfslineno(fixturefunc) location = "%s:%s" % (fs, lineno + 1) source = _pytest._code.Source(fixturefunc) fail(msg + ":\n\n" + str(source.indent()) + "\n" + location, pytrace=False) def call_fixture_func(fixturefunc, request, kwargs): yieldctx = is_generator(fixturefunc) if yieldctx: it = fixturefunc(**kwargs) res = next(it) finalizer = functools.partial(_teardown_yield_fixture, fixturefunc, it) request.addfinalizer(finalizer) else: res = fixturefunc(**kwargs) return res def _teardown_yield_fixture(fixturefunc, it): """Executes the teardown of a fixture function by advancing the iterator after the yield and ensure the iteration ends (if not it means there is more than one yield in the function)""" try: next(it) except StopIteration: pass else: fail_fixturefunc( fixturefunc, "yield_fixture function has more than one 'yield'" )
[docs]class FixtureDef(object): """ A container for a factory definition. """ def __init__( self, fixturemanager, baseid, argname, func, scope, params, unittest=False, ids=None, ): self._fixturemanager = fixturemanager self.baseid = baseid or "" self.has_location = baseid is not None self.func = func self.argname = argname self.scope = scope self.scopenum = scope2index( scope or "function", descr="Fixture '{}'".format(func.__name__), where=baseid, ) self.params = params self.argnames = getfuncargnames(func, is_method=unittest) self.unittest = unittest self.ids = ids self._finalizers = [] def addfinalizer(self, finalizer): self._finalizers.append(finalizer) def finish(self, request): exceptions = [] try: while self._finalizers: try: func = self._finalizers.pop() func() except: # noqa exceptions.append(sys.exc_info()) if exceptions: e = exceptions[0] del exceptions # ensure we don't keep all frames alive because of the traceback six.reraise(*e) finally: hook = self._fixturemanager.session.gethookproxy(request.node.fspath) hook.pytest_fixture_post_finalizer(fixturedef=self, request=request) # even if finalization fails, we invalidate # the cached fixture value and remove # all finalizers because they may be bound methods which will # keep instances alive if hasattr(self, "cached_result"): del self.cached_result self._finalizers = [] def execute(self, request): # get required arguments and register our own finish() # with their finalization for argname in self.argnames: fixturedef = request._get_active_fixturedef(argname) if argname != "request": fixturedef.addfinalizer(functools.partial(self.finish, request=request)) my_cache_key = request.param_index cached_result = getattr(self, "cached_result", None) if cached_result is not None: result, cache_key, err = cached_result if my_cache_key == cache_key: if err is not None: six.reraise(*err) else: return result # we have a previous but differently parametrized fixture instance # so we need to tear it down before creating a new one self.finish(request) assert not hasattr(self, "cached_result") hook = self._fixturemanager.session.gethookproxy(request.node.fspath) return hook.pytest_fixture_setup(fixturedef=self, request=request) def __repr__(self): return "<FixtureDef argname=%r scope=%r baseid=%r>" % ( self.argname, self.scope, self.baseid, )
def resolve_fixture_function(fixturedef, request): """Gets the actual callable that can be called to obtain the fixture value, dealing with unittest-specific instances and bound methods. """ fixturefunc = fixturedef.func if fixturedef.unittest: if request.instance is not None: # bind the unbound method to the TestCase instance fixturefunc = fixturedef.func.__get__(request.instance) else: # the fixture function needs to be bound to the actual # request.instance so that code working with "fixturedef" behaves # as expected. if request.instance is not None: fixturefunc = getimfunc(fixturedef.func) if fixturefunc != fixturedef.func: fixturefunc = fixturefunc.__get__(request.instance) return fixturefunc def pytest_fixture_setup(fixturedef, request): """ Execution of fixture setup. """ kwargs = {} for argname in fixturedef.argnames: fixdef = request._get_active_fixturedef(argname) result, arg_cache_key, exc = fixdef.cached_result request._check_scope(argname, request.scope, fixdef.scope) kwargs[argname] = result fixturefunc = resolve_fixture_function(fixturedef, request) my_cache_key = request.param_index try: result = call_fixture_func(fixturefunc, request, kwargs) except TEST_OUTCOME: fixturedef.cached_result = (None, my_cache_key, sys.exc_info()) raise fixturedef.cached_result = (result, my_cache_key, None) return result def _ensure_immutable_ids(ids): if ids is None: return if callable(ids): return ids return tuple(ids) def wrap_function_to_warning_if_called_directly(function, fixture_marker): """Wrap the given fixture function so we can issue warnings about it being called directly, instead of used as an argument in a test function. """ is_yield_function = is_generator(function) warning = FIXTURE_FUNCTION_CALL.format( name=fixture_marker.name or function.__name__ ) if is_yield_function: @functools.wraps(function) def result(*args, **kwargs): __tracebackhide__ = True warnings.warn(warning, stacklevel=3) for x in function(*args, **kwargs): yield x else: @functools.wraps(function) def result(*args, **kwargs): __tracebackhide__ = True warnings.warn(warning, stacklevel=3) return function(*args, **kwargs) if six.PY2: result.__wrapped__ = function # keep reference to the original function in our own custom attribute so we don't unwrap # further than this point and lose useful wrappings like @mock.patch (#3774) result.__pytest_wrapped__ = _PytestWrapper(function) return result @attr.s(frozen=True) class FixtureFunctionMarker(object): scope = attr.ib() params = attr.ib(converter=attr.converters.optional(tuple)) autouse = attr.ib(default=False) ids = attr.ib(default=None, converter=_ensure_immutable_ids) name = attr.ib(default=None) def __call__(self, function): if isclass(function): raise ValueError("class fixtures not supported (maybe in the future)") if getattr(function, "_pytestfixturefunction", False): raise ValueError( "fixture is being applied more than once to the same function" ) function = wrap_function_to_warning_if_called_directly(function, self) name = self.name or function.__name__ if name == "request": warnings.warn(FIXTURE_NAMED_REQUEST) function._pytestfixturefunction = self return function
[docs]def fixture(scope="function", params=None, autouse=False, ids=None, name=None): """Decorator to mark a fixture factory function. This decorator can be used, with or without parameters, to define a fixture function. The name of the fixture function can later be referenced to cause its invocation ahead of running tests: test modules or classes can use the ``pytest.mark.usefixtures(fixturename)`` marker. Test functions can directly use fixture names as input arguments in which case the fixture instance returned from the fixture function will be injected. Fixtures can provide their values to test functions using ``return`` or ``yield`` statements. When using ``yield`` the code block after the ``yield`` statement is executed as teardown code regardless of the test outcome, and must yield exactly once. :arg scope: the scope for which this fixture is shared, one of ``"function"`` (default), ``"class"``, ``"module"``, ``"package"`` or ``"session"``. ``"package"`` is considered **experimental** at this time. :arg params: an optional list of parameters which will cause multiple invocations of the fixture function and all of the tests using it. :arg autouse: if True, the fixture func is activated for all tests that can see it. If False (the default) then an explicit reference is needed to activate the fixture. :arg ids: list of string ids each corresponding to the params so that they are part of the test id. If no ids are provided they will be generated automatically from the params. :arg name: the name of the fixture. This defaults to the name of the decorated function. If a fixture is used in the same module in which it is defined, the function name of the fixture will be shadowed by the function arg that requests the fixture; one way to resolve this is to name the decorated function ``fixture_<fixturename>`` and then use ``@pytest.fixture(name='<fixturename>')``. """ if callable(scope) and params is None and autouse is False: # direct decoration return FixtureFunctionMarker("function", params, autouse, name=name)(scope) if params is not None and not isinstance(params, (list, tuple)): params = list(params) return FixtureFunctionMarker(scope, params, autouse, ids=ids, name=name)
def yield_fixture(scope="function", params=None, autouse=False, ids=None, name=None): """ (return a) decorator to mark a yield-fixture factory function. .. deprecated:: 3.0 Use :py:func:`pytest.fixture` directly instead. """ return fixture(scope=scope, params=params, autouse=autouse, ids=ids, name=name) defaultfuncargprefixmarker = fixture()
[docs]@fixture(scope="session") def pytestconfig(request): """Session-scoped fixture that returns the :class:`_pytest.config.Config` object. Example:: def test_foo(pytestconfig): if pytestconfig.getoption("verbose"): ... """ return request.config
class FixtureManager(object): """ pytest fixtures definitions and information is stored and managed from this class. During collection fm.parsefactories() is called multiple times to parse fixture function definitions into FixtureDef objects and internal data structures. During collection of test functions, metafunc-mechanics instantiate a FuncFixtureInfo object which is cached per node/func-name. This FuncFixtureInfo object is later retrieved by Function nodes which themselves offer a fixturenames attribute. The FuncFixtureInfo object holds information about fixtures and FixtureDefs relevant for a particular function. An initial list of fixtures is assembled like this: - ini-defined usefixtures - autouse-marked fixtures along the collection chain up from the function - usefixtures markers at module/class/function level - test function funcargs Subsequently the funcfixtureinfo.fixturenames attribute is computed as the closure of the fixtures needed to setup the initial fixtures, i. e. fixtures needed by fixture functions themselves are appended to the fixturenames list. Upon the test-setup phases all fixturenames are instantiated, retrieved by a lookup of their FuncFixtureInfo. """ _argprefix = "pytest_funcarg__" FixtureLookupError = FixtureLookupError FixtureLookupErrorRepr = FixtureLookupErrorRepr def __init__(self, session): self.session = session self.config = session.config self._arg2fixturedefs = {} self._holderobjseen = set() self._arg2finish = {} self._nodeid_and_autousenames = [("", self.config.getini("usefixtures"))] session.config.pluginmanager.register(self, "funcmanage") def getfixtureinfo(self, node, func, cls, funcargs=True): if funcargs and not getattr(node, "nofuncargs", False): argnames = getfuncargnames(func, cls=cls) else: argnames = () usefixtures = flatten( mark.args for mark in node.iter_markers(name="usefixtures") ) initialnames = tuple(usefixtures) + argnames fm = node.session._fixturemanager initialnames, names_closure, arg2fixturedefs = fm.getfixtureclosure( initialnames, node ) return FuncFixtureInfo(argnames, initialnames, names_closure, arg2fixturedefs) def pytest_plugin_registered(self, plugin): nodeid = None try: p = py.path.local(plugin.__file__).realpath() except AttributeError: pass else: # construct the base nodeid which is later used to check # what fixtures are visible for particular tests (as denoted # by their test id) if p.basename.startswith("conftest.py"): nodeid = p.dirpath().relto(self.config.rootdir) if p.sep != nodes.SEP: nodeid = nodeid.replace(p.sep, nodes.SEP) self.parsefactories(plugin, nodeid) def _getautousenames(self, nodeid): """ return a tuple of fixture names to be used. """ autousenames = [] for baseid, basenames in self._nodeid_and_autousenames: if nodeid.startswith(baseid): if baseid: i = len(baseid) nextchar = nodeid[i : i + 1] if nextchar and nextchar not in ":/": continue autousenames.extend(basenames) return autousenames def getfixtureclosure(self, fixturenames, parentnode): # collect the closure of all fixtures , starting with the given # fixturenames as the initial set. As we have to visit all # factory definitions anyway, we also return an arg2fixturedefs # mapping so that the caller can reuse it and does not have # to re-discover fixturedefs again for each fixturename # (discovering matching fixtures for a given name/node is expensive) parentid = parentnode.nodeid fixturenames_closure = self._getautousenames(parentid) def merge(otherlist): for arg in otherlist: if arg not in fixturenames_closure: fixturenames_closure.append(arg) merge(fixturenames) # at this point, fixturenames_closure contains what we call "initialnames", # which is a set of fixturenames the function immediately requests. We # need to return it as well, so save this. initialnames = tuple(fixturenames_closure) arg2fixturedefs = {} lastlen = -1 while lastlen != len(fixturenames_closure): lastlen = len(fixturenames_closure) for argname in fixturenames_closure: if argname in arg2fixturedefs: continue fixturedefs = self.getfixturedefs(argname, parentid) if fixturedefs: arg2fixturedefs[argname] = fixturedefs merge(fixturedefs[-1].argnames) def sort_by_scope(arg_name): try: fixturedefs = arg2fixturedefs[arg_name] except KeyError: return scopes.index("function") else: return fixturedefs[-1].scopenum fixturenames_closure.sort(key=sort_by_scope) return initialnames, fixturenames_closure, arg2fixturedefs def pytest_generate_tests(self, metafunc): for argname in metafunc.fixturenames: faclist = metafunc._arg2fixturedefs.get(argname) if faclist: fixturedef = faclist[-1] if fixturedef.params is not None: parametrize_func = getattr(metafunc.function, "parametrize", None) if parametrize_func is not None: parametrize_func = parametrize_func.combined func_params = getattr(parametrize_func, "args", [[None]]) func_kwargs = getattr(parametrize_func, "kwargs", {}) # skip directly parametrized arguments if "argnames" in func_kwargs: argnames = parametrize_func.kwargs["argnames"] else: argnames = func_params[0] if not isinstance(argnames, (tuple, list)): argnames = [x.strip() for x in argnames.split(",") if x.strip()] if argname not in func_params and argname not in argnames: metafunc.parametrize( argname, fixturedef.params, indirect=True, scope=fixturedef.scope, ids=fixturedef.ids, ) else: continue # will raise FixtureLookupError at setup time def pytest_collection_modifyitems(self, items): # separate parametrized setups items[:] = reorder_items(items) def parsefactories(self, node_or_obj, nodeid=NOTSET, unittest=False): from _pytest import deprecated if nodeid is not NOTSET: holderobj = node_or_obj else: holderobj = node_or_obj.obj nodeid = node_or_obj.nodeid if holderobj in self._holderobjseen: return from _pytest.nodes import _CompatProperty self._holderobjseen.add(holderobj) autousenames = [] for name in dir(holderobj): # The attribute can be an arbitrary descriptor, so the attribute # access below can raise. safe_getatt() ignores such exceptions. maybe_property = safe_getattr(type(holderobj), name, None) if isinstance(maybe_property, _CompatProperty): # deprecated continue obj = safe_getattr(holderobj, name, None) marker = getfixturemarker(obj) # fixture functions have a pytest_funcarg__ prefix (pre-2.3 style) # or are "@pytest.fixture" marked if marker is None: if not name.startswith(self._argprefix): continue if not callable(obj): continue marker = defaultfuncargprefixmarker filename, lineno = getfslineno(obj) warnings.warn_explicit( deprecated.FUNCARG_PREFIX.format(name=name), category=None, filename=str(filename), lineno=lineno + 1, ) name = name[len(self._argprefix) :] elif not isinstance(marker, FixtureFunctionMarker): # magic globals with __getattr__ might have got us a wrong # fixture attribute continue else: if marker.name: name = marker.name assert not name.startswith(self._argprefix), FIXTURE_MSG.format(name) # during fixture definition we wrap the original fixture function # to issue a warning if called directly, so here we unwrap it in order to not emit the warning # when pytest itself calls the fixture function if six.PY2 and unittest: # hack on Python 2 because of the unbound methods obj = get_real_func(obj) else: obj = get_real_method(obj, holderobj) fixture_def = FixtureDef( self, nodeid, name, obj, marker.scope, marker.params, unittest=unittest, ids=marker.ids, ) faclist = self._arg2fixturedefs.setdefault(name, []) if fixture_def.has_location: faclist.append(fixture_def) else: # fixturedefs with no location are at the front # so this inserts the current fixturedef after the # existing fixturedefs from external plugins but # before the fixturedefs provided in conftests. i = len([f for f in faclist if not f.has_location]) faclist.insert(i, fixture_def) if marker.autouse: autousenames.append(name) if autousenames: self._nodeid_and_autousenames.append((nodeid or "", autousenames)) def getfixturedefs(self, argname, nodeid): """ Gets a list of fixtures which are applicable to the given node id. :param str argname: name of the fixture to search for :param str nodeid: full node id of the requesting test. :return: list[FixtureDef] """ try: fixturedefs = self._arg2fixturedefs[argname] except KeyError: return None return tuple(self._matchfactories(fixturedefs, nodeid)) def _matchfactories(self, fixturedefs, nodeid): for fixturedef in fixturedefs: if nodes.ischildnode(fixturedef.baseid, nodeid): yield fixturedef