Coverage for Applications / PyCharm.app / Contents / plugins / python-ce / helpers / pycharm / teamcity / pytest_plugin.py: 44%
313 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-01 16:37 -0600
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-01 16:37 -0600
1# coding=utf-8
2"""
3Aaron Buchanan
4Nov. 2012
6Plug-in for py.test for reporting to TeamCity server
7Report results to TeamCity during test execution for immediate reporting
8 when using TeamCity.
10This should be installed as a py.test plugin and will be automatically enabled by running
11tests under TeamCity build.
12"""
14import os
15import re
16import sys
17import traceback
18from datetime import timedelta
20import pytest
22from teamcity import diff_tools
23from teamcity import is_running_under_teamcity
24from teamcity.common import convert_error_to_string, dump_test_stderr, dump_test_stdout
25from teamcity.messages import TeamcityServiceMessages
26from teamcity.output import TeamCityMessagesPrinter
28diff_tools.patch_unittest_diff()
29_ASSERTION_FAILURE_KEY = '_teamcity_assertion_failure'
32def pytest_addoption(parser):
33 group = parser.getgroup("terminal reporting", "reporting", after="general")
35 group._addoption('--teamcity', action="count",
36 dest="teamcity", default=0, help="force output of JetBrains TeamCity service messages")
37 group._addoption('--no-teamcity', action="count",
38 dest="no_teamcity", default=0, help="disable output of JetBrains TeamCity service messages")
39 parser.addoption('--jb-swapdiff', action="store_true", dest="swapdiff", default=False, help="Swap actual/expected in diff")
41 kwargs = {"help": "skip output of passed tests for JetBrains TeamCity service messages"}
42 kwargs.update({"type": "bool"})
44 parser.addini("skippassedoutput", **kwargs)
45 parser.addini("swapdiff", **kwargs)
48def get_rootdir(config):
49 if hasattr(config, 'rootpath'): # pytest>=6
50 return str(config.rootpath)
51 else:
52 return str(config.rootdir)
55def pytest_configure(config):
56 if config.option.no_teamcity >= 1:
57 enabled = False
58 elif config.option.teamcity >= 1:
59 enabled = True
60 else:
61 enabled = is_running_under_teamcity()
63 if enabled:
64 output_capture_enabled = getattr(config.option, 'capture', 'fd') != 'no'
65 coverage_controller = _get_coverage_controller(config)
66 skip_passed_output = bool(config.getini('skippassedoutput'))
68 config.option.verbose = 2 # don't truncate assert explanations
69 config._teamcityReporting = EchoTeamCityMessages(
70 output_capture_enabled,
71 # never write tc messages into buffered output
72 getattr(config.pluginmanager.getplugin('capturemanager'), 'global_and_fixture_disabled'),
73 coverage_controller,
74 skip_passed_output,
75 bool(config.getini('swapdiff') or config.option.swapdiff),
76 get_rootdir(config),
77 )
78 config.pluginmanager.register(config._teamcityReporting)
81def pytest_unconfigure(config):
82 teamcity_reporting = getattr(config, '_teamcityReporting', None)
83 if teamcity_reporting:
84 del config._teamcityReporting
85 config.pluginmanager.unregister(teamcity_reporting)
88def _get_coverage_controller(config):
89 cov_plugin = config.pluginmanager.getplugin('_cov')
90 if not cov_plugin:
91 return None
93 return cov_plugin.cov_controller
96class EchoTeamCityMessages(object):
97 def __init__(self, output_capture_enabled, context_manager, coverage_controller, skip_passed_output, swap_diff, rootdir):
98 self.coverage_controller = coverage_controller
99 self.output_capture_enabled = output_capture_enabled
100 self.skip_passed_output = skip_passed_output
102 output_handler = TeamCityMessagesPrinter(context_manager=context_manager)
103 self.teamcity = TeamcityServiceMessages(output_handler=output_handler)
104 self.test_start_reported_mark = set()
105 self.rootdir = rootdir
106 self.current_test_item = None
108 self.max_reported_output_size = 1 * 1024 * 1024
109 self.reported_output_chunk_size = 50000
110 self.swap_diff = swap_diff
112 def get_id_from_location(self, location):
113 if type(location) is not tuple or len(location) != 3 or not hasattr(location[2], "startswith"):
114 return None
116 def convert_file_to_id(filename):
117 filename = re.sub(r"\.pyc?$", "", filename)
118 return filename.replace(os.sep, ".").replace("/", ".")
120 def add_prefix_to_filename_id(filename_id, prefix):
121 dot_location = filename_id.rfind('.')
122 if dot_location <= 0 or dot_location >= len(filename_id) - 1:
123 return None
125 return filename_id[:dot_location + 1] + prefix + filename_id[dot_location + 1:]
127 pylint_prefix = '[pylint] '
128 if location[2].startswith(pylint_prefix):
129 id_from_file = convert_file_to_id(location[2][len(pylint_prefix):])
130 return id_from_file + ".Pylint"
132 if location[2] == "PEP8-check":
133 id_from_file = convert_file_to_id(location[0])
134 return id_from_file + ".PEP8"
136 return None
138 def format_test_id(self, nodeid, location):
139 id_from_location = self.get_id_from_location(location)
141 if id_from_location is not None:
142 return id_from_location
144 test_id = nodeid
146 if test_id:
147 if test_id.find("::") < 0:
148 test_id += "::top_level"
149 else:
150 test_id = "top_level"
152 first_bracket = test_id.find("[")
153 if first_bracket > 0:
154 # [] -> (), make it look like nose parameterized tests
155 params = "(" + test_id[first_bracket + 1:]
156 if params.endswith("]"):
157 params = params[:-1] + ")"
158 test_id = test_id[:first_bracket]
159 if test_id.endswith("::"):
160 test_id = test_id[:-2]
161 else:
162 params = ""
164 test_id = test_id.replace("::()::", "::")
165 test_id = re.sub(r"\.pyc?::", r"::", test_id)
166 test_id = test_id.replace(".", "_").replace(os.sep, ".").replace("/", ".").replace('::', '.')
168 if params:
169 params = params.replace(".", "_")
170 test_id += params
172 return test_id
174 def format_location(self, location):
175 if type(location) is tuple and len(location) == 3:
176 return "%s:%s (%s)" % (str(location[0]), str(location[1]), str(location[2]))
177 return str(location)
179 def pytest_sessionfinish(self, session, exitstatus):
180 if exitstatus > pytest.ExitCode.TESTS_FAILED and self.current_test_item:
181 test_id = self.format_test_id(self.current_test_item.nodeid, self.current_test_item.location)
182 self.teamcity.testStopped(
183 test_id,
184 message=exitstatus.name if hasattr(exitstatus, 'name') else str(exitstatus),
185 flowId=test_id
186 )
187 self.report_test_finished(test_id)
189 def pytest_collection_finish(self, session):
190 self.teamcity.testCount(len(session.items))
192 def pytest_runtest_logstart(self, nodeid, location):
193 # test name fetched from location passed as metainfo to PyCharm
194 # it will be used to run specific test
195 # See IDEA-176950, PY-31836
196 path, lineno, test_name = location
197 if test_name:
198 test_name = str(test_name).split(".")[-1]
199 path = os.path.join(self.rootdir, path)
200 lineno = lineno + 1 if lineno is not None else None
201 self.ensure_test_start_reported(self.format_test_id(nodeid, location), test_name, path=path, lineno=lineno)
203 def pytest_runtest_protocol(self, item):
204 self.current_test_item = item
205 return None # continue to next hook
207 def ensure_test_start_reported(self, test_id, metainfo=None, path=None, lineno=None):
208 if test_id not in self.test_start_reported_mark:
209 if self.output_capture_enabled:
210 capture_standard_output = "false"
211 else:
212 capture_standard_output = "true"
213 self.teamcity.testStarted(test_id, flowId=test_id, captureStandardOutput=capture_standard_output,
214 metainfo=metainfo, path=path, lineno=str(lineno) if lineno is not None else None)
215 self.test_start_reported_mark.add(test_id)
217 def report_has_output(self, report):
218 for (secname, data) in report.sections:
219 if report.when in secname and ('stdout' in secname or 'stderr' in secname):
220 return True
221 return False
223 def report_test_output(self, report, test_id):
224 for (secname, data) in report.sections:
225 # https://github.com/JetBrains/teamcity-messages/issues/112
226 # CollectReport didn't have 'when' property, but now it has.
227 # But we still need output on 'collect' state
228 if hasattr(report, "when") and report.when not in secname and report.when != 'collect':
229 continue
230 if not data:
231 continue
233 if 'stdout' in secname:
234 dump_test_stdout(self.teamcity, test_id, test_id, data)
235 elif 'stderr' in secname:
236 dump_test_stderr(self.teamcity, test_id, test_id, data)
238 def report_test_finished(self, test_id, duration=None):
239 self.teamcity.testFinished(test_id, testDuration=duration, flowId=test_id)
240 self.test_start_reported_mark.remove(test_id)
242 def report_test_failure(self, test_id, report, message=None, report_output=True):
243 if hasattr(report, 'duration'):
244 duration = timedelta(seconds=report.duration)
245 else:
246 duration = None
248 if message is None:
249 message = self.format_location(report.location)
251 self.ensure_test_start_reported(test_id)
252 if report_output:
253 self.report_test_output(report, test_id)
255 diff_error = None
256 try:
257 err_message = str(report.longrepr.reprcrash.message)
258 diff_name = diff_tools.EqualsAssertionError.__name__
259 # There is a string like "foo.bar.DiffError: [serialized_data]"
260 if diff_name in err_message:
261 serialized_data = err_message[err_message.index(diff_name) + len(diff_name) + 1:]
262 diff_error = diff_tools.deserialize_error(serialized_data)
264 assertion_tuple = getattr(self.current_test_item, _ASSERTION_FAILURE_KEY, None)
265 if assertion_tuple:
266 op, left, right = assertion_tuple
267 if self.swap_diff:
268 left, right = right, left
269 diff_error = diff_tools.EqualsAssertionError(expected=right, actual=left)
270 else:
271 if m := re.search("AssertionError: Expected <(.*)> to .*? <(.*)>, but .*", err_message):
272 left, right = m.group(1), m.group(2)
273 if self.swap_diff:
274 left, right = right, left
275 diff_error = diff_tools.EqualsAssertionError(expected=right, actual=left)
276 except Exception:
277 pass
279 if not diff_error:
280 from .jb_local_exc_store import get_exception
281 diff_error = get_exception()
283 if diff_error:
284 # Cut everything after postfix: it is internal view of DiffError
285 strace = str(report.longrepr)
286 data_postfix = "_ _ _ _ _"
287 # Error message in pytest must be in "file.py:22 AssertionError" format
288 # This message goes to strace
289 # With custom error we must add real exception class explicitly
290 if data_postfix in strace:
291 strace = strace[0:strace.index(data_postfix)].strip()
292 if strace.endswith(":") and diff_error.real_exception:
293 strace += " " + type(diff_error.real_exception).__name__
294 self.teamcity.testFailed(test_id, diff_error.msg or message, strace,
295 flowId=test_id,
296 comparison_failure=diff_error
297 )
298 else:
299 self.teamcity.testFailed(test_id, message, str(report.longrepr), flowId=test_id)
300 self.report_test_finished(test_id, duration)
302 def report_test_skip(self, test_id, report):
303 if type(report.longrepr) is tuple and len(report.longrepr) == 3:
304 reason = report.longrepr[2]
305 else:
306 reason = str(report.longrepr)
308 if hasattr(report, 'duration'):
309 duration = timedelta(seconds=report.duration)
310 else:
311 duration = None
313 self.ensure_test_start_reported(test_id)
314 self.report_test_output(report, test_id)
315 self.teamcity.testIgnored(test_id, reason, flowId=test_id)
316 self.report_test_finished(test_id, duration)
318 def pytest_assertrepr_compare(self, config, op, left, right):
319 setattr(self.current_test_item, _ASSERTION_FAILURE_KEY, (op, left, right))
321 def pytest_runtest_logreport(self, report):
322 """
323 :type report: _pytest.runner.TestReport
324 """
325 test_id = self.format_test_id(report.nodeid, report.location)
327 duration = timedelta(seconds=report.duration)
329 if report.passed:
330 # Do not report passed setup/teardown if no output
331 if report.when == 'call':
332 self.ensure_test_start_reported(test_id)
333 if not self.skip_passed_output:
334 self.report_test_output(report, test_id)
335 self.report_test_finished(test_id, duration)
336 else:
337 if self.report_has_output(report) and not self.skip_passed_output:
338 block_name = "test " + report.when
339 self.teamcity.blockOpened(block_name, flowId=test_id)
340 self.report_test_output(report, test_id)
341 self.teamcity.blockClosed(block_name, flowId=test_id)
342 elif report.failed:
343 if report.when == 'call':
344 self.report_test_failure(test_id, report)
345 elif report.when == 'setup':
346 if self.report_has_output(report):
347 self.teamcity.blockOpened("test setup", flowId=test_id)
348 self.report_test_output(report, test_id)
349 self.teamcity.blockClosed("test setup", flowId=test_id)
351 self.report_test_failure(test_id, report, message="test setup failed", report_output=False)
352 elif report.when == 'teardown':
353 # Report failed teardown as a separate test as original test is already finished
354 self.report_test_failure(test_id + "_teardown", report)
355 elif report.skipped:
356 self.report_test_skip(test_id, report)
358 def pytest_collectreport(self, report):
359 test_id = self.format_test_id(report.nodeid, report.location) + "_collect"
361 if report.failed:
362 self.report_test_failure(test_id, report)
363 elif report.skipped:
364 self.report_test_skip(test_id, report)
366 def pytest_terminal_summary(self):
367 if self.coverage_controller is not None:
368 try:
369 self._report_coverage()
370 except Exception:
371 tb = traceback.format_exc()
372 self.teamcity.customMessage("Coverage statistics reporting failed", "ERROR", errorDetails=tb)
374 def _report_coverage(self):
375 from coverage.misc import NotPython
376 from coverage.results import Numbers
378 class _Reporter(object):
379 def __init__(self, coverage, config):
380 try:
381 from coverage.report import Reporter
382 except ImportError:
383 # Support for coverage >= 5.0.1.
384 from coverage.report import get_analysis_to_report
386 class Reporter(object):
388 def __init__(self, coverage, config):
389 self.coverage = coverage
390 self.config = config
391 self._file_reporters = []
393 def find_file_reporters(self, morfs):
394 return [fr for fr, _ in get_analysis_to_report(self.coverage, morfs)]
396 self._reporter = Reporter(coverage, config)
398 def find_file_reporters(self, morfs):
399 self.file_reporters = self._reporter.find_file_reporters(morfs)
401 def __getattr__(self, name):
402 return getattr(self._reporter, name)
404 class _CoverageReporter(_Reporter):
405 def __init__(self, coverage, config, messages):
406 super(_CoverageReporter, self).__init__(coverage, config)
408 if hasattr(coverage, 'data'):
409 self.branches = coverage.data.has_arcs()
410 else:
411 self.branches = coverage.get_data().has_arcs()
412 self.messages = messages
414 def report(self, morfs, outfile=None):
415 if hasattr(self, 'find_code_units'):
416 self.find_code_units(morfs)
417 else:
418 self.find_file_reporters(morfs)
420 total = Numbers()
422 if hasattr(self, 'code_units'):
423 units = self.code_units
424 else:
425 units = self.file_reporters
427 for cu in units:
428 try:
429 analysis = self.coverage._analyze(cu.filename)
430 nums = analysis.numbers
431 total += nums
432 except KeyboardInterrupt:
433 raise
434 except Exception:
435 if self.config.ignore_errors:
436 continue
438 err = sys.exc_info()
439 typ, msg = err[:2]
440 if typ is NotPython and not cu.should_be_python():
441 continue
443 test_id = cu.relname
444 details = convert_error_to_string(err)
446 self.messages.testStarted(test_id, flowId=test_id)
447 self.messages.testFailed(test_id, message="Coverage analysis failed", details=details, flowId=test_id)
448 self.messages.testFinished(test_id, flowId=test_id)
450 if total.n_files > 0:
451 covered = total.n_executed
452 total_statements = total.n_statements
454 if self.branches:
455 covered += total.n_executed_branches
456 total_statements += total.n_branches
458 self.messages.buildStatisticLinesCovered(covered)
459 self.messages.buildStatisticTotalLines(total_statements)
460 self.messages.buildStatisticLinesUncovered(total_statements - covered)
461 reporter = _CoverageReporter(
462 self.coverage_controller.cov,
463 self.coverage_controller.cov.config,
464 self.teamcity,
465 )
466 reporter.report(None)