Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/_pytest/junitxml.py : 1%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2 report test results in JUnit-XML format,
3 for use with Jenkins and build integration servers.
6Based on initial code from Ross Lawley.
8Output conforms to https://github.com/jenkinsci/xunit-plugin/blob/master/
9src/main/resources/org/jenkinsci/plugins/xunit/types/model/xsd/junit-10.xsd
10"""
11import functools
12import os
13import platform
14import re
15import sys
16from datetime import datetime
17from typing import Callable
18from typing import Dict
19from typing import List
20from typing import Match
21from typing import Optional
22from typing import Tuple
23from typing import Union
25import py
27import pytest
28from _pytest import deprecated
29from _pytest import nodes
30from _pytest import timing
31from _pytest._code.code import ExceptionRepr
32from _pytest.compat import TYPE_CHECKING
33from _pytest.config import Config
34from _pytest.config import filename_arg
35from _pytest.config.argparsing import Parser
36from _pytest.fixtures import FixtureRequest
37from _pytest.reports import TestReport
38from _pytest.store import StoreKey
39from _pytest.terminal import TerminalReporter
40from _pytest.warnings import _issue_warning_captured
42if TYPE_CHECKING:
43 from typing import Type
46xml_key = StoreKey["LogXML"]()
49class Junit(py.xml.Namespace):
50 pass
53# We need to get the subset of the invalid unicode ranges according to
54# XML 1.0 which are valid in this python build. Hence we calculate
55# this dynamically instead of hardcoding it. The spec range of valid
56# chars is: Char ::= #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
57# | [#x10000-#x10FFFF]
58_legal_chars = (0x09, 0x0A, 0x0D)
59_legal_ranges = ((0x20, 0x7E), (0x80, 0xD7FF), (0xE000, 0xFFFD), (0x10000, 0x10FFFF))
60_legal_xml_re = [
61 "{}-{}".format(chr(low), chr(high))
62 for (low, high) in _legal_ranges
63 if low < sys.maxunicode
64]
65_legal_xml_re = [chr(x) for x in _legal_chars] + _legal_xml_re
66illegal_xml_re = re.compile("[^%s]" % "".join(_legal_xml_re))
67del _legal_chars
68del _legal_ranges
69del _legal_xml_re
71_py_ext_re = re.compile(r"\.py$")
74def bin_xml_escape(arg: object) -> py.xml.raw:
75 def repl(matchobj: Match[str]) -> str:
76 i = ord(matchobj.group())
77 if i <= 0xFF:
78 return "#x%02X" % i
79 else:
80 return "#x%04X" % i
82 return py.xml.raw(illegal_xml_re.sub(repl, py.xml.escape(str(arg))))
85def merge_family(left, right) -> None:
86 result = {}
87 for kl, vl in left.items():
88 for kr, vr in right.items():
89 if not isinstance(vl, list):
90 raise TypeError(type(vl))
91 result[kl] = vl + vr
92 left.update(result)
95families = {}
96families["_base"] = {"testcase": ["classname", "name"]}
97families["_base_legacy"] = {"testcase": ["file", "line", "url"]}
99# xUnit 1.x inherits legacy attributes
100families["xunit1"] = families["_base"].copy()
101merge_family(families["xunit1"], families["_base_legacy"])
103# xUnit 2.x uses strict base attributes
104families["xunit2"] = families["_base"]
107class _NodeReporter:
108 def __init__(self, nodeid: Union[str, TestReport], xml: "LogXML") -> None:
109 self.id = nodeid
110 self.xml = xml
111 self.add_stats = self.xml.add_stats
112 self.family = self.xml.family
113 self.duration = 0
114 self.properties = [] # type: List[Tuple[str, py.xml.raw]]
115 self.nodes = [] # type: List[py.xml.Tag]
116 self.attrs = {} # type: Dict[str, Union[str, py.xml.raw]]
118 def append(self, node: py.xml.Tag) -> None:
119 self.xml.add_stats(type(node).__name__)
120 self.nodes.append(node)
122 def add_property(self, name: str, value: object) -> None:
123 self.properties.append((str(name), bin_xml_escape(value)))
125 def add_attribute(self, name: str, value: object) -> None:
126 self.attrs[str(name)] = bin_xml_escape(value)
128 def make_properties_node(self) -> Union[py.xml.Tag, str]:
129 """Return a Junit node containing custom properties, if any.
130 """
131 if self.properties:
132 return Junit.properties(
133 [
134 Junit.property(name=name, value=value)
135 for name, value in self.properties
136 ]
137 )
138 return ""
140 def record_testreport(self, testreport: TestReport) -> None:
141 names = mangle_test_address(testreport.nodeid)
142 existing_attrs = self.attrs
143 classnames = names[:-1]
144 if self.xml.prefix:
145 classnames.insert(0, self.xml.prefix)
146 attrs = {
147 "classname": ".".join(classnames),
148 "name": bin_xml_escape(names[-1]),
149 "file": testreport.location[0],
150 } # type: Dict[str, Union[str, py.xml.raw]]
151 if testreport.location[1] is not None:
152 attrs["line"] = str(testreport.location[1])
153 if hasattr(testreport, "url"):
154 attrs["url"] = testreport.url
155 self.attrs = attrs
156 self.attrs.update(existing_attrs) # restore any user-defined attributes
158 # Preserve legacy testcase behavior
159 if self.family == "xunit1":
160 return
162 # Filter out attributes not permitted by this test family.
163 # Including custom attributes because they are not valid here.
164 temp_attrs = {}
165 for key in self.attrs.keys():
166 if key in families[self.family]["testcase"]:
167 temp_attrs[key] = self.attrs[key]
168 self.attrs = temp_attrs
170 def to_xml(self) -> py.xml.Tag:
171 testcase = Junit.testcase(time="%.3f" % self.duration, **self.attrs)
172 testcase.append(self.make_properties_node())
173 for node in self.nodes:
174 testcase.append(node)
175 return testcase
177 def _add_simple(self, kind: "Type[py.xml.Tag]", message: str, data=None) -> None:
178 data = bin_xml_escape(data)
179 node = kind(data, message=message)
180 self.append(node)
182 def write_captured_output(self, report: TestReport) -> None:
183 if not self.xml.log_passing_tests and report.passed:
184 return
186 content_out = report.capstdout
187 content_log = report.caplog
188 content_err = report.capstderr
189 if self.xml.logging == "no":
190 return
191 content_all = ""
192 if self.xml.logging in ["log", "all"]:
193 content_all = self._prepare_content(content_log, " Captured Log ")
194 if self.xml.logging in ["system-out", "out-err", "all"]:
195 content_all += self._prepare_content(content_out, " Captured Out ")
196 self._write_content(report, content_all, "system-out")
197 content_all = ""
198 if self.xml.logging in ["system-err", "out-err", "all"]:
199 content_all += self._prepare_content(content_err, " Captured Err ")
200 self._write_content(report, content_all, "system-err")
201 content_all = ""
202 if content_all:
203 self._write_content(report, content_all, "system-out")
205 def _prepare_content(self, content: str, header: str) -> str:
206 return "\n".join([header.center(80, "-"), content, ""])
208 def _write_content(self, report: TestReport, content: str, jheader: str) -> None:
209 tag = getattr(Junit, jheader)
210 self.append(tag(bin_xml_escape(content)))
212 def append_pass(self, report: TestReport) -> None:
213 self.add_stats("passed")
215 def append_failure(self, report: TestReport) -> None:
216 # msg = str(report.longrepr.reprtraceback.extraline)
217 if hasattr(report, "wasxfail"):
218 self._add_simple(Junit.skipped, "xfail-marked test passes unexpectedly")
219 else:
220 assert report.longrepr is not None
221 if getattr(report.longrepr, "reprcrash", None) is not None:
222 message = report.longrepr.reprcrash.message
223 else:
224 message = str(report.longrepr)
225 message = bin_xml_escape(message)
226 fail = Junit.failure(message=message)
227 fail.append(bin_xml_escape(report.longrepr))
228 self.append(fail)
230 def append_collect_error(self, report: TestReport) -> None:
231 # msg = str(report.longrepr.reprtraceback.extraline)
232 assert report.longrepr is not None
233 self.append(
234 Junit.error(bin_xml_escape(report.longrepr), message="collection failure")
235 )
237 def append_collect_skipped(self, report: TestReport) -> None:
238 self._add_simple(Junit.skipped, "collection skipped", report.longrepr)
240 def append_error(self, report: TestReport) -> None:
241 assert report.longrepr is not None
242 if getattr(report.longrepr, "reprcrash", None) is not None:
243 reason = report.longrepr.reprcrash.message
244 else:
245 reason = str(report.longrepr)
247 if report.when == "teardown":
248 msg = 'failed on teardown with "{}"'.format(reason)
249 else:
250 msg = 'failed on setup with "{}"'.format(reason)
251 self._add_simple(Junit.error, msg, report.longrepr)
253 def append_skipped(self, report: TestReport) -> None:
254 if hasattr(report, "wasxfail"):
255 xfailreason = report.wasxfail
256 if xfailreason.startswith("reason: "):
257 xfailreason = xfailreason[8:]
258 self.append(
259 Junit.skipped(
260 "", type="pytest.xfail", message=bin_xml_escape(xfailreason)
261 )
262 )
263 else:
264 assert report.longrepr is not None
265 filename, lineno, skipreason = report.longrepr
266 if skipreason.startswith("Skipped: "):
267 skipreason = skipreason[9:]
268 details = "{}:{}: {}".format(filename, lineno, skipreason)
270 self.append(
271 Junit.skipped(
272 bin_xml_escape(details),
273 type="pytest.skip",
274 message=bin_xml_escape(skipreason),
275 )
276 )
277 self.write_captured_output(report)
279 def finalize(self) -> None:
280 data = self.to_xml().unicode(indent=0)
281 self.__dict__.clear()
282 # Type ignored becuase mypy doesn't like overriding a method.
283 # Also the return value doesn't match...
284 self.to_xml = lambda: py.xml.raw(data) # type: ignore
287def _warn_incompatibility_with_xunit2(
288 request: FixtureRequest, fixture_name: str
289) -> None:
290 """Emits a PytestWarning about the given fixture being incompatible with newer xunit revisions"""
291 from _pytest.warning_types import PytestWarning
293 xml = request.config._store.get(xml_key, None)
294 if xml is not None and xml.family not in ("xunit1", "legacy"):
295 request.node.warn(
296 PytestWarning(
297 "{fixture_name} is incompatible with junit_family '{family}' (use 'legacy' or 'xunit1')".format(
298 fixture_name=fixture_name, family=xml.family
299 )
300 )
301 )
304@pytest.fixture
305def record_property(request: FixtureRequest) -> Callable[[str, object], None]:
306 """Add extra properties to the calling test.
308 User properties become part of the test report and are available to the
309 configured reporters, like JUnit XML.
311 The fixture is callable with ``name, value``. The value is automatically
312 XML-encoded.
314 Example::
316 def test_function(record_property):
317 record_property("example_key", 1)
318 """
319 _warn_incompatibility_with_xunit2(request, "record_property")
321 def append_property(name: str, value: object) -> None:
322 request.node.user_properties.append((name, value))
324 return append_property
327@pytest.fixture
328def record_xml_attribute(request: FixtureRequest) -> Callable[[str, object], None]:
329 """Add extra xml attributes to the tag for the calling test.
331 The fixture is callable with ``name, value``. The value is
332 automatically XML-encoded.
333 """
334 from _pytest.warning_types import PytestExperimentalApiWarning
336 request.node.warn(
337 PytestExperimentalApiWarning("record_xml_attribute is an experimental feature")
338 )
340 _warn_incompatibility_with_xunit2(request, "record_xml_attribute")
342 # Declare noop
343 def add_attr_noop(name: str, value: object) -> None:
344 pass
346 attr_func = add_attr_noop
348 xml = request.config._store.get(xml_key, None)
349 if xml is not None:
350 node_reporter = xml.node_reporter(request.node.nodeid)
351 attr_func = node_reporter.add_attribute
353 return attr_func
356def _check_record_param_type(param: str, v: str) -> None:
357 """Used by record_testsuite_property to check that the given parameter name is of the proper
358 type"""
359 __tracebackhide__ = True
360 if not isinstance(v, str):
361 msg = "{param} parameter needs to be a string, but {g} given"
362 raise TypeError(msg.format(param=param, g=type(v).__name__))
365@pytest.fixture(scope="session")
366def record_testsuite_property(request: FixtureRequest) -> Callable[[str, object], None]:
367 """
368 Records a new ``<property>`` tag as child of the root ``<testsuite>``. This is suitable to
369 writing global information regarding the entire test suite, and is compatible with ``xunit2`` JUnit family.
371 This is a ``session``-scoped fixture which is called with ``(name, value)``. Example:
373 .. code-block:: python
375 def test_foo(record_testsuite_property):
376 record_testsuite_property("ARCH", "PPC")
377 record_testsuite_property("STORAGE_TYPE", "CEPH")
379 ``name`` must be a string, ``value`` will be converted to a string and properly xml-escaped.
380 """
382 __tracebackhide__ = True
384 def record_func(name: str, value: object) -> None:
385 """noop function in case --junitxml was not passed in the command-line"""
386 __tracebackhide__ = True
387 _check_record_param_type("name", name)
389 xml = request.config._store.get(xml_key, None)
390 if xml is not None:
391 record_func = xml.add_global_property # noqa
392 return record_func
395def pytest_addoption(parser: Parser) -> None:
396 group = parser.getgroup("terminal reporting")
397 group.addoption(
398 "--junitxml",
399 "--junit-xml",
400 action="store",
401 dest="xmlpath",
402 metavar="path",
403 type=functools.partial(filename_arg, optname="--junitxml"),
404 default=None,
405 help="create junit-xml style report file at given path.",
406 )
407 group.addoption(
408 "--junitprefix",
409 "--junit-prefix",
410 action="store",
411 metavar="str",
412 default=None,
413 help="prepend prefix to classnames in junit-xml output",
414 )
415 parser.addini(
416 "junit_suite_name", "Test suite name for JUnit report", default="pytest"
417 )
418 parser.addini(
419 "junit_logging",
420 "Write captured log messages to JUnit report: "
421 "one of no|log|system-out|system-err|out-err|all",
422 default="no",
423 )
424 parser.addini(
425 "junit_log_passing_tests",
426 "Capture log information for passing tests to JUnit report: ",
427 type="bool",
428 default=True,
429 )
430 parser.addini(
431 "junit_duration_report",
432 "Duration time to report: one of total|call",
433 default="total",
434 ) # choices=['total', 'call'])
435 parser.addini(
436 "junit_family", "Emit XML for schema: one of legacy|xunit1|xunit2", default=None
437 )
440def pytest_configure(config: Config) -> None:
441 xmlpath = config.option.xmlpath
442 # prevent opening xmllog on worker nodes (xdist)
443 if xmlpath and not hasattr(config, "workerinput"):
444 junit_family = config.getini("junit_family")
445 if not junit_family:
446 _issue_warning_captured(deprecated.JUNIT_XML_DEFAULT_FAMILY, config.hook, 2)
447 junit_family = "xunit1"
448 config._store[xml_key] = LogXML(
449 xmlpath,
450 config.option.junitprefix,
451 config.getini("junit_suite_name"),
452 config.getini("junit_logging"),
453 config.getini("junit_duration_report"),
454 junit_family,
455 config.getini("junit_log_passing_tests"),
456 )
457 config.pluginmanager.register(config._store[xml_key])
460def pytest_unconfigure(config: Config) -> None:
461 xml = config._store.get(xml_key, None)
462 if xml:
463 del config._store[xml_key]
464 config.pluginmanager.unregister(xml)
467def mangle_test_address(address: str) -> List[str]:
468 path, possible_open_bracket, params = address.partition("[")
469 names = path.split("::")
470 try:
471 names.remove("()")
472 except ValueError:
473 pass
474 # convert file path to dotted path
475 names[0] = names[0].replace(nodes.SEP, ".")
476 names[0] = _py_ext_re.sub("", names[0])
477 # put any params back
478 names[-1] += possible_open_bracket + params
479 return names
482class LogXML:
483 def __init__(
484 self,
485 logfile,
486 prefix: Optional[str],
487 suite_name: str = "pytest",
488 logging: str = "no",
489 report_duration: str = "total",
490 family="xunit1",
491 log_passing_tests: bool = True,
492 ) -> None:
493 logfile = os.path.expanduser(os.path.expandvars(logfile))
494 self.logfile = os.path.normpath(os.path.abspath(logfile))
495 self.prefix = prefix
496 self.suite_name = suite_name
497 self.logging = logging
498 self.log_passing_tests = log_passing_tests
499 self.report_duration = report_duration
500 self.family = family
501 self.stats = dict.fromkeys(
502 ["error", "passed", "failure", "skipped"], 0
503 ) # type: Dict[str, int]
504 self.node_reporters = (
505 {}
506 ) # type: Dict[Tuple[Union[str, TestReport], object], _NodeReporter]
507 self.node_reporters_ordered = [] # type: List[_NodeReporter]
508 self.global_properties = [] # type: List[Tuple[str, py.xml.raw]]
510 # List of reports that failed on call but teardown is pending.
511 self.open_reports = [] # type: List[TestReport]
512 self.cnt_double_fail_tests = 0
514 # Replaces convenience family with real family
515 if self.family == "legacy":
516 self.family = "xunit1"
518 def finalize(self, report: TestReport) -> None:
519 nodeid = getattr(report, "nodeid", report)
520 # local hack to handle xdist report order
521 workernode = getattr(report, "node", None)
522 reporter = self.node_reporters.pop((nodeid, workernode))
523 if reporter is not None:
524 reporter.finalize()
526 def node_reporter(self, report: Union[TestReport, str]) -> _NodeReporter:
527 nodeid = getattr(report, "nodeid", report) # type: Union[str, TestReport]
528 # local hack to handle xdist report order
529 workernode = getattr(report, "node", None)
531 key = nodeid, workernode
533 if key in self.node_reporters:
534 # TODO: breaks for --dist=each
535 return self.node_reporters[key]
537 reporter = _NodeReporter(nodeid, self)
539 self.node_reporters[key] = reporter
540 self.node_reporters_ordered.append(reporter)
542 return reporter
544 def add_stats(self, key: str) -> None:
545 if key in self.stats:
546 self.stats[key] += 1
548 def _opentestcase(self, report: TestReport) -> _NodeReporter:
549 reporter = self.node_reporter(report)
550 reporter.record_testreport(report)
551 return reporter
553 def pytest_runtest_logreport(self, report: TestReport) -> None:
554 """handle a setup/call/teardown report, generating the appropriate
555 xml tags as necessary.
557 note: due to plugins like xdist, this hook may be called in interlaced
558 order with reports from other nodes. for example:
560 usual call order:
561 -> setup node1
562 -> call node1
563 -> teardown node1
564 -> setup node2
565 -> call node2
566 -> teardown node2
568 possible call order in xdist:
569 -> setup node1
570 -> call node1
571 -> setup node2
572 -> call node2
573 -> teardown node2
574 -> teardown node1
575 """
576 close_report = None
577 if report.passed:
578 if report.when == "call": # ignore setup/teardown
579 reporter = self._opentestcase(report)
580 reporter.append_pass(report)
581 elif report.failed:
582 if report.when == "teardown":
583 # The following vars are needed when xdist plugin is used
584 report_wid = getattr(report, "worker_id", None)
585 report_ii = getattr(report, "item_index", None)
586 close_report = next(
587 (
588 rep
589 for rep in self.open_reports
590 if (
591 rep.nodeid == report.nodeid
592 and getattr(rep, "item_index", None) == report_ii
593 and getattr(rep, "worker_id", None) == report_wid
594 )
595 ),
596 None,
597 )
598 if close_report:
599 # We need to open new testcase in case we have failure in
600 # call and error in teardown in order to follow junit
601 # schema
602 self.finalize(close_report)
603 self.cnt_double_fail_tests += 1
604 reporter = self._opentestcase(report)
605 if report.when == "call":
606 reporter.append_failure(report)
607 self.open_reports.append(report)
608 if not self.log_passing_tests:
609 reporter.write_captured_output(report)
610 else:
611 reporter.append_error(report)
612 elif report.skipped:
613 reporter = self._opentestcase(report)
614 reporter.append_skipped(report)
615 self.update_testcase_duration(report)
616 if report.when == "teardown":
617 reporter = self._opentestcase(report)
618 reporter.write_captured_output(report)
620 for propname, propvalue in report.user_properties:
621 reporter.add_property(propname, str(propvalue))
623 self.finalize(report)
624 report_wid = getattr(report, "worker_id", None)
625 report_ii = getattr(report, "item_index", None)
626 close_report = next(
627 (
628 rep
629 for rep in self.open_reports
630 if (
631 rep.nodeid == report.nodeid
632 and getattr(rep, "item_index", None) == report_ii
633 and getattr(rep, "worker_id", None) == report_wid
634 )
635 ),
636 None,
637 )
638 if close_report:
639 self.open_reports.remove(close_report)
641 def update_testcase_duration(self, report: TestReport) -> None:
642 """accumulates total duration for nodeid from given report and updates
643 the Junit.testcase with the new total if already created.
644 """
645 if self.report_duration == "total" or report.when == self.report_duration:
646 reporter = self.node_reporter(report)
647 reporter.duration += getattr(report, "duration", 0.0)
649 def pytest_collectreport(self, report: TestReport) -> None:
650 if not report.passed:
651 reporter = self._opentestcase(report)
652 if report.failed:
653 reporter.append_collect_error(report)
654 else:
655 reporter.append_collect_skipped(report)
657 def pytest_internalerror(self, excrepr: ExceptionRepr) -> None:
658 reporter = self.node_reporter("internal")
659 reporter.attrs.update(classname="pytest", name="internal")
660 reporter._add_simple(Junit.error, "internal error", excrepr)
662 def pytest_sessionstart(self) -> None:
663 self.suite_start_time = timing.time()
665 def pytest_sessionfinish(self) -> None:
666 dirname = os.path.dirname(os.path.abspath(self.logfile))
667 if not os.path.isdir(dirname):
668 os.makedirs(dirname)
669 logfile = open(self.logfile, "w", encoding="utf-8")
670 suite_stop_time = timing.time()
671 suite_time_delta = suite_stop_time - self.suite_start_time
673 numtests = (
674 self.stats["passed"]
675 + self.stats["failure"]
676 + self.stats["skipped"]
677 + self.stats["error"]
678 - self.cnt_double_fail_tests
679 )
680 logfile.write('<?xml version="1.0" encoding="utf-8"?>')
682 suite_node = Junit.testsuite(
683 self._get_global_properties_node(),
684 [x.to_xml() for x in self.node_reporters_ordered],
685 name=self.suite_name,
686 errors=str(self.stats["error"]),
687 failures=str(self.stats["failure"]),
688 skipped=str(self.stats["skipped"]),
689 tests=str(numtests),
690 time="%.3f" % suite_time_delta,
691 timestamp=datetime.fromtimestamp(self.suite_start_time).isoformat(),
692 hostname=platform.node(),
693 )
694 logfile.write(Junit.testsuites([suite_node]).unicode(indent=0))
695 logfile.close()
697 def pytest_terminal_summary(self, terminalreporter: TerminalReporter) -> None:
698 terminalreporter.write_sep("-", "generated xml file: {}".format(self.logfile))
700 def add_global_property(self, name: str, value: object) -> None:
701 __tracebackhide__ = True
702 _check_record_param_type("name", name)
703 self.global_properties.append((name, bin_xml_escape(value)))
705 def _get_global_properties_node(self) -> Union[py.xml.Tag, str]:
706 """Return a Junit node containing custom properties, if any.
707 """
708 if self.global_properties:
709 return Junit.properties(
710 [
711 Junit.property(name=name, value=value)
712 for name, value in self.global_properties
713 ]
714 )
715 return ""