Coverage for Applications / PyCharm.app / Contents / plugins / python-ce / helpers / pycharm / _jb_runner_tools.py: 65%
253 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-01 16:37 -0600
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-01 16:37 -0600
1# coding=utf-8
2"""
3Tools to implement runners (https://youtrack.jetbrains.com/articles/PY-A-83178089/PyCharm-test-runners-protocol)
4"""
5import os
6import re
7import sys
8from collections import OrderedDict
10import _jb_utils
11from teamcity import teamcity_presence_env_var, messages, output
# Some runners need it to "detect" TC and start protocol.
# A value that is already present in the environment is left as is.
os.environ.setdefault(teamcity_presence_env_var, "LOCAL")

# Providing this env variable disables output buffering:
# anything sent to stdout/stderr goes to IDE directly, not after test is over like it is done by default.
# out and err are not in sync, so output may go to wrong test.
# This also affects stack traces of tests.
JB_DISABLE_BUFFERING = "JB_DISABLE_BUFFERING" in os.environ
# More verbose output of testing framework to aid structured display of output in IDE.
JB_VERBOSE = "JB_VERBOSE" in os.environ
# getcwd resolves symlinks, so PWD is preferred;
# getcwd is only the fallback because PWD is not supported by some shells
PROJECT_DIR = os.environ.get('PWD', os.getcwd())
class _TreeManagerHolder(object):
    """
    Creates tree manager on first access.

    ``parallel`` and ``offset`` are only read when the manager is created,
    so they should be set before ``manager`` is used for the first time.
    """

    def __init__(self):
        self._manager_imp = None
        self.offset = 0
        # Parallel manager may be requested by environment or by set_parallel_mode()
        self.parallel = "JB_USE_PARALLEL_TREE_MANAGER" in os.environ

    @property
    def manager(self):
        """
        :return: stored tree manager; it is created first if nothing (or something falsy) is stored
        """
        if self._manager_imp:
            return self._manager_imp
        self._fill_manager()
        return self._manager_imp

    def _fill_manager(self):
        # Only the module that is really needed is imported
        if not self.parallel:
            from _jb_serial_tree_manager import SerialTreeManager as manager_class
        else:
            from _jb_parallel_tree_manager import ParallelTreeManager as manager_class
        self._manager_imp = manager_class(self.offset)


# The only holder: all functions and classes of this module use it
_TREE_MANAGER_HOLDER = _TreeManagerHolder()
def set_parallel_mode():
    """
    Switches module-level tree manager holder to parallel mode.
    The flag is only read when the manager is created.
    """
    holder = _TREE_MANAGER_HOLDER
    holder.parallel = True
def is_parallel_mode():
    """
    :return: ``parallel`` flag of module-level tree manager holder
    """
    holder = _TREE_MANAGER_HOLDER
    return holder.parallel
# Socket shared by all message printers of this module; None until one is opened
_socket = None


def _try_open_socket():
    """
    Connects to the socket described by environment variables, if any.

    ``JB_TEAMCITY_SOCKET_PATH`` (checked first) asks for ``AF_UNIX`` socket with this address.
    ``JB_TEAMCITY_SOCKET_PORT`` asks for ``AF_INET`` socket; host is taken from
    ``JB_TEAMCITY_SOCKET_HOST`` (``localhost`` if not set).
    Empty values are treated as not set.

    :return: connected socket (its ``close`` is registered with ``atexit``)
             or None if neither variable is set
    :raise ValueError: if port is not a number
    :raise Exception: whatever ``connect`` raises; socket is closed before error is propagated
    """
    socket_path = os.environ.get('JB_TEAMCITY_SOCKET_PATH')
    socket_port = os.environ.get('JB_TEAMCITY_SOCKET_PORT')
    if socket_path:
        family = 'AF_UNIX'
        address = socket_path
    elif socket_port:
        family = 'AF_INET'
        address = (os.environ.get('JB_TEAMCITY_SOCKET_HOST', 'localhost'), int(socket_port))
    else:
        return None

    import atexit
    import socket
    # Family is looked up by name: only the requested one has to exist in socket module
    new_socket = socket.socket(getattr(socket, family), socket.SOCK_STREAM)
    try:
        new_socket.connect(address)
    except Exception:
        # Do not leak the socket when connection can't be established
        new_socket.close()
        raise
    atexit.register(new_socket.close)  # good style and avoids warning
    return new_socket
class _SocketTeamCityMessagesPrinter(output.TeamCityMessagesPrinter):
    """
    Messages printer that is created with module-level ``_socket`` as its output
    """

    def __init__(self, context_manager=None):
        super(_SocketTeamCityMessagesPrinter, self).__init__(
            output=_socket,
            context_manager=context_manager
        )

    def _output(self, message):
        # NOTE(review): presumably self.output is the socket passed to __init__ above;
        # it is set by the base class, confirm against teamcity.output
        connection = self.output
        connection.sendall(message)
# Monkeypatching TC: original class is remembered here,
# because its replacement extends it and calls its methods directly
_old_service_messages = messages.TeamcityServiceMessages

# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably kept for runners that still access it -- confirm before removing
PARSE_FUNC = None
class TestSuiteInfo:
    """
    Record about started test or suite.

    :param full_name: full name of test or suite
    :param node_id: id of its node
    :param parent_node_id: id of parent node
    :param is_test: True for test, False for suite
    :param was_stopped: True if it was explicitly stopped
    """

    def __init__(self, full_name, node_id, parent_node_id, is_test, was_stopped):
        self.full_name, self.node_id, self.parent_node_id = full_name, node_id, parent_node_id
        self.is_test, self.was_stopped = is_test, was_stopped
class NewTeamcityServiceMessages(_old_service_messages):
    """
    Replacement for ``TeamcityServiceMessages`` which reports tests as tree.

    Messages that have a name get ``locationHint``, ``nodeId`` and ``parentNodeId`` properties
    (ids are provided by tree manager). Started tests and suites are remembered
    to be closed by ``close_suites`` if they are still open when process ends.
    """
    _latest_subtest_result = None
    # [full_test_name] = TestSuiteInfo
    # Declared on class level: shared by instances until close_suites() rebinds it on instance
    _test_suites = OrderedDict()
    # Latest created instance, see jb_finish_tests()
    INSTANCE = None

    def __init__(self, *args, **kwargs):
        super(NewTeamcityServiceMessages, self).__init__(*args, **kwargs)
        NewTeamcityServiceMessages.INSTANCE = self

        global _socket  # reuse socket to ensure order of messages
        if _socket is None:
            _socket = _try_open_socket()
        if _socket is not None:
            self.output_handler = _SocketTeamCityMessagesPrinter(self.output_handler.context_manager)

        # Used by _print_error() to report errors directly to stderr
        self.stderr_output_manager = output.TeamCityMessagesPrinter(
            output=sys.stderr,
            context_manager=self.output_handler.context_manager
        )

    def message(self, messageName, **properties):
        """
        Sends message, adding location and tree node ids to messages that have a name.

        :param messageName: name of service message
        :param properties: message properties. ``path`` and ``lineno`` (if any) are used for location
        """
        if messageName in {"enteredTheMatrix", "testCount"}:
            if "_jb_do_not_call_enter_matrix" not in os.environ:
                _old_service_messages.message(self, messageName, **properties)
            return

        if "name" not in properties:
            # If message does not have name, then it is not test
            # Simply pass it
            _old_service_messages.message(self, messageName, **properties)
            return
        full_name = properties["name"]

        # Report directory so Java site knows which folder to resolve names against
        path = properties.get('path')
        if not path or path.endswith('.py'):
            # tests with docstrings are reported in format "test.name (some test here)".
            # text should be part of name, but not location.
            possible_location = str(full_name)
            loc = possible_location.find("(")
            if loc > 0:
                possible_location = possible_location[:loc].strip()
            properties["locationHint"] = "python<{0}>://{1}".format(PROJECT_DIR, possible_location)
        else:
            # For data-driven tests, we reference the test by file and line number.
            location_hint = "file:/" + path
            lineno = properties.get('lineno')
            if lineno:
                location_hint += ":{}".format(lineno)
            properties["locationHint"] = location_hint

        current, parent = _TREE_MANAGER_HOLDER.manager.get_node_ids(full_name)
        if not current and not parent:
            return
        # Shortcut for name: only the last part is displayed
        properties["name"] = str(full_name).split(".")[-1]

        properties["nodeId"] = str(current)
        properties["parentNodeId"] = str(parent)

        is_test = messageName == "testStarted"
        if messageName == "testSuiteStarted" or is_test:
            self._test_suites[full_name] = TestSuiteInfo(full_name, current, parent, is_test, False)
        elif messageName == "testIgnored" and properties.get("stopped") == "true":
            # Same splitting as in testStarted/testFailed/testFinished
            ancestors = _jb_utils.test_to_list(full_name)
            # mark ancestors as explicitly stopped
            for i in range(len(ancestors), 0, -1):
                suite_info = self._test_suites.get(".".join(ancestors[:i]))
                # keep old entries intact; only change was_stopped.
                # Ancestors that were never reported as started have no entry and are skipped
                if suite_info is not None:
                    suite_info.was_stopped = True
        _old_service_messages.message(self, messageName, **properties)

    def _fix_setup_teardown_name(self, test_name):
        """

        Hack to rename setup and teardown methods to match real python signatures
        """
        try:
            return {"test setup": "setUpClass", "test teardown": "tearDownClass"}[test_name]
        except KeyError:
            return test_name

    # Blocks are used for 2 cases now:
    # 1) Unittest subtests (only closed, opened by subTestBlockOpened)
    # 2) setup/teardown (does not work, see https://github.com/JetBrains/teamcity-messages/issues/114)
    # def blockOpened(self, name, flowId=None):
    #      self.testStarted(".".join(TREE_MANAGER.current_branch + [self._fix_setup_teardown_name(name)]))

    def blockClosed(self, name, flowId=None):
        """
        Closes subtest opened by ``subTestBlockOpened``, reporting its result first.
        Does nothing if no subtest result is stored.
        """
        # If _latest_subtest_result is not set or does not exist we closing setup method, not a subtest
        if not getattr(self, "_latest_subtest_result", None):
            return

        # closing subtest
        test_name = ".".join(_TREE_MANAGER_HOLDER.manager.current_branch)
        if self._latest_subtest_result in {"Failure", "Error"}:
            self.testFailed(test_name)
        if self._latest_subtest_result == "Skip":
            self.testIgnored(test_name)

        self.testFinished(test_name)
        self._latest_subtest_result = None

    def subTestBlockOpened(self, name, subTestResult, flowId=None):
        """
        Starts subtest as a child of current branch and stores its result for ``blockClosed``
        """
        self.testStarted(".".join(_TREE_MANAGER_HOLDER.manager.current_branch + [name]))
        self._latest_subtest_result = subTestResult

    def testStarted(self, testName, captureStandardOutput=None, flowId=None, is_suite=False, metainfo=None, path=None, lineno=None):
        """
        Reports start of test (or suite if ``is_suite``).

        If tree manager returns commands instead of writing the message,
        commands are executed (see ``do_commands``) and start is reported again.
        ``path`` and ``lineno`` are only sent when both are provided.
        """
        test_name_as_list = _jb_utils.test_to_list(testName)
        testName = ".".join(test_name_as_list)

        def _write_start_message():
            # testName, captureStandardOutput, flowId
            args = {"name": testName, "captureStandardOutput": captureStandardOutput, "metainfo": metainfo}
            if path is not None and lineno is not None:
                args["path"] = path
                args["lineno"] = str(lineno)
            if is_suite:
                self.message("testSuiteStarted", **args)
            else:
                self.message("testStarted", **args)

        commands = _TREE_MANAGER_HOLDER.manager.level_opened(test_name_as_list, _write_start_message)
        if commands:
            self.do_commands(commands)
            self.testStarted(testName, captureStandardOutput, is_suite=is_suite, metainfo=metainfo, path=path, lineno=lineno)

    def testFailed(self, testName, message='', details='', flowId=None, comparison_failure=None):
        """
        Reports test failure.
        When buffering is disabled (``JB_DISABLE_BUFFERING``), details are printed to stderr
        right away and are not attached to the message.
        """
        testName = ".".join(_jb_utils.test_to_list(testName))
        if JB_DISABLE_BUFFERING:
            self._print_error(details)
            details = None
        _old_service_messages.testFailed(self, testName, message, details, comparison_failure=comparison_failure)

    def _print_error(self, message):
        # Message always ends with new line and is wrapped
        # in "\033[31m" ... "\033[0m" escape sequences when stderr is a terminal
        if not message.endswith("\n"):
            message += "\n"
        if self.stderr_output_manager.output.isatty():
            message = "\033[31m" + message + "\033[0m"
        self.stderr_output_manager.send_message(self.encode(message))

    def testFinished(self, testName, testDuration=None, flowId=None, is_suite=False):
        """
        Reports end of test (or suite if ``is_suite``) and forgets it.

        :param testDuration: ``timedelta``-like object or None; reported in milliseconds.
               Not reported for suites in parallel mode.

        If tree manager returns commands instead of writing the message,
        commands are executed (see ``do_commands``) and finish is reported again.
        """
        test_parts = _jb_utils.test_to_list(testName)
        testName = ".".join(test_parts)

        def _write_finished_message():
            # testName, captureStandardOutput, flowId
            current, parent = _TREE_MANAGER_HOLDER.manager.get_node_ids(testName)
            if not current and not parent:
                return
            args = {"nodeId": current, "parentNodeId": parent, "name": testName}

            # TODO: Doc copy/paste with parent, extract
            if testDuration is not None:
                duration_ms = testDuration.days * 86400000 + \
                              testDuration.seconds * 1000 + \
                              int(testDuration.microseconds / 1000)
                args["duration"] = str(duration_ms)

            # Entries are removed with pop(): element that was never registered must not break reporting
            if is_suite:
                self._test_suites.pop(testName, None)
                if is_parallel_mode():
                    # Suites closed by do_commands() have no duration at all
                    args.pop("duration", None)
                self.message("testSuiteFinished", **args)
            else:
                self.message("testFinished", **args)
                self._test_suites.pop(testName, None)

        commands = _TREE_MANAGER_HOLDER.manager.level_closed(
            test_parts, _write_finished_message)
        if commands:
            self.do_commands(commands)
            self.testFinished(testName, testDuration)

    def do_commands(self, commands):
        """

        Executes commands, returned by level_closed and level_opened

        :param commands: pairs (command, test name as list of parts).
               Suite is opened for "open" command and closed for any other
        """
        for command, test in commands:
            test_name = ".".join(test)
            # By executing commands we open or close suites(branches) since tests(leaves) are always reported by runner
            if command == "open":
                self.testStarted(test_name, is_suite=True)
            else:
                self.testFinished(test_name, is_suite=True)

    def _repose_suite_closed(self, suite):
        # NOTE(review): not called anywhere in the visible part of this module.
        # It reads suite.children, which TestSuiteInfo does not have, and joins
        # full_name as if it were a list of parts -- presumably written for another
        # suite structure; confirm before using or removing
        name = suite.full_name
        if name:
            _old_service_messages.testSuiteFinished(self, ".".join(name))
            for child in suite.children.values():
                self._repose_suite_closed(child)

    def close_suites(self):
        """
        Reports end of everything that is still open (latest started goes first)
        and forgets it. Messages are sent with base class ``message`` and stored node ids.
        """
        # Go in reverse order and close all suites
        for suite in reversed(list(self._test_suites.values())):
            # suites are explicitly closed, but if test can't been finished, it was either stopped or skipped
            message = "testIgnored" if suite.is_test or suite.was_stopped else "testSuiteFinished"
            kwargs = {
                "name": suite.full_name,
                "nodeId": str(suite.node_id),
                "parentNodeId": str(suite.parent_node_id),
            }
            if suite.was_stopped:
                kwargs["stopped"] = "true"
            _old_service_messages.message(self, message, **kwargs)
        self._test_suites = OrderedDict()


messages.TeamcityServiceMessages = NewTeamcityServiceMessages


# Monkeypatched
def jb_patch_targets(targets, fs_glue, old_python_glue, new_python_glue, fs_to_python_glue, python_parts_action=None):
    """
    Converts python targets format provided by Java to python-specific format

    :param targets: list of separated by [old_python_glue] or dots targets
    :param fs_glue: how to glue fs parts of target. I.e.: module "eggs" in "spam" package is "spam[fs_glue]eggs"
    :param new_python_glue: how to glue python parts (glue between class and function etc)
    :param old_python_glue: which symbols need to be replaced by [new_python_glue]
    :param fs_to_python_glue: between last fs-part and first python part
    :param python_parts_action: additional action for python parts:
           called with (fs part or None, patched python part), returns python part to use
    :return: list of targets with patched separators (empty list if there are no targets)
    """
    if not targets:
        return []

    def _patch_target(target):
        # /path/foo.py::parts.to.python
        # Groups: 2 -- fs part (without ".py::"), 3 -- python part, 4 -- everything starting from "["
        # (it is appended as is, glue is not replaced there).
        # NOTE(review): "(:?" looks like a typo for "(?:", but group numbers below depend on it
        match = re.match(r"^(:?(.+)[.]py::)?([^\[]+)(.*)$", target)
        assert match, "unexpected string: {0}".format(target)
        fs_part = match.group(2)
        python_part = match.group(3).replace(old_python_glue, new_python_glue) + match.group(4)
        if python_parts_action is not None:
            python_part = python_parts_action(fs_part, python_part)
        if fs_part:
            return fs_part.replace("/", fs_glue) + fs_to_python_glue + python_part
        else:
            return python_part

    # List (not lazy map) is returned: same type as for empty targets, may be iterated many times
    return [_patch_target(target) for target in targets]
def jb_patch_separator(targets, fs_glue, python_glue, fs_to_python_glue):
    """
    Converts targets in "/path/foo.py::parts.to.python" format (provided by Java)
    to python-specific format. Dots between python parts are replaced by [python_glue].

    :param targets: list of dot-separated targets
    :param fs_glue: how to glue fs parts of target. I.e.: module "eggs" in "spam" package is "spam[fs_glue]eggs"
    :param python_glue: how to glue python parts (glue between class and function etc)
    :param fs_to_python_glue: between last fs-part and first python part
    :return: result of jb_patch_targets for these targets
    """
    return jb_patch_targets(
        targets,
        fs_glue=fs_glue,
        old_python_glue='.',
        new_python_glue=python_glue,
        fs_to_python_glue=fs_to_python_glue
    )
def jb_start_tests():
    """
    Parses arguments (see parse_arguments), then starts protocol

    :return: tuple returned by parse_arguments: (path, targets, additional arguments)
    """
    arguments = parse_arguments()
    start_protocol()
    return arguments
def jb_finish_tests():
    """
    To be called before process exits to close all suites
    """
    instance = NewTeamcityServiceMessages.INSTANCE
    if not instance:
        # instance may not be set if you run like pytest --version
        return
    instance.close_suites()
def start_protocol():
    """
    Sends "enteredTheMatrix" message using new messages instance.
    In parallel mode the message has "durationStrategy" property set to "manual".
    """
    properties = {}
    if is_parallel_mode():
        properties["durationStrategy"] = "manual"
    NewTeamcityServiceMessages().message('enteredTheMatrix', **properties)
def parse_arguments():
    """
    Parses arguments, fixes syspath, stores root node offset for tree manager
    and leaves only the first element in ``sys.argv``

    :return: (string with path or None, list of targets or None, list of additional arguments)
    """
    # Additional args are everything after "--"; they are cut off before options are parsed
    additional_args = []
    if "--" in sys.argv:
        separator_index = sys.argv.index("--")
        additional_args = sys.argv[separator_index + 1:]
        del sys.argv[separator_index:]

    utils = _jb_utils.VersionAgnosticUtils()
    path_option = _jb_utils.OptionDescription('--path', 'Path to file or folder to run')
    offset_option = _jb_utils.OptionDescription('--offset', 'Root node offset')
    target_option = _jb_utils.OptionDescription('--target', 'Python target to run', "append")
    namespace = utils.get_options(path_option, offset_option, target_option)
    del sys.argv[1:]  # Remove all args

    # PyCharm helpers dir is first dir in sys.path because helper is launched.
    # But sys.path should be same as when launched with test runner directly,
    # so helpers dir is moved to the end (or dropped if it is also somewhere else in sys.path)
    first_entry = os.path.abspath(sys.path[0])
    helpers_dir = os.environ.get("PYCHARM_HELPERS_DIR")
    if helpers_dir is not None and first_entry == os.path.abspath(helpers_dir):
        moved_entry = sys.path.pop(0)
        if moved_entry not in sys.path:
            sys.path.append(moved_entry)

    _TREE_MANAGER_HOLDER.offset = int(namespace.offset or 0)
    return namespace.path, namespace.target, additional_args
def jb_doc_args(framework_name, args):
    """
    Runners are encouraged to report their arguments to the user with the aid of this function

    :param framework_name: name of framework to display
    :param args: arguments (strings); displayed separated by spaces
    """
    joined_args = " ".join(args)
    print("Launching {0} with arguments {1} in {2}\n".format(framework_name, joined_args, PROJECT_DIR))