Coverage for Applications / PyCharm.app / Contents / plugins / python-ce / helpers / pycharm / teamcity / pytest_plugin.py: 44%

313 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-01 16:37 -0600

1# coding=utf-8 

2""" 

3Aaron Buchanan 

4Nov. 2012 

5 

6Plug-in for py.test for reporting to TeamCity server 

7Report results to TeamCity during test execution for immediate reporting 

8 when using TeamCity. 

9 

10This should be installed as a py.test plugin and will be automatically enabled by running 

11tests under TeamCity build. 

12""" 

13 

14import os 

15import re 

16import sys 

17import traceback 

18from datetime import timedelta 

19 

20import pytest 

21 

22from teamcity import diff_tools 

23from teamcity import is_running_under_teamcity 

24from teamcity.common import convert_error_to_string, dump_test_stderr, dump_test_stdout 

25from teamcity.messages import TeamcityServiceMessages 

26from teamcity.output import TeamCityMessagesPrinter 

27 

# Import-time side effect: ask teamcity.diff_tools to patch unittest diffs.
# NOTE(review): presumably this makes unittest assertion failures carry
# expected/actual data for the reporter below -- confirm in teamcity.diff_tools.
diff_tools.patch_unittest_diff()
# Attribute name under which pytest_assertrepr_compare stores the
# (op, left, right) tuple on the currently running test item; it is read back
# in report_test_failure to build a comparison failure.
_ASSERTION_FAILURE_KEY = '_teamcity_assertion_failure'

30 

31 

def pytest_addoption(parser):
    """Register the TeamCity command-line flags and ini options with pytest.

    Command-line options:
      --teamcity      force output of TeamCity service messages
      --no-teamcity   disable output of TeamCity service messages
      --jb-swapdiff   swap actual/expected in reported diffs

    Ini options (both boolean):
      skippassedoutput   do not report captured output of passed tests
      swapdiff           ini counterpart of --jb-swapdiff

    :param parser: the pytest option parser passed to the hook
    """
    group = parser.getgroup("terminal reporting", "reporting", after="general")

    group._addoption('--teamcity', action="count",
                     dest="teamcity", default=0,
                     help="force output of JetBrains TeamCity service messages")
    group._addoption('--no-teamcity', action="count",
                     dest="no_teamcity", default=0,
                     help="disable output of JetBrains TeamCity service messages")
    parser.addoption('--jb-swapdiff', action="store_true", dest="swapdiff", default=False,
                     help="Swap actual/expected in diff")

    # Each ini option gets its own help text; previously "swapdiff" was
    # registered with the help string that describes "skippassedoutput".
    parser.addini(
        "skippassedoutput",
        help="skip output of passed tests for JetBrains TeamCity service messages",
        type="bool",
    )
    parser.addini(
        "swapdiff",
        help="swap actual/expected in diff for JetBrains TeamCity service messages",
        type="bool",
    )

46 

47 

def get_rootdir(config):
    """Return the pytest root directory of *config* as a string.

    ``config.rootpath`` is used when the attribute exists (pytest>=6);
    otherwise the function falls back to ``config.rootdir``.
    """
    # pytest>=6 exposes rootpath
    root = config.rootpath if hasattr(config, 'rootpath') else config.rootdir
    return str(root)

53 

54 

def pytest_configure(config):
    """Create and register the TeamCity reporter when reporting is enabled.

    ``--no-teamcity`` wins over ``--teamcity``; when neither flag is given the
    decision is delegated to ``is_running_under_teamcity()``.  When enabled,
    the reporter is stored on ``config._teamcityReporting`` and registered
    with the plugin manager.
    """
    options = config.option

    # Explicit opt-out takes precedence over everything else.
    if options.no_teamcity >= 1:
        return
    if not (options.teamcity >= 1 or is_running_under_teamcity()):
        return

    capture_on = getattr(options, 'capture', 'fd') != 'no'
    cov_controller = _get_coverage_controller(config)
    skip_passed = bool(config.getini('skippassedoutput'))

    options.verbose = 2  # don't truncate assert explanations

    # never write tc messages into buffered output
    capture_manager = config.pluginmanager.getplugin('capturemanager')
    unbuffered_context = capture_manager.global_and_fixture_disabled
    swap = bool(config.getini('swapdiff') or options.swapdiff)

    config._teamcityReporting = EchoTeamCityMessages(
        capture_on,
        unbuffered_context,
        cov_controller,
        skip_passed,
        swap,
        get_rootdir(config),
    )
    config.pluginmanager.register(config._teamcityReporting)

79 

80 

def pytest_unconfigure(config):
    """Remove the TeamCity reporter from *config*, if one is stored there.

    Deletes ``config._teamcityReporting`` and unregisters the reporter from
    the plugin manager; does nothing when no reporter is stored.
    """
    reporter = getattr(config, '_teamcityReporting', None)
    if not reporter:
        return

    del config._teamcityReporting
    config.pluginmanager.unregister(reporter)

86 

87 

88def _get_coverage_controller(config): 

89 cov_plugin = config.pluginmanager.getplugin('_cov') 

90 if not cov_plugin: 

91 return None 

92 

93 return cov_plugin.cov_controller 

94 

95 

class EchoTeamCityMessages(object):
    """pytest plugin object that reports test progress as TeamCity service messages.

    Every message is emitted through ``self.teamcity`` with ``flowId`` equal
    to the test id.  ``self.test_start_reported_mark`` holds the ids of tests
    for which ``testStarted`` was sent and ``testFinished`` was not yet sent.
    """

    def __init__(self, output_capture_enabled, context_manager, coverage_controller, skip_passed_output, swap_diff, rootdir):
        """
        :param output_capture_enabled: True when pytest captures test output
        :param context_manager: passed to TeamCityMessagesPrinter as ``context_manager``
        :param coverage_controller: coverage controller, or None when coverage is not used
        :param skip_passed_output: when True, captured output of passed tests is not reported
        :param swap_diff: when True, expected/actual are swapped in comparison failures
        :param rootdir: root directory that test file paths are joined onto
        """
        self.coverage_controller = coverage_controller
        self.output_capture_enabled = output_capture_enabled
        self.skip_passed_output = skip_passed_output

        output_handler = TeamCityMessagesPrinter(context_manager=context_manager)
        self.teamcity = TeamcityServiceMessages(output_handler=output_handler)
        self.test_start_reported_mark = set()
        self.rootdir = rootdir
        # Set by pytest_runtest_protocol; None until the first test item runs.
        self.current_test_item = None

        self.max_reported_output_size = 1 * 1024 * 1024
        self.reported_output_chunk_size = 50000
        self.swap_diff = swap_diff

    def get_id_from_location(self, location):
        """Return a special test id for pylint / PEP8 check items, else None.

        :param location: expected to be a ``(file, line, name)`` tuple; any
            other shape yields None
        :return: ``<dotted file>.Pylint`` when the name starts with
            ``"[pylint] "``, ``<dotted file>.PEP8`` when the name is
            ``"PEP8-check"``, otherwise None
        """
        if type(location) is not tuple or len(location) != 3 or not hasattr(location[2], "startswith"):
            return None

        def convert_file_to_id(filename):
            # Drop a trailing .py/.pyc and turn path separators into dots.
            filename = re.sub(r"\.pyc?$", "", filename)
            return filename.replace(os.sep, ".").replace("/", ".")

        pylint_prefix = '[pylint] '
        if location[2].startswith(pylint_prefix):
            id_from_file = convert_file_to_id(location[2][len(pylint_prefix):])
            return id_from_file + ".Pylint"

        if location[2] == "PEP8-check":
            id_from_file = convert_file_to_id(location[0])
            return id_from_file + ".PEP8"

        return None

    def format_test_id(self, nodeid, location):
        """Convert a pytest node id into a dotted TeamCity test id.

        Example: ``tests/test_a.py::test_x[1.5]`` becomes
        ``tests.test_a.test_x(1_5)``.  An empty node id becomes
        ``top_level``; a node id without ``::`` gets ``::top_level`` appended
        before conversion.

        :param nodeid: pytest node id (may be empty)
        :param location: pytest location, checked first for pylint/PEP8 ids
        :return: the test id string
        """
        id_from_location = self.get_id_from_location(location)

        if id_from_location is not None:
            return id_from_location

        test_id = nodeid

        if test_id:
            if test_id.find("::") < 0:
                test_id += "::top_level"
        else:
            test_id = "top_level"

        first_bracket = test_id.find("[")
        if first_bracket > 0:
            # [] -> (), make it look like nose parameterized tests
            params = "(" + test_id[first_bracket + 1:]
            if params.endswith("]"):
                params = params[:-1] + ")"
            test_id = test_id[:first_bracket]
            if test_id.endswith("::"):
                test_id = test_id[:-2]
        else:
            params = ""

        test_id = test_id.replace("::()::", "::")
        test_id = re.sub(r"\.pyc?::", r"::", test_id)
        # Remaining dots would be read as separators, so they become underscores
        # before path separators and "::" are turned into dots.
        test_id = test_id.replace(".", "_").replace(os.sep, ".").replace("/", ".").replace('::', '.')

        if params:
            params = params.replace(".", "_")
            test_id += params

        return test_id

    def format_location(self, location):
        """Format *location* as ``"file:line (name)"`` for a 3-tuple, else ``str(location)``."""
        if type(location) is tuple and len(location) == 3:
            return "%s:%s (%s)" % (str(location[0]), str(location[1]), str(location[2]))
        return str(location)

    def pytest_sessionfinish(self, session, exitstatus):
        """Report the current test as stopped when the session ends abnormally.

        Applies only when *exitstatus* is greater than
        ``pytest.ExitCode.TESTS_FAILED`` and a test item has been seen.
        """
        if exitstatus > pytest.ExitCode.TESTS_FAILED and self.current_test_item:
            test_id = self.format_test_id(self.current_test_item.nodeid, self.current_test_item.location)
            self.teamcity.testStopped(
                test_id,
                message=exitstatus.name if hasattr(exitstatus, 'name') else str(exitstatus),
                flowId=test_id
            )
            self.report_test_finished(test_id)

    def pytest_collection_finish(self, session):
        """Report the number of collected items."""
        self.teamcity.testCount(len(session.items))

    def pytest_runtest_logstart(self, nodeid, location):
        """Report the start of a test, with its file path and 1-based line as metadata."""
        # test name fetched from location passed as metainfo to PyCharm
        # it will be used to run specific test
        # See IDEA-176950, PY-31836
        path, lineno, test_name = location
        if test_name:
            test_name = str(test_name).split(".")[-1]
        path = os.path.join(self.rootdir, path)
        lineno = lineno + 1 if lineno is not None else None
        self.ensure_test_start_reported(self.format_test_id(nodeid, location), test_name, path=path, lineno=lineno)

    def pytest_runtest_protocol(self, item):
        """Remember the item being run; returns None so other hooks still run."""
        self.current_test_item = item
        return None  # continue to next hook

    def ensure_test_start_reported(self, test_id, metainfo=None, path=None, lineno=None):
        """Send ``testStarted`` for *test_id* unless it is already marked as started."""
        if test_id not in self.test_start_reported_mark:
            # captureStandardOutput is "false" when pytest itself captures output.
            if self.output_capture_enabled:
                capture_standard_output = "false"
            else:
                capture_standard_output = "true"
            self.teamcity.testStarted(test_id, flowId=test_id, captureStandardOutput=capture_standard_output,
                                      metainfo=metainfo, path=path, lineno=str(lineno) if lineno is not None else None)
            self.test_start_reported_mark.add(test_id)

    def report_has_output(self, report):
        """Return True if *report* has a stdout/stderr section for its own phase."""
        for secname, _ in report.sections:
            if report.when in secname and ('stdout' in secname or 'stderr' in secname):
                return True
        return False

    def report_test_output(self, report, test_id):
        """Dump the non-empty stdout/stderr sections of *report* for *test_id*."""
        for (secname, data) in report.sections:
            # https://github.com/JetBrains/teamcity-messages/issues/112
            # CollectReport didn't have 'when' property, but now it has.
            # But we still need output on 'collect' state
            if hasattr(report, "when") and report.when not in secname and report.when != 'collect':
                continue
            if not data:
                continue

            if 'stdout' in secname:
                dump_test_stdout(self.teamcity, test_id, test_id, data)
            elif 'stderr' in secname:
                dump_test_stderr(self.teamcity, test_id, test_id, data)

    def report_test_finished(self, test_id, duration=None):
        """Send ``testFinished`` for *test_id* and clear its started mark.

        Safe to call for an id that is not (or no longer) marked as started.
        """
        self.teamcity.testFinished(test_id, testDuration=duration, flowId=test_id)
        # discard, not remove: pytest_sessionfinish may call this for a test
        # that already finished, and remove() would raise KeyError then.
        self.test_start_reported_mark.discard(test_id)

    def report_test_failure(self, test_id, report, message=None, report_output=True):
        """Report *report* as a failed test, with a comparison failure when one is found.

        :param test_id: id to report the failure under
        :param report: pytest report object
        :param message: failure message; defaults to the formatted report location
        :param report_output: whether to dump captured output before the failure
        """
        if hasattr(report, 'duration'):
            duration = timedelta(seconds=report.duration)
        else:
            duration = None

        if message is None:
            message = self.format_location(report.location)

        self.ensure_test_start_reported(test_id)
        if report_output:
            self.report_test_output(report, test_id)

        diff_error = None
        # Best effort: any failure while extracting expected/actual data falls
        # through to the plain failure report below.
        try:
            err_message = str(report.longrepr.reprcrash.message)
            diff_name = diff_tools.EqualsAssertionError.__name__
            # There is a string like "foo.bar.DiffError: [serialized_data]"
            if diff_name in err_message:
                serialized_data = err_message[err_message.index(diff_name) + len(diff_name) + 1:]
                diff_error = diff_tools.deserialize_error(serialized_data)

            assertion_tuple = getattr(self.current_test_item, _ASSERTION_FAILURE_KEY, None)
            if assertion_tuple:
                _op, left, right = assertion_tuple
                if self.swap_diff:
                    left, right = right, left
                diff_error = diff_tools.EqualsAssertionError(expected=right, actual=left)
            else:
                if m := re.search("AssertionError: Expected <(.*)> to .*? <(.*)>, but .*", err_message):
                    left, right = m.group(1), m.group(2)
                    if self.swap_diff:
                        left, right = right, left
                    diff_error = diff_tools.EqualsAssertionError(expected=right, actual=left)
        except Exception:
            pass

        if not diff_error:
            from .jb_local_exc_store import get_exception
            diff_error = get_exception()

        if diff_error:
            # Cut everything after postfix: it is internal view of DiffError
            strace = str(report.longrepr)
            data_postfix = "_ _ _ _ _"
            # Error message in pytest must be in "file.py:22 AssertionError" format
            # This message goes to strace
            # With custom error we must add real exception class explicitly
            if data_postfix in strace:
                strace = strace[0:strace.index(data_postfix)].strip()
                if strace.endswith(":") and diff_error.real_exception:
                    strace += " " + type(diff_error.real_exception).__name__
            self.teamcity.testFailed(test_id, diff_error.msg or message, strace,
                                     flowId=test_id,
                                     comparison_failure=diff_error
                                     )
        else:
            self.teamcity.testFailed(test_id, message, str(report.longrepr), flowId=test_id)
        self.report_test_finished(test_id, duration)

    def report_test_skip(self, test_id, report):
        """Report *report* as an ignored test.

        The reason is the third element of ``report.longrepr`` when that is a
        3-tuple, otherwise ``str(report.longrepr)``.
        """
        if type(report.longrepr) is tuple and len(report.longrepr) == 3:
            reason = report.longrepr[2]
        else:
            reason = str(report.longrepr)

        if hasattr(report, 'duration'):
            duration = timedelta(seconds=report.duration)
        else:
            duration = None

        self.ensure_test_start_reported(test_id)
        self.report_test_output(report, test_id)
        self.teamcity.testIgnored(test_id, reason, flowId=test_id)
        self.report_test_finished(test_id, duration)

    def pytest_assertrepr_compare(self, config, op, left, right):
        """Remember the operands of a failing comparison on the current test item.

        Always returns None, so no custom explanation is supplied by this hook.
        """
        # No item is known before the first pytest_runtest_protocol call;
        # setattr on None would raise AttributeError.
        if self.current_test_item is None:
            return None
        setattr(self.current_test_item, _ASSERTION_FAILURE_KEY, (op, left, right))

    def pytest_runtest_logreport(self, report):
        """
        Report the outcome of one phase (setup / call / teardown) of a test.

        :type report: _pytest.runner.TestReport
        """
        test_id = self.format_test_id(report.nodeid, report.location)

        duration = timedelta(seconds=report.duration)

        if report.passed:
            # Do not report passed setup/teardown if no output
            if report.when == 'call':
                self.ensure_test_start_reported(test_id)
                if not self.skip_passed_output:
                    self.report_test_output(report, test_id)
                self.report_test_finished(test_id, duration)
            else:
                if self.report_has_output(report) and not self.skip_passed_output:
                    block_name = "test " + report.when
                    self.teamcity.blockOpened(block_name, flowId=test_id)
                    self.report_test_output(report, test_id)
                    self.teamcity.blockClosed(block_name, flowId=test_id)
        elif report.failed:
            if report.when == 'call':
                self.report_test_failure(test_id, report)
            elif report.when == 'setup':
                if self.report_has_output(report):
                    self.teamcity.blockOpened("test setup", flowId=test_id)
                    self.report_test_output(report, test_id)
                    self.teamcity.blockClosed("test setup", flowId=test_id)

                self.report_test_failure(test_id, report, message="test setup failed", report_output=False)
            elif report.when == 'teardown':
                # Report failed teardown as a separate test as original test is already finished
                self.report_test_failure(test_id + "_teardown", report)
        elif report.skipped:
            self.report_test_skip(test_id, report)

    def pytest_collectreport(self, report):
        """Report failed or skipped collection under the id ``<test id>_collect``."""
        test_id = self.format_test_id(report.nodeid, report.location) + "_collect"

        if report.failed:
            self.report_test_failure(test_id, report)
        elif report.skipped:
            self.report_test_skip(test_id, report)

    def pytest_terminal_summary(self):
        """Report coverage statistics when a coverage controller is present.

        Any exception from the coverage report is turned into an ERROR custom
        message carrying the traceback instead of propagating.
        """
        if self.coverage_controller is not None:
            try:
                self._report_coverage()
            except Exception:
                tb = traceback.format_exc()
                self.teamcity.customMessage("Coverage statistics reporting failed", "ERROR", errorDetails=tb)

    def _report_coverage(self):
        """Send covered / total / uncovered line statistics as build statistics."""
        from coverage.misc import NotPython
        from coverage.results import Numbers

        class _Reporter(object):
            # Wraps coverage's Reporter, or a minimal stand-in when
            # coverage.report no longer provides one.
            def __init__(self, coverage, config):
                try:
                    from coverage.report import Reporter
                except ImportError:
                    # Support for coverage >= 5.0.1.
                    from coverage.report import get_analysis_to_report

                    class Reporter(object):

                        def __init__(self, coverage, config):
                            self.coverage = coverage
                            self.config = config
                            self._file_reporters = []

                        def find_file_reporters(self, morfs):
                            return [fr for fr, _ in get_analysis_to_report(self.coverage, morfs)]

                self._reporter = Reporter(coverage, config)

            def find_file_reporters(self, morfs):
                self.file_reporters = self._reporter.find_file_reporters(morfs)

            def __getattr__(self, name):
                # Everything not defined here is looked up on the wrapped reporter.
                return getattr(self._reporter, name)

        class _CoverageReporter(_Reporter):
            def __init__(self, coverage, config, messages):
                super(_CoverageReporter, self).__init__(coverage, config)

                if hasattr(coverage, 'data'):
                    self.branches = coverage.data.has_arcs()
                else:
                    self.branches = coverage.get_data().has_arcs()
                self.messages = messages

            def report(self, morfs, outfile=None):
                if hasattr(self, 'find_code_units'):
                    self.find_code_units(morfs)
                else:
                    self.find_file_reporters(morfs)

                total = Numbers()

                if hasattr(self, 'code_units'):
                    units = self.code_units
                else:
                    units = self.file_reporters

                for cu in units:
                    try:
                        analysis = self.coverage._analyze(cu.filename)
                        nums = analysis.numbers
                        total += nums
                    except KeyboardInterrupt:
                        raise
                    except Exception:
                        if self.config.ignore_errors:
                            continue

                        err = sys.exc_info()
                        typ = err[0]
                        if typ is NotPython and not cu.should_be_python():
                            continue

                        # Report the file whose analysis failed as a failed test.
                        test_id = cu.relname
                        details = convert_error_to_string(err)

                        self.messages.testStarted(test_id, flowId=test_id)
                        self.messages.testFailed(test_id, message="Coverage analysis failed", details=details, flowId=test_id)
                        self.messages.testFinished(test_id, flowId=test_id)

                if total.n_files > 0:
                    covered = total.n_executed
                    total_statements = total.n_statements

                    # With branch data, branches are counted together with statements.
                    if self.branches:
                        covered += total.n_executed_branches
                        total_statements += total.n_branches

                    self.messages.buildStatisticLinesCovered(covered)
                    self.messages.buildStatisticTotalLines(total_statements)
                    self.messages.buildStatisticLinesUncovered(total_statements - covered)

        reporter = _CoverageReporter(
            self.coverage_controller.cov,
            self.coverage_controller.cov.config,
            self.teamcity,
        )
        reporter.report(None)