# Source code for _pytest.reports

from io import StringIO
from pprint import pprint
from typing import Any
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

import attr
import py

from _pytest._code.code import ExceptionChainRepr
from _pytest._code.code import ExceptionInfo
from _pytest._code.code import ReprEntry
from _pytest._code.code import ReprEntryNative
from _pytest._code.code import ReprExceptionInfo
from _pytest._code.code import ReprFileLocation
from _pytest._code.code import ReprFuncArgs
from _pytest._code.code import ReprLocals
from _pytest._code.code import ReprTraceback
from _pytest._code.code import TerminalRepr
from _pytest._io import TerminalWriter
from _pytest.compat import TYPE_CHECKING
from _pytest.nodes import Node
from _pytest.outcomes import skip
from _pytest.pathlib import Path

def getslaveinfoline(node):
    """Return a one-line description of the interpreter behind ``node``.

    The line has the form ``"[<id>] <sysplatform> -- Python <X.Y.Z> <executable>"``
    and is built from the ``node.slaveinfo`` mapping (keys ``id``,
    ``sysplatform``, ``version_info`` and ``executable``).

    The result is cached on the node as ``_slaveinfocache``; later calls return
    the cached string without consulting ``node.slaveinfo`` again.
    """
    try:
        return node._slaveinfocache
    except AttributeError:
        d = node.slaveinfo
        # Only major.minor.micro are shown.  The slice is coerced to a tuple so
        # that "%" formatting also works when version_info is a list.
        ver = "%s.%s.%s" % tuple(d["version_info"][:3])
        node._slaveinfocache = s = "[{}] {} -- Python {} {}".format(
            d["id"], d["sysplatform"], ver, d["executable"]
        )
        return s

class BaseReport:
    """Common base for report objects.

    Provides outcome flags, access to captured sections, terminal output of
    ``longrepr`` and the JSON (de)serialization entry points.  Subclasses are
    expected to set ``outcome`` on their instances.
    """

    # Class-level defaults; instances normally overwrite them.
    when = None  # type: Optional[str]
    location = None  # type: Optional[Tuple[str, Optional[int], str]]
    longrepr = None
    sections = []  # type: List[Tuple[str, str]]
    nodeid = None  # type: str

    def __init__(self, **kw: Any) -> None:
        """Store every keyword argument as an instance attribute."""
        self.__dict__.update(kw)

    if TYPE_CHECKING:
        # Can have arbitrary fields given to __init__().
        def __getattr__(self, key: str) -> Any:
            raise NotImplementedError()

    def toterminal(self, out) -> None:
        """Write this report's ``longrepr`` to ``out``.

        If the report has a ``node`` attribute, its info line is written first.
        Nothing more is written when ``longrepr`` is None.  A ``longrepr`` that
        has its own ``toterminal`` method renders itself; anything else is
        passed to ``out.line``, with a placeholder written instead if that
        raises ``UnicodeEncodeError``.
        """
        if hasattr(self, "node"):
            out.line(getslaveinfoline(self.node))  # type: ignore

        longrepr = self.longrepr
        if longrepr is None:
            return

        if hasattr(longrepr, "toterminal"):
            longrepr.toterminal(out)
        else:
            try:
                out.line(longrepr)
            except UnicodeEncodeError:
                out.line("<unprintable longrepr>")

    def get_sections(self, prefix):
        """Yield ``(prefix, content)`` for every section whose name starts with ``prefix``."""
        for name, content in self.sections:
            if name.startswith(prefix):
                yield prefix, content

    @property
    def longreprtext(self):
        """
        Read-only property that returns the full string representation
        of ``longrepr``.

        .. versionadded:: 3.0
        """
        tw = TerminalWriter(stringio=True)
        tw.hasmarkup = False
        # Render into the in-memory writer, then read the text back.
        self.toterminal(tw)
        exc = tw.stringio.getvalue()
        return exc.strip()

    @property
    def caplog(self):
        """Return captured log lines, if log capturing is enabled

        .. versionadded:: 3.5
        """
        return "\n".join(
            content for (prefix, content) in self.get_sections("Captured log")
        )

    @property
    def capstdout(self):
        """Return captured text from stdout, if capturing is enabled

        .. versionadded:: 3.0
        """
        return "".join(
            content for (prefix, content) in self.get_sections("Captured stdout")
        )

    @property
    def capstderr(self):
        """Return captured text from stderr, if capturing is enabled

        .. versionadded:: 3.0
        """
        return "".join(
            content for (prefix, content) in self.get_sections("Captured stderr")
        )

    passed = property(lambda x: x.outcome == "passed")
    failed = property(lambda x: x.outcome == "failed")
    skipped = property(lambda x: x.outcome == "skipped")

    @property
    def fspath(self) -> str:
        """The part of ``nodeid`` before the first ``"::"``."""
        return self.nodeid.split("::")[0]

    @property
    def count_towards_summary(self):
        """
        **Experimental**

        Returns True if this report should be counted towards the totals shown at the end of the
        test session: "1 passed, 1 failure, etc".

        .. note::

            This function is considered **experimental**, so beware that it is subject to changes
            even in patch releases.
        """
        return True

    @property
    def head_line(self):
        """
        **Experimental**

        Returns the head line shown with longrepr output for this report, more commonly during
        traceback representation during failures::

            ________ Test.foo ________

        In the example above, the head_line is "Test.foo".

        Returns None when ``location`` is None.

        .. note::

            This function is considered **experimental**, so beware that it is subject to changes
            even in patch releases.
        """
        if self.location is not None:
            fspath, lineno, domain = self.location
            return domain
        return None

    def _get_verbose_word(self, config):
        """Return the third element of the ``pytest_report_teststatus`` hook result for this report."""
        _category, _short, verbose = config.hook.pytest_report_teststatus(
            report=self, config=config
        )
        return verbose

    def _to_json(self):
        """
        This was originally the serialize_report() function from xdist (ca03269).

        Returns the contents of this report as a dict of builtin entries, suitable for
        serialization.

        Experimental method.
        """
        return _report_to_json(self)

    @classmethod
    def _from_json(cls, reportdict):
        """
        This was originally the serialize_report() function from xdist (ca03269).

        Factory method that returns either a TestReport or CollectReport, depending on the calling
        class. It's the caller's responsibility to know which class to pass here.

        Experimental method.
        """
        kwargs = _report_kwargs_from_json(reportdict)
        return cls(**kwargs)

def _report_unserialization_failure(type_name, report_class, reportdict):
    url = ""
    stream = StringIO()
    pprint("-" * 100, stream=stream)
    pprint("INTERNALERROR: Unknown entry type returned: %s" % type_name, stream=stream)
    pprint("report_name: %s" % report_class, stream=stream)
    pprint(reportdict, stream=stream)
    pprint("Please report this bug at %s" % url, stream=stream)
    pprint("-" * 100, stream=stream)
    raise RuntimeError(stream.getvalue())

class TestReport(BaseReport):
    """ Basic test report object (also used for setup and teardown calls if
    they fail).
    """

    # Prevents pytest from collecting this class as a test class
    # (its name starts with "Test").
    __test__ = False

    def __init__(
        self,
        nodeid,
        location: Tuple[str, Optional[int], str],
        keywords,
        outcome,
        longrepr,
        when,
        sections=(),
        duration=0,
        user_properties=None,
        **extra
    ) -> None:
        #: normalized collection node id
        self.nodeid = nodeid

        #: a (filesystempath, lineno, domaininfo) tuple indicating the
        #: actual location of a test item - it might be different from the
        #: collected one e.g. if a method is inherited from a different module.
        self.location = location  # type: Tuple[str, Optional[int], str]

        #: a name -> value dictionary containing all keywords and
        #: markers associated with a test invocation.
        self.keywords = keywords

        #: test outcome, always one of "passed", "failed", "skipped".
        self.outcome = outcome

        #: None or a failure representation.
        self.longrepr = longrepr

        #: one of 'setup', 'call', 'teardown' to indicate runtest phase.
        self.when = when

        #: user properties is a list of tuples (name, value) that holds user
        #: defined properties of the test
        self.user_properties = list(user_properties or [])

        #: list of pairs ``(str, str)`` of extra information which needs to
        #: marshallable. Used by pytest to add captured text
        #: from ``stdout`` and ``stderr``, but may be used by other plugins
        #: to add arbitrary information to reports.
        self.sections = list(sections)

        #: time it took to run just the test
        self.duration = duration

        # Any additional keyword arguments become plain instance attributes.
        self.__dict__.update(extra)

    def __repr__(self):
        return "<{} {!r} when={!r} outcome={!r}>".format(
            self.__class__.__name__, self.nodeid, self.when, self.outcome
        )

    @classmethod
    def from_item_and_call(cls, item, call) -> "TestReport":
        """
        Factory method to create and fill a TestReport with standard item and call info.
        """
        when = call.when
        duration = call.stop - call.start
        keywords = {x: 1 for x in item.keywords}
        excinfo = call.excinfo
        if not call.excinfo:
            outcome = "passed"
            longrepr = None
        else:
            if not isinstance(excinfo, ExceptionInfo):
                # Not an ExceptionInfo: use the value itself as the failure representation.
                outcome = "failed"
                longrepr = excinfo
            # Type ignored -- see comment where skip.Exception is defined.
            elif excinfo.errisinstance(skip.Exception):  # type: ignore
                outcome = "skipped"
                r = excinfo._getreprcrash()
                longrepr = (str(r.path), r.lineno, r.message)
            else:
                outcome = "failed"
                if call.when == "call":
                    longrepr = item.repr_failure(excinfo)
                else:  # exception in setup or teardown
                    longrepr = item._repr_failure_py(
                        excinfo, style=item.config.getoption("tbstyle", "auto")
                    )
        # Each item section becomes a "Captured <key> <when>" report section.
        sections = [
            ("Captured {} {}".format(key, rwhen), content)
            for rwhen, key, content in item._report_sections
        ]
        return cls(
            item.nodeid,
            item.location,
            keywords,
            outcome,
            longrepr,
            when,
            sections,
            duration,
            user_properties=item.user_properties,
        )
class CollectReport(BaseReport):
    """Report object for the collection phase (``when`` is always ``"collect"``)."""

    when = "collect"

    def __init__(
        self, nodeid: str, outcome, longrepr, result: List[Node], sections=(), **extra
    ) -> None:
        self.nodeid = nodeid
        self.outcome = outcome
        self.longrepr = longrepr
        # A falsy ``result`` (e.g. None) is normalized to an empty list.
        self.result = result or []
        self.sections = list(sections)
        # Any additional keyword arguments become plain instance attributes.
        self.__dict__.update(extra)

    @property
    def location(self):
        """A ``(fspath, None, fspath)`` tuple derived from ``nodeid``."""
        return (self.fspath, None, self.fspath)

    def __repr__(self):
        return "<CollectReport {!r} lenresult={} outcome={!r}>".format(
            self.nodeid, len(self.result), self.outcome
        )


class CollectErrorRepr(TerminalRepr):
    """Terminal representation that writes a stored message as a single red line."""

    def __init__(self, msg):
        self.longrepr = msg

    def toterminal(self, out) -> None:
        out.line(self.longrepr, red=True)


def pytest_report_to_serializable(report):
    """Serialize a TestReport or CollectReport into a dict tagged with ``$report_type``.

    Returns None for any other kind of object.
    """
    if isinstance(report, (TestReport, CollectReport)):
        data = report._to_json()
        data["$report_type"] = report.__class__.__name__
        return data
    return None


def pytest_report_from_serializable(data):
    """Rebuild a report from a dict produced by ``pytest_report_to_serializable``.

    Returns None when ``data`` has no ``$report_type`` key.

    :raises AssertionError: if ``$report_type`` names an unknown report class.
    """
    if "$report_type" in data:
        if data["$report_type"] == "TestReport":
            return TestReport._from_json(data)
        elif data["$report_type"] == "CollectReport":
            return CollectReport._from_json(data)
        # Raised explicitly (rather than ``assert False``) so the check is not
        # stripped when Python runs with -O.
        raise AssertionError(
            "Unknown report_type unserialize data: {}".format(data["$report_type"])
        )
    return None


def _report_to_json(report):
    """
    This was originally the serialize_report() function from xdist (ca03269).

    Returns the contents of this report as a dict of builtin entries, suitable for
    serialization.
    """

    def serialize_repr_entry(entry):
        entry_data = {"type": type(entry).__name__, "data": entry.__dict__.copy()}
        # Flatten one level: attribute values that are objects become dicts too.
        for key, value in entry_data["data"].items():
            if hasattr(value, "__dict__"):
                entry_data["data"][key] = value.__dict__.copy()
        return entry_data

    def serialize_repr_traceback(reprtraceback: ReprTraceback):
        result = attr.asdict(reprtraceback)
        result["reprentries"] = [
            serialize_repr_entry(x) for x in reprtraceback.reprentries
        ]
        return result

    def serialize_repr_crash(reprcrash: Optional[ReprFileLocation]):
        if reprcrash is not None:
            return attr.asdict(reprcrash)
        else:
            return None

    def serialize_longrepr(rep):
        result = {
            "reprcrash": serialize_repr_crash(rep.longrepr.reprcrash),
            "reprtraceback": serialize_repr_traceback(rep.longrepr.reprtraceback),
            "sections": rep.longrepr.sections,
        }
        if isinstance(rep.longrepr, ExceptionChainRepr):
            result["chain"] = []
            for repr_traceback, repr_crash, description in rep.longrepr.chain:
                result["chain"].append(
                    (
                        serialize_repr_traceback(repr_traceback),
                        serialize_repr_crash(repr_crash),
                        description,
                    )
                )
        else:
            result["chain"] = None
        return result

    d = report.__dict__.copy()
    if hasattr(report.longrepr, "toterminal"):
        if hasattr(report.longrepr, "reprtraceback") and hasattr(
            report.longrepr, "reprcrash"
        ):
            d["longrepr"] = serialize_longrepr(report)
        else:
            # Has toterminal but no traceback/crash info: fall back to its string form.
            d["longrepr"] = str(report.longrepr)
    else:
        d["longrepr"] = report.longrepr
    for name in d:
        if isinstance(d[name], (py.path.local, Path)):
            d[name] = str(d[name])
        elif name == "result":
            d[name] = None  # for now
    return d


def _report_kwargs_from_json(reportdict):
    """
    This was originally the serialize_report() function from xdist (ca03269).

    Returns **kwargs that can be used to construct a TestReport or CollectReport instance.

    Note that ``reportdict`` is modified in place and returned.
    """

    def deserialize_repr_entry(entry_data):
        data = entry_data["data"]
        entry_type = entry_data["type"]
        if entry_type == "ReprEntry":
            reprfuncargs = None
            reprfileloc = None
            reprlocals = None
            if data["reprfuncargs"]:
                reprfuncargs = ReprFuncArgs(**data["reprfuncargs"])
            if data["reprfileloc"]:
                reprfileloc = ReprFileLocation(**data["reprfileloc"])
            if data["reprlocals"]:
                reprlocals = ReprLocals(data["reprlocals"]["lines"])

            reprentry = ReprEntry(
                lines=data["lines"],
                reprfuncargs=reprfuncargs,
                reprlocals=reprlocals,
                filelocrepr=reprfileloc,
                style=data["style"],
            )  # type: Union[ReprEntry, ReprEntryNative]
        elif entry_type == "ReprEntryNative":
            reprentry = ReprEntryNative(data["lines"])
        else:
            # Always raises RuntimeError, so the return below is not reached.
            _report_unserialization_failure(entry_type, TestReport, reportdict)
        return reprentry

    def deserialize_repr_traceback(repr_traceback_dict):
        repr_traceback_dict["reprentries"] = [
            deserialize_repr_entry(x) for x in repr_traceback_dict["reprentries"]
        ]
        return ReprTraceback(**repr_traceback_dict)

    def deserialize_repr_crash(repr_crash_dict: Optional[dict]):
        if repr_crash_dict is not None:
            return ReprFileLocation(**repr_crash_dict)
        else:
            return None

    if (
        reportdict["longrepr"]
        and "reprcrash" in reportdict["longrepr"]
        and "reprtraceback" in reportdict["longrepr"]
    ):

        reprtraceback = deserialize_repr_traceback(
            reportdict["longrepr"]["reprtraceback"]
        )
        reprcrash = deserialize_repr_crash(reportdict["longrepr"]["reprcrash"])
        if reportdict["longrepr"]["chain"]:
            chain = []
            for repr_traceback_data, repr_crash_data, description in reportdict[
                "longrepr"
            ]["chain"]:
                chain.append(
                    (
                        deserialize_repr_traceback(repr_traceback_data),
                        deserialize_repr_crash(repr_crash_data),
                        description,
                    )
                )
            exception_info = ExceptionChainRepr(
                chain
            )  # type: Union[ExceptionChainRepr,ReprExceptionInfo]
        else:
            exception_info = ReprExceptionInfo(reprtraceback, reprcrash)

        for section in reportdict["longrepr"]["sections"]:
            exception_info.addsection(*section)
        reportdict["longrepr"] = exception_info

    return reportdict