Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/_pytest/cacheprovider.py : 14%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2merged implementation of the cache provider
4the name cache was not chosen to ensure pluggy automatically
5ignores the external pytest-cache
6"""
7import json
8import os
9from typing import Dict
10from typing import Generator
11from typing import Iterable
12from typing import List
13from typing import Optional
14from typing import Set
15from typing import Union
17import attr
18import py
20import pytest
21from .pathlib import Path
22from .pathlib import resolve_from_str
23from .pathlib import rm_rf
24from .reports import CollectReport
25from _pytest import nodes
26from _pytest._io import TerminalWriter
27from _pytest.compat import order_preserving_dict
28from _pytest.config import Config
29from _pytest.config import ExitCode
30from _pytest.config.argparsing import Parser
31from _pytest.fixtures import FixtureRequest
32from _pytest.main import Session
33from _pytest.python import Module
34from _pytest.reports import TestReport
# Markdown text written to ``README.md`` inside a newly created cache
# directory (see ``Cache._ensure_supporting_files``).  The blank lines keep
# the heading and the paragraphs separate when the file is rendered.
README_CONTENT = """\
# pytest cache directory #

This directory contains data from the pytest's cache plugin,
which provides the `--lf` and `--ff` options, as well as the `cache` fixture.

**Do not** commit this to version control.

See [the docs](https://docs.pytest.org/en/stable/cache.html) for more information.
"""

# Bytes written to ``CACHEDIR.TAG`` inside the cache directory.  The literal
# itself points at the cache-directory-tag specification it follows.
# NOTE(review): whitespace in this listing appears collapsed; confirm the
# separator after ``#`` on the URL line against the real file.
CACHEDIR_TAG_CONTENT = b"""\
Signature: 8a477f597d28d172789f06886806bc55
# This file is a cache directory tag created by pytest.
# For information about cache directory tags, see:
# http://www.bford.info/cachedir/spec.html
"""
@attr.s
class Cache:
    """File-backed cache rooted at a single cache directory.

    Values stored with :meth:`set` are kept as JSON files below the ``v``
    sub-directory; directories handed out by :meth:`makedir` live below
    the ``d`` sub-directory.
    """

    _cachedir = attr.ib(type=Path, repr=False)
    _config = attr.ib(type=Config, repr=False)

    # sub-directory under cache-dir for directories created by "makedir"
    _CACHE_PREFIX_DIRS = "d"

    # sub-directory under cache-dir for values created by "set"
    _CACHE_PREFIX_VALUES = "v"

    @classmethod
    def for_config(cls, config: Config) -> "Cache":
        """Create the cache for ``config``.

        When the ``cacheclear`` option is set and the cache directory
        exists, its value/directory sub-trees are removed first.
        """
        cachedir = cls.cache_dir_from_config(config)
        if config.getoption("cacheclear") and cachedir.is_dir():
            cls.clear_cache(cachedir)
        return cls(cachedir, config)

    @classmethod
    def clear_cache(cls, cachedir: Path) -> None:
        """Clears the sub-directories used to hold cached directories and values."""
        for prefix in (cls._CACHE_PREFIX_DIRS, cls._CACHE_PREFIX_VALUES):
            d = cachedir / prefix
            if d.is_dir():
                rm_rf(d)

    @staticmethod
    def cache_dir_from_config(config: Config) -> Path:
        """Resolve the ``cache_dir`` ini value against ``config.rootdir``."""
        return resolve_from_str(config.getini("cache_dir"), config.rootdir)

    def warn(self, fmt: str, **args: object) -> None:
        """Issue a ``PytestCacheWarning``.

        :param fmt: message; formatted with ``args`` via ``str.format`` only
            when keyword arguments are given.
        """
        import warnings
        from _pytest.warning_types import PytestCacheWarning

        # NOTE(review): the second positional argument lands in
        # ``warnings.warn``'s ``category`` slot, which the stdlib ignores when
        # the message is already a Warning instance -- confirm that passing
        # the hook object here is intentional.
        warnings.warn(
            PytestCacheWarning(fmt.format(**args) if args else fmt),
            self._config.hook,
            stacklevel=3,
        )

    def makedir(self, name: str) -> py.path.local:
        """Return a directory path object with the given name.

        If the directory does not yet exist, it will be created. You can use
        it to manage files, e.g. to store/retrieve database dumps across test
        sessions.

        :param name: must be a string not containing a ``/`` separator.
            Make sure the name contains your plugin or application
            identifiers to prevent clashes with other cache users.
        :raises ValueError: if ``name`` has more than one path component.
        """
        path = Path(name)
        if len(path.parts) > 1:
            raise ValueError("name is not allowed to contain path separators")
        res = self._cachedir.joinpath(self._CACHE_PREFIX_DIRS, path)
        res.mkdir(exist_ok=True, parents=True)
        return py.path.local(res)

    def _getvaluepath(self, key: str) -> Path:
        """Map a ``/`` separated key to its file below the values sub-directory."""
        return self._cachedir.joinpath(self._CACHE_PREFIX_VALUES, Path(key))

    def get(self, key: str, default):
        """Return the cached value for the given key.

        If no value was cached yet or the value cannot be read, the
        specified default is returned.

        :param key: must be a ``/`` separated value. Usually the first
            name is the name of your plugin or your application.
        :param default: must be provided in case of a cache-miss or
            invalid cache values.
        """
        path = self._getvaluepath(key)
        try:
            # Explicit encoding so reads do not depend on the locale; decode
            # errors are ValueError subclasses and fall back to ``default``.
            with path.open("r", encoding="UTF-8") as f:
                return json.load(f)
        except (ValueError, OSError):
            return default

    def set(self, key: str, value: object) -> None:
        """Save ``value`` for the given key.

        Failures to create the parent directory or to open the file are
        reported through :meth:`warn` instead of being raised.

        :param key: must be a ``/`` separated value. Usually the first
            name is the name of your plugin or your application.
        :param value: must be of any combination of basic
            python types, including nested types
            like e.g. lists of dictionaries.
        """
        path = self._getvaluepath(key)
        try:
            if path.parent.is_dir():
                cache_dir_exists_already = True
            else:
                # Remember whether the cache dir itself is new *before*
                # creating it, so the supporting files are written only once.
                cache_dir_exists_already = self._cachedir.exists()
                path.parent.mkdir(exist_ok=True, parents=True)
        except OSError:
            self.warn("could not create cache path {path}", path=path)
            return
        if not cache_dir_exists_already:
            self._ensure_supporting_files()
        data = json.dumps(value, indent=2, sort_keys=True)
        try:
            # Same explicit encoding as used by ``get``.
            f = path.open("w", encoding="UTF-8")
        except OSError:
            self.warn("cache could not write path {path}", path=path)
        else:
            with f:
                f.write(data)

    def _ensure_supporting_files(self) -> None:
        """Create supporting files in the cache dir that are not really part of the cache."""
        readme_path = self._cachedir / "README.md"
        # Written as UTF-8, consistently with ``.gitignore`` below.
        readme_path.write_text(README_CONTENT, encoding="UTF-8")

        gitignore_path = self._cachedir.joinpath(".gitignore")
        msg = "# Created by pytest automatically.\n*\n"
        gitignore_path.write_text(msg, encoding="UTF-8")

        cachedir_tag_path = self._cachedir.joinpath("CACHEDIR.TAG")
        cachedir_tag_path.write_bytes(CACHEDIR_TAG_CONTENT)
class LFPluginCollWrapper:
    """Collect-report hook wrapper used for last-failed runs.

    ``LFPlugin`` registers an instance when the ``lf`` option is set.  The
    wrapper reorders the session's collected nodes and trims the contents
    of modules that contain known failures.
    """

    def __init__(self, lfplugin: "LFPlugin") -> None:
        self.lfplugin = lfplugin
        # Flips to True once a module with a known failure has been seen.
        self._collected_at_least_one_failure = False

    @pytest.hookimpl(hookwrapper=True)
    def pytest_make_collect_report(self, collector: nodes.Collector) -> Generator:
        """Post-process the collect report of the session and of lf modules.

        Every code path yields exactly once; the report is modified in
        place after the wrapped hook implementations have produced it.
        """
        if isinstance(collector, Session):
            outcome = yield
            report = outcome.get_result()  # type: CollectReport

            # Sort any lf-paths to the beginning.
            failed_paths = self.lfplugin._last_failed_paths

            def failed_paths_first(node) -> int:
                return 0 if Path(str(node.fspath)) in failed_paths else 1

            report.result = sorted(report.result, key=failed_paths_first)
            return

        is_failed_module = (
            isinstance(collector, Module)
            and Path(str(collector.fspath)) in self.lfplugin._last_failed_paths
        )
        if not is_failed_module:
            # Nothing to adjust for this collector; just hand control on.
            yield
            return

        outcome = yield
        collected = outcome.get_result().result
        known_failures = self.lfplugin.lastfailed

        # Only filter with known failures.
        if not self._collected_at_least_one_failure:
            if all(node.nodeid not in known_failures for node in collected):
                return
            # First module with a known failure: from now on, modules
            # outside the last-failed paths get skipped entirely.
            self.lfplugin.config.pluginmanager.register(
                LFPluginCollSkipfiles(self.lfplugin), "lfplugin-collskip"
            )
            self._collected_at_least_one_failure = True

        session = collector.session
        kept = []
        for node in collected:
            if (
                node.nodeid in known_failures
                # Include any passed arguments (not trivial to filter).
                or session.isinitpath(node.fspath)
                # Keep all sub-collectors.
                or isinstance(node, nodes.Collector)
            ):
                kept.append(node)
        collected[:] = kept
class LFPluginCollSkipfiles:
    """Collect-report hook that short-circuits modules without known failures."""

    def __init__(self, lfplugin: "LFPlugin") -> None:
        self.lfplugin = lfplugin

    @pytest.hookimpl
    def pytest_make_collect_report(
        self, collector: nodes.Collector
    ) -> Optional[CollectReport]:
        """Return an empty "passed" report for modules outside the lf paths.

        For any other collector ``None`` is returned.  Each skipped module
        is counted in ``lfplugin._skipped_files``.
        """
        if not isinstance(collector, Module):
            return None
        if Path(str(collector.fspath)) in self.lfplugin._last_failed_paths:
            return None

        self.lfplugin._skipped_files += 1
        return CollectReport(collector.nodeid, "passed", longrepr=None, result=[])
class LFPlugin:
    """Plugin implementing the --lf (last-failed) and --ff (failed-first) options.

    The set of failing node ids is loaded from, and saved back to, the
    ``cache/lastfailed`` cache key.
    """

    def __init__(self, config: Config) -> None:
        self.config = config
        self.active = any(config.getoption(key) for key in ("lf", "failedfirst"))
        assert config.cache
        self.lastfailed = config.cache.get(
            "cache/lastfailed", {}
        )  # type: Dict[str, bool]
        self._previously_failed_count = None  # type: Optional[int]
        self._report_status = None  # type: Optional[str]
        self._skipped_files = 0  # count skipped files during collection due to --lf

        if config.getoption("lf"):
            self._last_failed_paths = self.get_last_failed_paths()
            config.pluginmanager.register(
                LFPluginCollWrapper(self), "lfplugin-collwrapper"
            )

    def get_last_failed_paths(self) -> Set[Path]:
        """Return the existing file paths of all previously failed node ids."""
        root = Path(str(self.config.rootdir))
        # The file part of a node id is everything before the first "::".
        candidates = {root / nodeid.split("::")[0] for nodeid in self.lastfailed}
        return {path for path in candidates if path.exists()}

    def pytest_report_collectionfinish(self) -> Optional[str]:
        """Return the status line when the plugin is active and not in quiet mode."""
        if not self.active or self.config.getoption("verbose") < 0:
            return None
        return "run-last-failure: %s" % self._report_status

    def pytest_runtest_logreport(self, report: TestReport) -> None:
        """Track failures: passing calls and skips clear an entry, failures add one."""
        no_longer_failing = (
            report.when == "call" and report.passed
        ) or report.skipped
        if no_longer_failing:
            self.lastfailed.pop(report.nodeid, None)
        elif report.failed:
            self.lastfailed[report.nodeid] = True

    def pytest_collectreport(self, report: CollectReport) -> None:
        """Track collection outcomes of collectors.

        A failing collector is recorded under its own node id.  A collector
        that was recorded before and now collects fine is replaced by the
        nodes it collected.
        """
        if report.outcome not in ("passed", "skipped"):
            self.lastfailed[report.nodeid] = True
            return

        if report.nodeid in self.lastfailed:
            self.lastfailed.pop(report.nodeid)
            for item in report.result:
                self.lastfailed[item.nodeid] = True

    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
    def pytest_collection_modifyitems(
        self, config: Config, items: List[nodes.Item]
    ) -> Generator[None, None, None]:
        """Select or reorder ``items`` in place and compose the status line.

        Runs after the wrapped hook implementations (the ``yield`` comes
        first) and does nothing unless the plugin is active.
        """
        yield

        if not self.active:
            return

        if not self.lastfailed:
            self._report_status = "no previously failed tests, "
            if self.config.getoption("last_failed_no_failures") == "none":
                self._report_status += "deselecting all items."
                config.hook.pytest_deselected(items=items[:])
                items[:] = []
            else:
                self._report_status += "not deselecting items."
            return

        # Partition the collected items, preserving their order.
        failed_before = []  # type: List[nodes.Item]
        passed_before = []  # type: List[nodes.Item]
        for item in items:
            bucket = failed_before if item.nodeid in self.lastfailed else passed_before
            bucket.append(item)
        self._previously_failed_count = len(failed_before)

        if failed_before:
            if self.config.getoption("lf"):
                items[:] = failed_before
                config.hook.pytest_deselected(items=passed_before)
            else:  # --failedfirst
                items[:] = failed_before + passed_before

            noun = "failure" if self._previously_failed_count == 1 else "failures"
            suffix = " first" if self.config.getoption("failedfirst") else ""
            self._report_status = "rerun previous {count} {noun}{suffix}".format(
                count=self._previously_failed_count, suffix=suffix, noun=noun
            )
        else:
            # Running a subset of all tests with recorded failures
            # only outside of it.
            self._report_status = "%d known failures not in selected tests" % (
                len(self.lastfailed),
            )

        if self._skipped_files > 0:
            files_noun = "file" if self._skipped_files == 1 else "files"
            self._report_status += " (skipped {files} {files_noun})".format(
                files=self._skipped_files, files_noun=files_noun
            )

    def pytest_sessionfinish(self, session: Session) -> None:
        """Persist the failure set if it changed.

        Nothing is written for ``--cache-show`` runs or when ``config``
        has a ``workerinput`` attribute.
        """
        config = self.config
        if config.getoption("cacheshow") or hasattr(config, "workerinput"):
            return

        assert config.cache is not None
        stored = config.cache.get("cache/lastfailed", {})
        if stored == self.lastfailed:
            return
        config.cache.set("cache/lastfailed", self.lastfailed)
class NFPlugin:
    """Plugin implementing the --nf (new-first) option.

    Known node ids are loaded from, and saved back to, the
    ``cache/nodeids`` cache key.
    """

    def __init__(self, config: Config) -> None:
        self.config = config
        self.active = config.option.newfirst
        assert config.cache is not None
        self.cached_nodeids = set(config.cache.get("cache/nodeids", []))

    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
    def pytest_collection_modifyitems(
        self, items: List[nodes.Item]
    ) -> Generator[None, None, None]:
        """Put items with unknown node ids first and remember all node ids.

        Runs after the wrapped hook implementations.  When the option is
        off, the node ids are only recorded and ``items`` is left alone.
        """
        yield

        if not self.active:
            self.cached_nodeids.update(item.nodeid for item in items)
            return

        unseen = order_preserving_dict()  # type: Dict[str, nodes.Item]
        seen = order_preserving_dict()  # type: Dict[str, nodes.Item]
        for item in items:
            bucket = seen if item.nodeid in self.cached_nodeids else unseen
            bucket[item.nodeid] = item

        unseen_sorted = self._get_increasing_order(unseen.values())
        seen_sorted = self._get_increasing_order(seen.values())
        items[:] = unseen_sorted + seen_sorted
        self.cached_nodeids.update(unseen)

    def _get_increasing_order(self, items: Iterable[nodes.Item]) -> List[nodes.Item]:
        """Return ``items`` as a list sorted by file mtime, largest mtime first."""

        def file_mtime(item: nodes.Item):
            return item.fspath.mtime()

        ordered = list(items)
        ordered.sort(key=file_mtime, reverse=True)
        return ordered

    def pytest_sessionfinish(self) -> None:
        """Persist the sorted known node ids.

        Nothing is written for ``--cache-show`` runs, when ``config`` has a
        ``workerinput`` attribute, or for collect-only runs.
        """
        config = self.config
        if (
            config.getoption("cacheshow")
            or hasattr(config, "workerinput")
            or config.getoption("collectonly")
        ):
            return

        assert config.cache is not None
        config.cache.set("cache/nodeids", sorted(self.cached_nodeids))
def pytest_addoption(parser: Parser) -> None:
    """Register the cache command line options and the ``cache_dir`` ini value.

    The default cache directory is ``.pytest_cache``; when the
    ``TOX_ENV_DIR`` environment variable is set, the directory is placed
    below that path instead.
    """
    cache_dir_default = ".pytest_cache"
    tox_env_dir = os.environ.get("TOX_ENV_DIR")
    if tox_env_dir is not None:
        cache_dir_default = os.path.join(tox_env_dir, cache_dir_default)

    general = parser.getgroup("general")
    general.addoption(
        "--lf",
        "--last-failed",
        action="store_true",
        dest="lf",
        help=(
            "rerun only the tests that failed "
            "at the last run (or all if none failed)"
        ),
    )
    general.addoption(
        "--ff",
        "--failed-first",
        action="store_true",
        dest="failedfirst",
        help=(
            "run all tests, but run the last failures first.\n"
            "This may re-order tests and thus lead to "
            "repeated fixture setup/teardown."
        ),
    )
    general.addoption(
        "--nf",
        "--new-first",
        action="store_true",
        dest="newfirst",
        help=(
            "run tests from new files first, then the rest of the tests "
            "sorted by file mtime"
        ),
    )
    general.addoption(
        "--cache-show",
        action="append",
        nargs="?",
        dest="cacheshow",
        help=(
            "show cache contents, don't perform collection or tests. "
            "Optional argument: glob (default: '*')."
        ),
    )
    general.addoption(
        "--cache-clear",
        action="store_true",
        dest="cacheclear",
        help="remove all cache contents at start of test run.",
    )
    parser.addini("cache_dir", default=cache_dir_default, help="cache directory path.")
    general.addoption(
        "--lfnf",
        "--last-failed-no-failures",
        action="store",
        dest="last_failed_no_failures",
        choices=("all", "none"),
        default="all",
        help="which tests to run with no previously (known) failures.",
    )
def pytest_cmdline_main(config: Config) -> Optional[Union[int, ExitCode]]:
    """Run the ``cacheshow`` session when ``--cache-show`` was given.

    Returns ``None`` otherwise.
    """
    if not config.option.cacheshow:
        return None

    # Imported lazily, only when the cache-show session is actually run.
    from _pytest.main import wrap_session

    return wrap_session(config, cacheshow)
@pytest.hookimpl(tryfirst=True)
def pytest_configure(config: Config) -> None:
    """Attach the cache to ``config`` and register the lf and nf plugins."""
    config.cache = Cache.for_config(config)

    # Both plugins read ``config.cache`` in their constructors, so they are
    # created only after the cache is attached.
    register = config.pluginmanager.register
    for plugin_class, plugin_name in ((LFPlugin, "lfplugin"), (NFPlugin, "nfplugin")):
        register(plugin_class(config), plugin_name)
@pytest.fixture
def cache(request: FixtureRequest) -> Cache:
    """
    Return a cache object that can persist state between testing sessions.

    cache.get(key, default)
    cache.set(key, value)

    Keys must be a ``/`` separated value, where the first part is usually the
    name of your plugin or application to avoid clashes with other cache users.

    Values can be any object handled by the json stdlib module.
    """
    session_cache = request.config.cache
    assert session_cache is not None
    return session_cache
def pytest_report_header(config: Config) -> Optional[str]:
    """Return a ``cachedir: ...`` header line, or ``None``.

    The line is produced in verbose mode or when the ``cache_dir`` ini
    value differs from ``.pytest_cache``.  The path is shown relative to
    ``config.rootdir`` when it lies below it, and as-is otherwise.
    """
    if config.option.verbose <= 0 and config.getini("cache_dir") == ".pytest_cache":
        return None

    assert config.cache is not None
    cachedir = config.cache._cachedir
    # TODO: evaluate generating upward relative paths
    # starting with .., ../.. if sensible
    try:
        shown = cachedir.relative_to(str(config.rootdir))
    except ValueError:
        # The cache dir is not located below rootdir.
        shown = cachedir
    return "cachedir: {}".format(shown)
def cacheshow(config: Config, session: Session) -> int:
    """Print the cache contents matching the ``--cache-show`` glob.

    Cached values are pretty-printed first; afterwards the files found
    below the cache's directory area are listed with their sizes.
    Always returns ``0``.
    """
    from pprint import pformat

    cache = config.cache
    assert cache is not None

    tw = TerminalWriter()
    basedir = cache._cachedir
    tw.line("cachedir: " + str(basedir))
    if not basedir.is_dir():
        tw.line("cache is empty")
        return 0

    # ``--cache-show`` without an argument stores None; fall back to "*".
    requested = config.option.cacheshow[0]
    pattern = "*" if requested is None else requested

    # Unique sentinel, so unreadable entries can be told apart from any
    # value that could legitimately be stored.
    unreadable = object()
    vdir = basedir / Cache._CACHE_PREFIX_VALUES
    tw.sep("-", "cache values for %r" % pattern)
    value_files = sorted(path for path in vdir.rglob(pattern) if path.is_file())
    for value_file in value_files:
        key = str(value_file.relative_to(vdir))
        value = cache.get(key, unreadable)
        if value is unreadable:
            tw.line("%s contains unreadable content, will be ignored" % key)
            continue
        tw.line("%s contains:" % key)
        for text in pformat(value).splitlines():
            # NOTE(review): whitespace in this listing appears collapsed;
            # confirm the width of this indent literal against the real file.
            tw.line(" " + text)

    ddir = basedir / Cache._CACHE_PREFIX_DIRS
    if ddir.is_dir():
        entries = sorted(ddir.rglob(pattern))
        tw.sep("-", "cache directories for %r" % pattern)
        for entry in entries:
            if not entry.is_file():
                continue
            key = str(entry.relative_to(basedir))
            tw.line("{} is a file of length {:d}".format(key, entry.stat().st_size))
    return 0