Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/_pytest/runner.py : 41%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1""" basic collect and runtest protocol implementations """
2import bdb
3import os
4import sys
5from typing import Any
6from typing import Callable
7from typing import cast
8from typing import Dict
9from typing import Generic
10from typing import List
11from typing import Optional
12from typing import Tuple
13from typing import TypeVar
14from typing import Union
16import attr
18from .reports import BaseReport
19from .reports import CollectErrorRepr
20from .reports import CollectReport
21from .reports import TestReport
22from _pytest import timing
23from _pytest._code.code import ExceptionChainRepr
24from _pytest._code.code import ExceptionInfo
25from _pytest.compat import TYPE_CHECKING
26from _pytest.config.argparsing import Parser
27from _pytest.nodes import Collector
28from _pytest.nodes import Item
29from _pytest.nodes import Node
30from _pytest.outcomes import Exit
31from _pytest.outcomes import Skipped
32from _pytest.outcomes import TEST_OUTCOME
34if TYPE_CHECKING:
35 from typing import Type
36 from typing_extensions import Literal
38 from _pytest.main import Session
39 from _pytest.terminal import TerminalReporter
41#
42# pytest plugin hooks
45def pytest_addoption(parser: Parser) -> None:
46 group = parser.getgroup("terminal reporting", "reporting", after="general")
47 group.addoption(
48 "--durations",
49 action="store",
50 type=int,
51 default=None,
52 metavar="N",
53 help="show N slowest setup/test durations (N=0 for all).",
54 )
57def pytest_terminal_summary(terminalreporter: "TerminalReporter") -> None:
58 durations = terminalreporter.config.option.durations
59 verbose = terminalreporter.config.getvalue("verbose")
60 if durations is None:
61 return
62 tr = terminalreporter
63 dlist = []
64 for replist in tr.stats.values():
65 for rep in replist:
66 if hasattr(rep, "duration"):
67 dlist.append(rep)
68 if not dlist:
69 return
70 dlist.sort(key=lambda x: x.duration)
71 dlist.reverse()
72 if not durations:
73 tr.write_sep("=", "slowest durations")
74 else:
75 tr.write_sep("=", "slowest %s durations" % durations)
76 dlist = dlist[:durations]
78 for i, rep in enumerate(dlist):
79 if verbose < 2 and rep.duration < 0.005:
80 tr.write_line("")
81 tr.write_line(
82 "(%s durations < 0.005s hidden. Use -vv to show these durations.)"
83 % (len(dlist) - i)
84 )
85 break
86 tr.write_line("{:02.2f}s {:<8} {}".format(rep.duration, rep.when, rep.nodeid))
89def pytest_sessionstart(session: "Session") -> None:
90 session._setupstate = SetupState()
93def pytest_sessionfinish(session: "Session") -> None:
94 session._setupstate.teardown_all()
97def pytest_runtest_protocol(item: Item, nextitem: Optional[Item]) -> bool:
98 ihook = item.ihook
99 ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location)
100 runtestprotocol(item, nextitem=nextitem)
101 ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location)
102 return True
105def runtestprotocol(
106 item: Item, log: bool = True, nextitem: Optional[Item] = None
107) -> List[TestReport]:
108 hasrequest = hasattr(item, "_request")
109 if hasrequest and not item._request: # type: ignore[attr-defined]
110 item._initrequest() # type: ignore[attr-defined]
111 rep = call_and_report(item, "setup", log)
112 reports = [rep]
113 if rep.passed:
114 if item.config.getoption("setupshow", False):
115 show_test_item(item)
116 if not item.config.getoption("setuponly", False):
117 reports.append(call_and_report(item, "call", log))
118 reports.append(call_and_report(item, "teardown", log, nextitem=nextitem))
119 # after all teardown hooks have been called
120 # want funcargs and request info to go away
121 if hasrequest:
122 item._request = False # type: ignore[attr-defined]
123 item.funcargs = None # type: ignore[attr-defined]
124 return reports
127def show_test_item(item: Item) -> None:
128 """Show test function, parameters and the fixtures of the test item."""
129 tw = item.config.get_terminal_writer()
130 tw.line()
131 tw.write(" " * 8)
132 tw.write(item.nodeid)
133 used_fixtures = sorted(getattr(item, "fixturenames", []))
134 if used_fixtures:
135 tw.write(" (fixtures used: {})".format(", ".join(used_fixtures)))
136 tw.flush()
139def pytest_runtest_setup(item: Item) -> None:
140 _update_current_test_var(item, "setup")
141 item.session._setupstate.prepare(item)
144def pytest_runtest_call(item: Item) -> None:
145 _update_current_test_var(item, "call")
146 try:
147 del sys.last_type
148 del sys.last_value
149 del sys.last_traceback
150 except AttributeError:
151 pass
152 try:
153 item.runtest()
154 except Exception as e:
155 # Store trace info to allow postmortem debugging
156 sys.last_type = type(e)
157 sys.last_value = e
158 assert e.__traceback__ is not None
159 # Skip *this* frame
160 sys.last_traceback = e.__traceback__.tb_next
161 raise e
164def pytest_runtest_teardown(item: Item, nextitem: Optional[Item]) -> None:
165 _update_current_test_var(item, "teardown")
166 item.session._setupstate.teardown_exact(item, nextitem)
167 _update_current_test_var(item, None)
170def _update_current_test_var(
171 item: Item, when: Optional["Literal['setup', 'call', 'teardown']"]
172) -> None:
173 """
174 Update :envvar:`PYTEST_CURRENT_TEST` to reflect the current item and stage.
176 If ``when`` is None, delete ``PYTEST_CURRENT_TEST`` from the environment.
177 """
178 var_name = "PYTEST_CURRENT_TEST"
179 if when:
180 value = "{} ({})".format(item.nodeid, when)
181 # don't allow null bytes on environment variables (see #2644, #2957)
182 value = value.replace("\x00", "(null)")
183 os.environ[var_name] = value
184 else:
185 os.environ.pop(var_name)
188def pytest_report_teststatus(report: BaseReport) -> Optional[Tuple[str, str, str]]:
189 if report.when in ("setup", "teardown"):
190 if report.failed:
191 # category, shortletter, verbose-word
192 return "error", "E", "ERROR"
193 elif report.skipped:
194 return "skipped", "s", "SKIPPED"
195 else:
196 return "", "", ""
197 return None
200#
201# Implementation
204def call_and_report(
205 item: Item, when: "Literal['setup', 'call', 'teardown']", log: bool = True, **kwds
206) -> TestReport:
207 call = call_runtest_hook(item, when, **kwds)
208 hook = item.ihook
209 report = hook.pytest_runtest_makereport(item=item, call=call) # type: TestReport
210 if log:
211 hook.pytest_runtest_logreport(report=report)
212 if check_interactive_exception(call, report):
213 hook.pytest_exception_interact(node=item, call=call, report=report)
214 return report
217def check_interactive_exception(call: "CallInfo", report: BaseReport) -> bool:
218 """Check whether the call raised an exception that should be reported as
219 interactive."""
220 if call.excinfo is None:
221 # Didn't raise.
222 return False
223 if hasattr(report, "wasxfail"):
224 # Exception was expected.
225 return False
226 if isinstance(call.excinfo.value, (Skipped, bdb.BdbQuit)):
227 # Special control flow exception.
228 return False
229 return True
232def call_runtest_hook(
233 item: Item, when: "Literal['setup', 'call', 'teardown']", **kwds
234) -> "CallInfo[None]":
235 if when == "setup":
236 ihook = item.ihook.pytest_runtest_setup # type: Callable[..., None]
237 elif when == "call":
238 ihook = item.ihook.pytest_runtest_call
239 elif when == "teardown":
240 ihook = item.ihook.pytest_runtest_teardown
241 else:
242 assert False, "Unhandled runtest hook case: {}".format(when)
243 reraise = (Exit,) # type: Tuple[Type[BaseException], ...]
244 if not item.config.getoption("usepdb", False):
245 reraise += (KeyboardInterrupt,)
246 return CallInfo.from_call(
247 lambda: ihook(item=item, **kwds), when=when, reraise=reraise
248 )
251_T = TypeVar("_T")
254@attr.s(repr=False)
255class CallInfo(Generic[_T]):
256 """ Result/Exception info a function invocation.
258 :param T result: The return value of the call, if it didn't raise. Can only be accessed
259 if excinfo is None.
260 :param Optional[ExceptionInfo] excinfo: The captured exception of the call, if it raised.
261 :param float start: The system time when the call started, in seconds since the epoch.
262 :param float stop: The system time when the call ended, in seconds since the epoch.
263 :param float duration: The call duration, in seconds.
264 :param str when: The context of invocation: "setup", "call", "teardown", ...
265 """
267 _result = attr.ib(type="Optional[_T]")
268 excinfo = attr.ib(type=Optional[ExceptionInfo[BaseException]])
269 start = attr.ib(type=float)
270 stop = attr.ib(type=float)
271 duration = attr.ib(type=float)
272 when = attr.ib(type="Literal['collect', 'setup', 'call', 'teardown']")
274 @property
275 def result(self) -> _T:
276 if self.excinfo is not None:
277 raise AttributeError("{!r} has no valid result".format(self))
278 # The cast is safe because an exception wasn't raised, hence
279 # _result has the expected function return type (which may be
280 # None, that's why a cast and not an assert).
281 return cast(_T, self._result)
283 @classmethod
284 def from_call(
285 cls,
286 func: "Callable[[], _T]",
287 when: "Literal['collect', 'setup', 'call', 'teardown']",
288 reraise: "Optional[Union[Type[BaseException], Tuple[Type[BaseException], ...]]]" = None,
289 ) -> "CallInfo[_T]":
290 excinfo = None
291 start = timing.time()
292 precise_start = timing.perf_counter()
293 try:
294 result = func() # type: Optional[_T]
295 except BaseException:
296 excinfo = ExceptionInfo.from_current()
297 if reraise is not None and isinstance(excinfo.value, reraise):
298 raise
299 result = None
300 # use the perf counter
301 precise_stop = timing.perf_counter()
302 duration = precise_stop - precise_start
303 stop = timing.time()
304 return cls(
305 start=start,
306 stop=stop,
307 duration=duration,
308 when=when,
309 result=result,
310 excinfo=excinfo,
311 )
313 def __repr__(self) -> str:
314 if self.excinfo is None:
315 return "<CallInfo when={!r} result: {!r}>".format(self.when, self._result)
316 return "<CallInfo when={!r} excinfo={!r}>".format(self.when, self.excinfo)
319def pytest_runtest_makereport(item: Item, call: CallInfo[None]) -> TestReport:
320 return TestReport.from_item_and_call(item, call)
323def pytest_make_collect_report(collector: Collector) -> CollectReport:
324 call = CallInfo.from_call(lambda: list(collector.collect()), "collect")
325 # TODO: Better typing for longrepr.
326 longrepr = None # type: Optional[Any]
327 if not call.excinfo:
328 outcome = "passed" # type: Literal["passed", "skipped", "failed"]
329 else:
330 skip_exceptions = [Skipped]
331 unittest = sys.modules.get("unittest")
332 if unittest is not None:
333 # Type ignored because unittest is loaded dynamically.
334 skip_exceptions.append(unittest.SkipTest) # type: ignore
335 if isinstance(call.excinfo.value, tuple(skip_exceptions)):
336 outcome = "skipped"
337 r_ = collector._repr_failure_py(call.excinfo, "line")
338 assert isinstance(r_, ExceptionChainRepr), repr(r_)
339 r = r_.reprcrash
340 assert r
341 longrepr = (str(r.path), r.lineno, r.message)
342 else:
343 outcome = "failed"
344 errorinfo = collector.repr_failure(call.excinfo)
345 if not hasattr(errorinfo, "toterminal"):
346 errorinfo = CollectErrorRepr(errorinfo)
347 longrepr = errorinfo
348 result = call.result if not call.excinfo else None
349 rep = CollectReport(collector.nodeid, outcome, longrepr, result)
350 rep.call = call # type: ignore # see collect_one_node
351 return rep
354class SetupState:
355 """ shared state for setting up/tearing down test items or collectors. """
357 def __init__(self):
358 self.stack = [] # type: List[Node]
359 self._finalizers = {} # type: Dict[Node, List[Callable[[], object]]]
361 def addfinalizer(self, finalizer: Callable[[], object], colitem) -> None:
362 """ attach a finalizer to the given colitem. """
363 assert colitem and not isinstance(colitem, tuple)
364 assert callable(finalizer)
365 # assert colitem in self.stack # some unit tests don't setup stack :/
366 self._finalizers.setdefault(colitem, []).append(finalizer)
368 def _pop_and_teardown(self):
369 colitem = self.stack.pop()
370 self._teardown_with_finalization(colitem)
372 def _callfinalizers(self, colitem) -> None:
373 finalizers = self._finalizers.pop(colitem, None)
374 exc = None
375 while finalizers:
376 fin = finalizers.pop()
377 try:
378 fin()
379 except TEST_OUTCOME as e:
380 # XXX Only first exception will be seen by user,
381 # ideally all should be reported.
382 if exc is None:
383 exc = e
384 if exc:
385 raise exc
387 def _teardown_with_finalization(self, colitem) -> None:
388 self._callfinalizers(colitem)
389 colitem.teardown()
390 for colitem in self._finalizers:
391 assert colitem in self.stack
393 def teardown_all(self) -> None:
394 while self.stack:
395 self._pop_and_teardown()
396 for key in list(self._finalizers):
397 self._teardown_with_finalization(key)
398 assert not self._finalizers
400 def teardown_exact(self, item, nextitem) -> None:
401 needed_collectors = nextitem and nextitem.listchain() or []
402 self._teardown_towards(needed_collectors)
404 def _teardown_towards(self, needed_collectors) -> None:
405 exc = None
406 while self.stack:
407 if self.stack == needed_collectors[: len(self.stack)]:
408 break
409 try:
410 self._pop_and_teardown()
411 except TEST_OUTCOME as e:
412 # XXX Only first exception will be seen by user,
413 # ideally all should be reported.
414 if exc is None:
415 exc = e
416 if exc:
417 raise exc
419 def prepare(self, colitem) -> None:
420 """Setup objects along the collector chain to the test-method."""
422 # check if the last collection node has raised an error
423 for col in self.stack:
424 if hasattr(col, "_prepare_exc"):
425 exc = col._prepare_exc # type: ignore[attr-defined]
426 raise exc
428 needed_collectors = colitem.listchain()
429 for col in needed_collectors[len(self.stack) :]:
430 self.stack.append(col)
431 try:
432 col.setup()
433 except TEST_OUTCOME as e:
434 col._prepare_exc = e # type: ignore[attr-defined]
435 raise e
438def collect_one_node(collector: Collector) -> CollectReport:
439 ihook = collector.ihook
440 ihook.pytest_collectstart(collector=collector)
441 rep = ihook.pytest_make_collect_report(collector=collector) # type: CollectReport
442 call = rep.__dict__.pop("call", None)
443 if call and check_interactive_exception(call, rep):
444 ihook.pytest_exception_interact(node=collector, call=call, report=rep)
445 return rep