Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2merged implementation of the cache provider 

3 

4the name cache was not chosen to ensure pluggy automatically 

5ignores the external pytest-cache 

6""" 

7import json 

8import os 

9from typing import Dict 

10from typing import Generator 

11from typing import Iterable 

12from typing import List 

13from typing import Optional 

14from typing import Set 

15from typing import Union 

16 

17import attr 

18import py 

19 

20import pytest 

21from .pathlib import Path 

22from .pathlib import resolve_from_str 

23from .pathlib import rm_rf 

24from .reports import CollectReport 

25from _pytest import nodes 

26from _pytest._io import TerminalWriter 

27from _pytest.compat import order_preserving_dict 

28from _pytest.config import Config 

29from _pytest.config import ExitCode 

30from _pytest.config.argparsing import Parser 

31from _pytest.fixtures import FixtureRequest 

32from _pytest.main import Session 

33from _pytest.python import Module 

34from _pytest.reports import TestReport 

35 

36 

37README_CONTENT = """\ 

38# pytest cache directory # 

39 

40This directory contains data from the pytest's cache plugin, 

41which provides the `--lf` and `--ff` options, as well as the `cache` fixture. 

42 

43**Do not** commit this to version control. 

44 

45See [the docs](https://docs.pytest.org/en/stable/cache.html) for more information. 

46""" 

47 

48CACHEDIR_TAG_CONTENT = b"""\ 

49Signature: 8a477f597d28d172789f06886806bc55 

50# This file is a cache directory tag created by pytest. 

51# For information about cache directory tags, see: 

52# http://www.bford.info/cachedir/spec.html 

53""" 

54 

55 

56@attr.s 

57class Cache: 

58 _cachedir = attr.ib(type=Path, repr=False) 

59 _config = attr.ib(type=Config, repr=False) 

60 

61 # sub-directory under cache-dir for directories created by "makedir" 

62 _CACHE_PREFIX_DIRS = "d" 

63 

64 # sub-directory under cache-dir for values created by "set" 

65 _CACHE_PREFIX_VALUES = "v" 

66 

67 @classmethod 

68 def for_config(cls, config: Config) -> "Cache": 

69 cachedir = cls.cache_dir_from_config(config) 

70 if config.getoption("cacheclear") and cachedir.is_dir(): 

71 cls.clear_cache(cachedir) 

72 return cls(cachedir, config) 

73 

74 @classmethod 

75 def clear_cache(cls, cachedir: Path) -> None: 

76 """Clears the sub-directories used to hold cached directories and values.""" 

77 for prefix in (cls._CACHE_PREFIX_DIRS, cls._CACHE_PREFIX_VALUES): 

78 d = cachedir / prefix 

79 if d.is_dir(): 

80 rm_rf(d) 

81 

82 @staticmethod 

83 def cache_dir_from_config(config: Config) -> Path: 

84 return resolve_from_str(config.getini("cache_dir"), config.rootdir) 

85 

86 def warn(self, fmt: str, **args: object) -> None: 

87 import warnings 

88 from _pytest.warning_types import PytestCacheWarning 

89 

90 warnings.warn( 

91 PytestCacheWarning(fmt.format(**args) if args else fmt), 

92 self._config.hook, 

93 stacklevel=3, 

94 ) 

95 

96 def makedir(self, name: str) -> py.path.local: 

97 """ return a directory path object with the given name. If the 

98 directory does not yet exist, it will be created. You can use it 

99 to manage files likes e. g. store/retrieve database 

100 dumps across test sessions. 

101 

102 :param name: must be a string not containing a ``/`` separator. 

103 Make sure the name contains your plugin or application 

104 identifiers to prevent clashes with other cache users. 

105 """ 

106 path = Path(name) 

107 if len(path.parts) > 1: 

108 raise ValueError("name is not allowed to contain path separators") 

109 res = self._cachedir.joinpath(self._CACHE_PREFIX_DIRS, path) 

110 res.mkdir(exist_ok=True, parents=True) 

111 return py.path.local(res) 

112 

113 def _getvaluepath(self, key: str) -> Path: 

114 return self._cachedir.joinpath(self._CACHE_PREFIX_VALUES, Path(key)) 

115 

116 def get(self, key: str, default): 

117 """ return cached value for the given key. If no value 

118 was yet cached or the value cannot be read, the specified 

119 default is returned. 

120 

121 :param key: must be a ``/`` separated value. Usually the first 

122 name is the name of your plugin or your application. 

123 :param default: must be provided in case of a cache-miss or 

124 invalid cache values. 

125 

126 """ 

127 path = self._getvaluepath(key) 

128 try: 

129 with path.open("r") as f: 

130 return json.load(f) 

131 except (ValueError, OSError): 

132 return default 

133 

134 def set(self, key: str, value: object) -> None: 

135 """ save value for the given key. 

136 

137 :param key: must be a ``/`` separated value. Usually the first 

138 name is the name of your plugin or your application. 

139 :param value: must be of any combination of basic 

140 python types, including nested types 

141 like e. g. lists of dictionaries. 

142 """ 

143 path = self._getvaluepath(key) 

144 try: 

145 if path.parent.is_dir(): 

146 cache_dir_exists_already = True 

147 else: 

148 cache_dir_exists_already = self._cachedir.exists() 

149 path.parent.mkdir(exist_ok=True, parents=True) 

150 except OSError: 

151 self.warn("could not create cache path {path}", path=path) 

152 return 

153 if not cache_dir_exists_already: 

154 self._ensure_supporting_files() 

155 data = json.dumps(value, indent=2, sort_keys=True) 

156 try: 

157 f = path.open("w") 

158 except OSError: 

159 self.warn("cache could not write path {path}", path=path) 

160 else: 

161 with f: 

162 f.write(data) 

163 

164 def _ensure_supporting_files(self) -> None: 

165 """Create supporting files in the cache dir that are not really part of the cache.""" 

166 readme_path = self._cachedir / "README.md" 

167 readme_path.write_text(README_CONTENT) 

168 

169 gitignore_path = self._cachedir.joinpath(".gitignore") 

170 msg = "# Created by pytest automatically.\n*\n" 

171 gitignore_path.write_text(msg, encoding="UTF-8") 

172 

173 cachedir_tag_path = self._cachedir.joinpath("CACHEDIR.TAG") 

174 cachedir_tag_path.write_bytes(CACHEDIR_TAG_CONTENT) 

175 

176 

177class LFPluginCollWrapper: 

178 def __init__(self, lfplugin: "LFPlugin") -> None: 

179 self.lfplugin = lfplugin 

180 self._collected_at_least_one_failure = False 

181 

182 @pytest.hookimpl(hookwrapper=True) 

183 def pytest_make_collect_report(self, collector: nodes.Collector) -> Generator: 

184 if isinstance(collector, Session): 

185 out = yield 

186 res = out.get_result() # type: CollectReport 

187 

188 # Sort any lf-paths to the beginning. 

189 lf_paths = self.lfplugin._last_failed_paths 

190 res.result = sorted( 

191 res.result, key=lambda x: 0 if Path(str(x.fspath)) in lf_paths else 1, 

192 ) 

193 return 

194 

195 elif isinstance(collector, Module): 

196 if Path(str(collector.fspath)) in self.lfplugin._last_failed_paths: 

197 out = yield 

198 res = out.get_result() 

199 result = res.result 

200 lastfailed = self.lfplugin.lastfailed 

201 

202 # Only filter with known failures. 

203 if not self._collected_at_least_one_failure: 

204 if not any(x.nodeid in lastfailed for x in result): 

205 return 

206 self.lfplugin.config.pluginmanager.register( 

207 LFPluginCollSkipfiles(self.lfplugin), "lfplugin-collskip" 

208 ) 

209 self._collected_at_least_one_failure = True 

210 

211 session = collector.session 

212 result[:] = [ 

213 x 

214 for x in result 

215 if x.nodeid in lastfailed 

216 # Include any passed arguments (not trivial to filter). 

217 or session.isinitpath(x.fspath) 

218 # Keep all sub-collectors. 

219 or isinstance(x, nodes.Collector) 

220 ] 

221 return 

222 yield 

223 

224 

225class LFPluginCollSkipfiles: 

226 def __init__(self, lfplugin: "LFPlugin") -> None: 

227 self.lfplugin = lfplugin 

228 

229 @pytest.hookimpl 

230 def pytest_make_collect_report( 

231 self, collector: nodes.Collector 

232 ) -> Optional[CollectReport]: 

233 if isinstance(collector, Module): 

234 if Path(str(collector.fspath)) not in self.lfplugin._last_failed_paths: 

235 self.lfplugin._skipped_files += 1 

236 

237 return CollectReport( 

238 collector.nodeid, "passed", longrepr=None, result=[] 

239 ) 

240 return None 

241 

242 

243class LFPlugin: 

244 """ Plugin which implements the --lf (run last-failing) option """ 

245 

246 def __init__(self, config: Config) -> None: 

247 self.config = config 

248 active_keys = "lf", "failedfirst" 

249 self.active = any(config.getoption(key) for key in active_keys) 

250 assert config.cache 

251 self.lastfailed = config.cache.get( 

252 "cache/lastfailed", {} 

253 ) # type: Dict[str, bool] 

254 self._previously_failed_count = None # type: Optional[int] 

255 self._report_status = None # type: Optional[str] 

256 self._skipped_files = 0 # count skipped files during collection due to --lf 

257 

258 if config.getoption("lf"): 

259 self._last_failed_paths = self.get_last_failed_paths() 

260 config.pluginmanager.register( 

261 LFPluginCollWrapper(self), "lfplugin-collwrapper" 

262 ) 

263 

264 def get_last_failed_paths(self) -> Set[Path]: 

265 """Returns a set with all Paths()s of the previously failed nodeids.""" 

266 rootpath = Path(str(self.config.rootdir)) 

267 result = {rootpath / nodeid.split("::")[0] for nodeid in self.lastfailed} 

268 return {x for x in result if x.exists()} 

269 

270 def pytest_report_collectionfinish(self) -> Optional[str]: 

271 if self.active and self.config.getoption("verbose") >= 0: 

272 return "run-last-failure: %s" % self._report_status 

273 return None 

274 

275 def pytest_runtest_logreport(self, report: TestReport) -> None: 

276 if (report.when == "call" and report.passed) or report.skipped: 

277 self.lastfailed.pop(report.nodeid, None) 

278 elif report.failed: 

279 self.lastfailed[report.nodeid] = True 

280 

281 def pytest_collectreport(self, report: CollectReport) -> None: 

282 passed = report.outcome in ("passed", "skipped") 

283 if passed: 

284 if report.nodeid in self.lastfailed: 

285 self.lastfailed.pop(report.nodeid) 

286 self.lastfailed.update((item.nodeid, True) for item in report.result) 

287 else: 

288 self.lastfailed[report.nodeid] = True 

289 

290 @pytest.hookimpl(hookwrapper=True, tryfirst=True) 

291 def pytest_collection_modifyitems( 

292 self, config: Config, items: List[nodes.Item] 

293 ) -> Generator[None, None, None]: 

294 yield 

295 

296 if not self.active: 

297 return 

298 

299 if self.lastfailed: 

300 previously_failed = [] 

301 previously_passed = [] 

302 for item in items: 

303 if item.nodeid in self.lastfailed: 

304 previously_failed.append(item) 

305 else: 

306 previously_passed.append(item) 

307 self._previously_failed_count = len(previously_failed) 

308 

309 if not previously_failed: 

310 # Running a subset of all tests with recorded failures 

311 # only outside of it. 

312 self._report_status = "%d known failures not in selected tests" % ( 

313 len(self.lastfailed), 

314 ) 

315 else: 

316 if self.config.getoption("lf"): 

317 items[:] = previously_failed 

318 config.hook.pytest_deselected(items=previously_passed) 

319 else: # --failedfirst 

320 items[:] = previously_failed + previously_passed 

321 

322 noun = "failure" if self._previously_failed_count == 1 else "failures" 

323 suffix = " first" if self.config.getoption("failedfirst") else "" 

324 self._report_status = "rerun previous {count} {noun}{suffix}".format( 

325 count=self._previously_failed_count, suffix=suffix, noun=noun 

326 ) 

327 

328 if self._skipped_files > 0: 

329 files_noun = "file" if self._skipped_files == 1 else "files" 

330 self._report_status += " (skipped {files} {files_noun})".format( 

331 files=self._skipped_files, files_noun=files_noun 

332 ) 

333 else: 

334 self._report_status = "no previously failed tests, " 

335 if self.config.getoption("last_failed_no_failures") == "none": 

336 self._report_status += "deselecting all items." 

337 config.hook.pytest_deselected(items=items[:]) 

338 items[:] = [] 

339 else: 

340 self._report_status += "not deselecting items." 

341 

342 def pytest_sessionfinish(self, session: Session) -> None: 

343 config = self.config 

344 if config.getoption("cacheshow") or hasattr(config, "workerinput"): 

345 return 

346 

347 assert config.cache is not None 

348 saved_lastfailed = config.cache.get("cache/lastfailed", {}) 

349 if saved_lastfailed != self.lastfailed: 

350 config.cache.set("cache/lastfailed", self.lastfailed) 

351 

352 

353class NFPlugin: 

354 """ Plugin which implements the --nf (run new-first) option """ 

355 

356 def __init__(self, config: Config) -> None: 

357 self.config = config 

358 self.active = config.option.newfirst 

359 assert config.cache is not None 

360 self.cached_nodeids = set(config.cache.get("cache/nodeids", [])) 

361 

362 @pytest.hookimpl(hookwrapper=True, tryfirst=True) 

363 def pytest_collection_modifyitems( 

364 self, items: List[nodes.Item] 

365 ) -> Generator[None, None, None]: 

366 yield 

367 

368 if self.active: 

369 new_items = order_preserving_dict() # type: Dict[str, nodes.Item] 

370 other_items = order_preserving_dict() # type: Dict[str, nodes.Item] 

371 for item in items: 

372 if item.nodeid not in self.cached_nodeids: 

373 new_items[item.nodeid] = item 

374 else: 

375 other_items[item.nodeid] = item 

376 

377 items[:] = self._get_increasing_order( 

378 new_items.values() 

379 ) + self._get_increasing_order(other_items.values()) 

380 self.cached_nodeids.update(new_items) 

381 else: 

382 self.cached_nodeids.update(item.nodeid for item in items) 

383 

384 def _get_increasing_order(self, items: Iterable[nodes.Item]) -> List[nodes.Item]: 

385 return sorted(items, key=lambda item: item.fspath.mtime(), reverse=True) 

386 

387 def pytest_sessionfinish(self) -> None: 

388 config = self.config 

389 if config.getoption("cacheshow") or hasattr(config, "workerinput"): 

390 return 

391 

392 if config.getoption("collectonly"): 

393 return 

394 

395 assert config.cache is not None 

396 config.cache.set("cache/nodeids", sorted(self.cached_nodeids)) 

397 

398 

399def pytest_addoption(parser: Parser) -> None: 

400 group = parser.getgroup("general") 

401 group.addoption( 

402 "--lf", 

403 "--last-failed", 

404 action="store_true", 

405 dest="lf", 

406 help="rerun only the tests that failed " 

407 "at the last run (or all if none failed)", 

408 ) 

409 group.addoption( 

410 "--ff", 

411 "--failed-first", 

412 action="store_true", 

413 dest="failedfirst", 

414 help="run all tests, but run the last failures first.\n" 

415 "This may re-order tests and thus lead to " 

416 "repeated fixture setup/teardown.", 

417 ) 

418 group.addoption( 

419 "--nf", 

420 "--new-first", 

421 action="store_true", 

422 dest="newfirst", 

423 help="run tests from new files first, then the rest of the tests " 

424 "sorted by file mtime", 

425 ) 

426 group.addoption( 

427 "--cache-show", 

428 action="append", 

429 nargs="?", 

430 dest="cacheshow", 

431 help=( 

432 "show cache contents, don't perform collection or tests. " 

433 "Optional argument: glob (default: '*')." 

434 ), 

435 ) 

436 group.addoption( 

437 "--cache-clear", 

438 action="store_true", 

439 dest="cacheclear", 

440 help="remove all cache contents at start of test run.", 

441 ) 

442 cache_dir_default = ".pytest_cache" 

443 if "TOX_ENV_DIR" in os.environ: 

444 cache_dir_default = os.path.join(os.environ["TOX_ENV_DIR"], cache_dir_default) 

445 parser.addini("cache_dir", default=cache_dir_default, help="cache directory path.") 

446 group.addoption( 

447 "--lfnf", 

448 "--last-failed-no-failures", 

449 action="store", 

450 dest="last_failed_no_failures", 

451 choices=("all", "none"), 

452 default="all", 

453 help="which tests to run with no previously (known) failures.", 

454 ) 

455 

456 

457def pytest_cmdline_main(config: Config) -> Optional[Union[int, ExitCode]]: 

458 if config.option.cacheshow: 

459 from _pytest.main import wrap_session 

460 

461 return wrap_session(config, cacheshow) 

462 return None 

463 

464 

465@pytest.hookimpl(tryfirst=True) 

466def pytest_configure(config: Config) -> None: 

467 config.cache = Cache.for_config(config) 

468 config.pluginmanager.register(LFPlugin(config), "lfplugin") 

469 config.pluginmanager.register(NFPlugin(config), "nfplugin") 

470 

471 

472@pytest.fixture 

473def cache(request: FixtureRequest) -> Cache: 

474 """ 

475 Return a cache object that can persist state between testing sessions. 

476 

477 cache.get(key, default) 

478 cache.set(key, value) 

479 

480 Keys must be a ``/`` separated value, where the first part is usually the 

481 name of your plugin or application to avoid clashes with other cache users. 

482 

483 Values can be any object handled by the json stdlib module. 

484 """ 

485 assert request.config.cache is not None 

486 return request.config.cache 

487 

488 

489def pytest_report_header(config: Config) -> Optional[str]: 

490 """Display cachedir with --cache-show and if non-default.""" 

491 if config.option.verbose > 0 or config.getini("cache_dir") != ".pytest_cache": 

492 assert config.cache is not None 

493 cachedir = config.cache._cachedir 

494 # TODO: evaluate generating upward relative paths 

495 # starting with .., ../.. if sensible 

496 

497 try: 

498 displaypath = cachedir.relative_to(str(config.rootdir)) 

499 except ValueError: 

500 displaypath = cachedir 

501 return "cachedir: {}".format(displaypath) 

502 return None 

503 

504 

505def cacheshow(config: Config, session: Session) -> int: 

506 from pprint import pformat 

507 

508 assert config.cache is not None 

509 

510 tw = TerminalWriter() 

511 tw.line("cachedir: " + str(config.cache._cachedir)) 

512 if not config.cache._cachedir.is_dir(): 

513 tw.line("cache is empty") 

514 return 0 

515 

516 glob = config.option.cacheshow[0] 

517 if glob is None: 

518 glob = "*" 

519 

520 dummy = object() 

521 basedir = config.cache._cachedir 

522 vdir = basedir / Cache._CACHE_PREFIX_VALUES 

523 tw.sep("-", "cache values for %r" % glob) 

524 for valpath in sorted(x for x in vdir.rglob(glob) if x.is_file()): 

525 key = str(valpath.relative_to(vdir)) 

526 val = config.cache.get(key, dummy) 

527 if val is dummy: 

528 tw.line("%s contains unreadable content, will be ignored" % key) 

529 else: 

530 tw.line("%s contains:" % key) 

531 for line in pformat(val).splitlines(): 

532 tw.line(" " + line) 

533 

534 ddir = basedir / Cache._CACHE_PREFIX_DIRS 

535 if ddir.is_dir(): 

536 contents = sorted(ddir.rglob(glob)) 

537 tw.sep("-", "cache directories for %r" % glob) 

538 for p in contents: 

539 # if p.check(dir=1): 

540 # print("%s/" % p.relto(basedir)) 

541 if p.is_file(): 

542 key = str(p.relative_to(basedir)) 

543 tw.line("{} is a file of length {:d}".format(key, p.stat().st_size)) 

544 return 0