Coverage for Applications / PyCharm.app / Contents / plugins / python-ce / helpers / pycharm / _jb_runner_tools.py: 65%

253 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-01 16:37 -0600

1# coding=utf-8 

2""" 

3Tools to implement runners (https://youtrack.jetbrains.com/articles/PY-A-83178089/PyCharm-test-runners-protocol) 

4""" 

5import os 

6import re 

7import sys 

8from collections import OrderedDict 

9 

10import _jb_utils 

11from teamcity import teamcity_presence_env_var, messages, output 

12 

13# Some runners need it to "detect" TC and start protocol 

14if teamcity_presence_env_var not in os.environ: 

15 os.environ[teamcity_presence_env_var] = "LOCAL" 

16 

17# Providing this env variable disables output buffering. 

18# anything sent to stdout/stderr goes to IDE directly, not after test is over like it is done by default. 

19# out and err are not in sync, so output may go to wrong test 

20# This also affects stack traces of tests. 

21JB_DISABLE_BUFFERING = "JB_DISABLE_BUFFERING" in os.environ 

22# More verbose output of testing framework to aid structured display of output in IDE. 

23JB_VERBOSE = "JB_VERBOSE" in os.environ 

24# getcwd resolves symlinks, but PWD is not supported by some shells 

25PROJECT_DIR = os.getenv('PWD', os.getcwd()) 

26 

27 

28class _TreeManagerHolder(object): 

29 def __init__(self): 

30 self.parallel = "JB_USE_PARALLEL_TREE_MANAGER" in os.environ 

31 self.offset = 0 

32 self._manager_imp = None 

33 

34 @property 

35 def manager(self): 

36 if not self._manager_imp: 

37 self._fill_manager() 

38 return self._manager_imp 

39 

40 def _fill_manager(self): 

41 if self.parallel: 

42 from _jb_parallel_tree_manager import ParallelTreeManager 

43 self._manager_imp = ParallelTreeManager(self.offset) 

44 else: 

45 from _jb_serial_tree_manager import SerialTreeManager 

46 self._manager_imp = SerialTreeManager(self.offset) 

47 

48 

49_TREE_MANAGER_HOLDER = _TreeManagerHolder() 

50 

51 

52def set_parallel_mode(): 

53 _TREE_MANAGER_HOLDER.parallel = True 

54 

55 

56def is_parallel_mode(): 

57 return _TREE_MANAGER_HOLDER.parallel 

58 

59_socket = None 

60 

61def _try_open_socket(): 

62 if os.environ.get('JB_TEAMCITY_SOCKET_PATH'): 

63 family = 'AF_UNIX' 

64 address = os.environ.get('JB_TEAMCITY_SOCKET_PATH') 

65 elif os.environ.get('JB_TEAMCITY_SOCKET_PORT'): 

66 family = 'AF_INET' 

67 address = (os.environ.get('JB_TEAMCITY_SOCKET_HOST', 'localhost'), int(os.environ.get('JB_TEAMCITY_SOCKET_PORT'))) 

68 else: 

69 return None 

70 

71 import socket 

72 new_socket = socket.socket(getattr(socket, family), socket.SOCK_STREAM) 

73 new_socket.connect(address) 

74 import atexit 

75 atexit.register(new_socket.close) # good style and avoids warning 

76 return new_socket 

77 

78 

79class _SocketTeamCityMessagesPrinter(output.TeamCityMessagesPrinter): 

80 def __init__(self, context_manager=None): 

81 super(_SocketTeamCityMessagesPrinter, self).__init__(output=_socket, context_manager=context_manager) 

82 

83 def _output(self, message): 

84 self.output.sendall(message) 

85 

86# Monkeypatching TC 

87_old_service_messages = messages.TeamcityServiceMessages 

88 

89PARSE_FUNC = None 

90 

91class TestSuiteInfo: 

92 def __init__(self, full_name, node_id, parent_node_id, is_test, was_stopped): 

93 self.full_name = full_name 

94 self.node_id = node_id 

95 self.parent_node_id = parent_node_id 

96 self.is_test = is_test 

97 self.was_stopped = was_stopped 

98 

99 

100class NewTeamcityServiceMessages(_old_service_messages): 

101 _latest_subtest_result = None 

102 # [full_test_name] = (test_name, node_id, parent_node_id) 

103 _test_suites = OrderedDict() 

104 INSTANCE = None 

105 

106 def __init__(self, *args, **kwargs): 

107 super(NewTeamcityServiceMessages, self).__init__(*args, **kwargs) 

108 NewTeamcityServiceMessages.INSTANCE = self 

109 

110 global _socket # reuse socket to ensure order of messages 

111 if _socket is None: 

112 _socket = _try_open_socket() 

113 if _socket is not None: 

114 self.output_handler = _SocketTeamCityMessagesPrinter(self.output_handler.context_manager) 

115 

116 self.stderr_output_manager = output.TeamCityMessagesPrinter( 

117 output=sys.stderr, 

118 context_manager=self.output_handler.context_manager 

119 ) 

120 

121 def message(self, messageName, **properties): 

122 if messageName in {"enteredTheMatrix", "testCount"}: 

123 if "_jb_do_not_call_enter_matrix" not in os.environ: 

124 _old_service_messages.message(self, messageName, **properties) 

125 return 

126 

127 full_name = properties["name"] 

128 try: 

129 # Report directory so Java site knows which folder to resolve names against 

130 

131 # tests with docstrings are reported in format "test.name (some test here)". 

132 # text should be part of name, but not location. 

133 path = properties.get('path') 

134 if not path or path.endswith('.py'): 

135 possible_location = str(full_name) 

136 loc = possible_location.find("(") 

137 if loc > 0: 

138 possible_location = possible_location[:loc].strip() 

139 properties["locationHint"] = "python<{0}>://{1}".format(PROJECT_DIR, possible_location) 

140 else: 

141 # For data-driven tests, we reference the test by file and line number. 

142 location_hint = "file:/" + path 

143 lineno = properties.get('lineno') 

144 if lineno: 

145 location_hint += ":{}".format(lineno) 

146 properties["locationHint"] = location_hint 

147 except KeyError: 

148 # If message does not have name, then it is not test 

149 # Simply pass it 

150 _old_service_messages.message(self, messageName, **properties) 

151 return 

152 

153 current, parent = _TREE_MANAGER_HOLDER.manager.get_node_ids(full_name) 

154 if not current and not parent: 

155 return 

156 # Shortcut for name 

157 try: 

158 properties["name"] = str(full_name).split(".")[-1] 

159 except IndexError: 

160 pass 

161 

162 properties["nodeId"] = str(current) 

163 properties["parentNodeId"] = str(parent) 

164 

165 is_test = messageName == "testStarted" 

166 if messageName == "testSuiteStarted" or is_test: 

167 self._test_suites[full_name] = TestSuiteInfo(full_name, current, parent, is_test, False) 

168 elif messageName == "testIgnored" and properties.get("stopped") == "true": 

169 ancestors = self._test_to_list(full_name) 

170 # mark ancestors as explicitly stopped 

171 for i in range(len(ancestors), 0, -1): 

172 ancestor = ".".join(ancestors[:i]) 

173 # keep old entries intact; only change was_stopped 

174 self._test_suites[ancestor].was_stopped = True 

175 _old_service_messages.message(self, messageName, **properties) 

176 

177 def _fix_setup_teardown_name(self, test_name): 

178 """ 

179 

180 Hack to rename setup and teardown methods to much real python signatures 

181 """ 

182 try: 

183 return {"test setup": "setUpClass", "test teardown": "tearDownClass"}[test_name] 

184 except KeyError: 

185 return test_name 

186 

187 # Blocks are used for 2 cases now: 

188 # 1) Unittest subtests (only closed, opened by subTestBlockOpened) 

189 # 2) setup/teardown (does not work, see https://github.com/JetBrains/teamcity-messages/issues/114) 

190 # def blockOpened(self, name, flowId=None): 

191 # self.testStarted(".".join(TREE_MANAGER.current_branch + [self._fix_setup_teardown_name(name)])) 

192 

193 def blockClosed(self, name, flowId=None): 

194 

195 # If _latest_subtest_result is not set or does not exist we closing setup method, not a subtest 

196 try: 

197 if not self._latest_subtest_result: 

198 return 

199 except AttributeError: 

200 return 

201 

202 # closing subtest 

203 test_name = ".".join(_TREE_MANAGER_HOLDER.manager.current_branch) 

204 if self._latest_subtest_result in {"Failure", "Error"}: 

205 self.testFailed(test_name) 

206 if self._latest_subtest_result == "Skip": 

207 self.testIgnored(test_name) 

208 

209 self.testFinished(test_name) 

210 self._latest_subtest_result = None 

211 

212 def subTestBlockOpened(self, name, subTestResult, flowId=None): 

213 self.testStarted(".".join(_TREE_MANAGER_HOLDER.manager.current_branch + [name])) 

214 self._latest_subtest_result = subTestResult 

215 

216 def testStarted(self, testName, captureStandardOutput=None, flowId=None, is_suite=False, metainfo=None, path=None, lineno=None): 

217 test_name_as_list = _jb_utils.test_to_list(testName) 

218 testName = ".".join(test_name_as_list) 

219 

220 def _write_start_message(): 

221 # testName, captureStandardOutput, flowId 

222 args = {"name": testName, "captureStandardOutput": captureStandardOutput, "metainfo": metainfo} 

223 if path is not None and lineno is not None: 

224 args["path"] = path 

225 args["lineno"] = str(lineno) 

226 if is_suite: 

227 self.message("testSuiteStarted", **args) 

228 else: 

229 self.message("testStarted", **args) 

230 

231 commands = _TREE_MANAGER_HOLDER.manager.level_opened(test_name_as_list, _write_start_message) 

232 if commands: 

233 self.do_commands(commands) 

234 self.testStarted(testName, captureStandardOutput, is_suite=is_suite, metainfo=metainfo, path=path, lineno=lineno) 

235 

236 def testFailed(self, testName, message='', details='', flowId=None, comparison_failure=None): 

237 testName = ".".join(_jb_utils.test_to_list(testName)) 

238 if JB_DISABLE_BUFFERING: 

239 self._print_error(details) 

240 details = None 

241 _old_service_messages.testFailed(self, testName, message, details, comparison_failure=comparison_failure) 

242 

243 def _print_error(self, message): 

244 if not message.endswith("\n"): 

245 message += "\n" 

246 if self.stderr_output_manager.output.isatty(): 

247 message = "\033[31m" + message + "\033[0m" 

248 self.stderr_output_manager.send_message(self.encode(message)) 

249 

250 def testFinished(self, testName, testDuration=None, flowId=None, is_suite=False): 

251 test_parts = _jb_utils.test_to_list(testName) 

252 testName = ".".join(test_parts) 

253 

254 def _write_finished_message(): 

255 # testName, captureStandardOutput, flowId 

256 current, parent = _TREE_MANAGER_HOLDER.manager.get_node_ids(testName) 

257 if not current and not parent: 

258 return 

259 args = {"nodeId": current, "parentNodeId": parent, "name": testName} 

260 

261 # TODO: Doc copy/paste with parent, extract 

262 if testDuration is not None: 

263 duration_ms = testDuration.days * 86400000 + \ 

264 testDuration.seconds * 1000 + \ 

265 int(testDuration.microseconds / 1000) 

266 args["duration"] = str(duration_ms) 

267 

268 if is_suite: 

269 del self._test_suites[testName] 

270 if is_parallel_mode(): 

271 del args["duration"] 

272 self.message("testSuiteFinished", **args) 

273 else: 

274 self.message("testFinished", **args) 

275 del self._test_suites[testName] 

276 

277 commands = _TREE_MANAGER_HOLDER.manager.level_closed( 

278 test_parts, _write_finished_message) 

279 if commands: 

280 self.do_commands(commands) 

281 self.testFinished(testName, testDuration) 

282 

283 def do_commands(self, commands): 

284 """ 

285 

286 Executes commands, returned by level_closed and level_opened 

287 """ 

288 for command, test in commands: 

289 test_name = ".".join(test) 

290 # By executing commands we open or close suites(branches) since tests(leaves) are always reported by runner 

291 if command == "open": 

292 self.testStarted(test_name, is_suite=True) 

293 else: 

294 self.testFinished(test_name, is_suite=True) 

295 

296 def _repose_suite_closed(self, suite): 

297 name = suite.full_name 

298 if name: 

299 _old_service_messages.testSuiteFinished(self, ".".join(name)) 

300 for child in suite.children.values(): 

301 self._repose_suite_closed(child) 

302 

303 def close_suites(self): 

304 # Go in reverse order and close all suites 

305 for suite in reversed(list(self._test_suites.values())): 

306 # suites are explicitly closed, but if test can't been finished, it was either stopped or skipped 

307 message = "testIgnored" if suite.is_test or suite.was_stopped else "testSuiteFinished" 

308 kwargs = { 

309 "name": suite.full_name, 

310 "nodeId": str(suite.node_id), 

311 "parentNodeId": str(suite.parent_node_id), 

312 } 

313 if suite.was_stopped: 

314 kwargs["stopped"] = "true" 

315 _old_service_messages.message(self, message, **kwargs) 

316 self._test_suites = OrderedDict() 

317 

318 

319messages.TeamcityServiceMessages = NewTeamcityServiceMessages 

320 

321 

322# Monkeypatched 

323 

324def jb_patch_targets(targets, fs_glue, old_python_glue, new_python_glue, fs_to_python_glue, python_parts_action=None): 

325 """ 

326 Converts python targets format provided by Java to python-specific format 

327 

328 :param targets: list of separated by [old_python_glue] or dots targets 

329 :param fs_glue: how to glue fs parts of target. I.e.: module "eggs" in "spam" package is "spam[fs_glue]eggs" 

330 :param new_python_glue: how to glue python parts (glue between class and function etc) 

331 :param old_python_glue: which symbols need to be replaced by [new_python_glue] 

332 :param fs_to_python_glue: between last fs-part and first python part 

333 :param python_parts_action: additional action for python parts 

334 :return: list of targets with patched separators 

335 """ 

336 if not targets: 

337 return [] 

338 

339 def _patch_target(target): 

340 # /path/foo.py::parts.to.python 

341 match = re.match("^(:?(.+)[.]py::)?([^\\[]+)(.*)$", target) 

342 assert match, "unexpected string: {0}".format(target) 

343 fs_part = match.group(2) 

344 python_part = match.group(3).replace(old_python_glue, new_python_glue) + match.group(4) 

345 if python_parts_action is not None: 

346 python_part = python_parts_action(fs_part, python_part) 

347 if fs_part: 

348 return fs_part.replace("/", fs_glue) + fs_to_python_glue + python_part 

349 else: 

350 return python_part 

351 

352 return map(_patch_target, targets) 

353 

354 

355def jb_patch_separator(targets, fs_glue, python_glue, fs_to_python_glue): 

356 """ 

357 Converts python target if format "/path/foo.py::parts.to.python" provided by Java to 

358 python-specific format 

359 

360 :param targets: list of dot-separated targets 

361 :param fs_glue: how to glue fs parts of target. I.e.: module "eggs" in "spam" package is "spam[fs_glue]eggs" 

362 :param python_glue: how to glue python parts (glue between class and function etc) 

363 :param fs_to_python_glue: between last fs-part and first python part 

364 :return: list of targets with patched separators 

365 """ 

366 return jb_patch_targets(targets, fs_glue, '.', python_glue, fs_to_python_glue) 

367 

368 

369def jb_start_tests(): 

370 """ 

371 Parses arguments, starts protocol and fixes syspath and returns tuple of arguments 

372 """ 

373 path, targets, additional_args = parse_arguments() 

374 start_protocol() 

375 return path, targets, additional_args 

376 

377 

378def jb_finish_tests(): 

379 # To be called before process exist to close all suites 

380 instance = NewTeamcityServiceMessages.INSTANCE 

381 

382 # instance may not be set if you run like pytest --version 

383 if instance: 

384 instance.close_suites() 

385 

386 

387def start_protocol(): 

388 properties = {"durationStrategy": "manual"} if is_parallel_mode() else dict() 

389 NewTeamcityServiceMessages().message('enteredTheMatrix', **properties) 

390 

391 

392def parse_arguments(): 

393 """ 

394 Parses arguments, fixes syspath and returns tuple of arguments 

395 

396 :return: (string with path or None, list of targets or None, list of additional arguments) 

397 It may return list with only one element (name itself) if name is the same or split names to several parts 

398 """ 

399 # Handle additional args after -- 

400 additional_args = [] 

401 try: 

402 index = sys.argv.index("--") 

403 additional_args = sys.argv[index + 1:] 

404 del sys.argv[index:] 

405 except ValueError: 

406 pass 

407 utils = _jb_utils.VersionAgnosticUtils() 

408 namespace = utils.get_options( 

409 _jb_utils.OptionDescription('--path', 'Path to file or folder to run'), 

410 _jb_utils.OptionDescription('--offset', 'Root node offset'), 

411 _jb_utils.OptionDescription('--target', 'Python target to run', "append")) 

412 del sys.argv[1:] # Remove all args 

413 

414 # PyCharm helpers dir is first dir in sys.path because helper is launched. 

415 # But sys.path should be same as when launched with test runner directly 

416 try: 

417 if os.path.abspath(sys.path[0]) == os.path.abspath( 

418 os.environ["PYCHARM_HELPERS_DIR"]): 

419 path = sys.path.pop(0) 

420 if path not in sys.path: 

421 sys.path.append(path) 

422 except KeyError: 

423 pass 

424 _TREE_MANAGER_HOLDER.offset = int(namespace.offset if namespace.offset else 0) 

425 return namespace.path, namespace.target, additional_args 

426 

427 

428def jb_doc_args(framework_name, args): 

429 """ 

430 Runner encouraged to report its arguments to user with aid of this function 

431 

432 """ 

433 print("Launching {0} with arguments {1} in {2}\n".format(framework_name, 

434 " ".join(args), 

435 PROJECT_DIR))