Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2per-test stdout/stderr capturing mechanism. 

3 

4""" 

5import collections 

6import contextlib 

7import io 

8import os 

9import sys 

10from io import UnsupportedOperation 

11from tempfile import TemporaryFile 

12from typing import Generator 

13from typing import Optional 

14from typing import TextIO 

15from typing import Tuple 

16from typing import Union 

17 

18import pytest 

19from _pytest.compat import TYPE_CHECKING 

20from _pytest.config import Config 

21from _pytest.config.argparsing import Parser 

22from _pytest.fixtures import SubRequest 

23from _pytest.nodes import Collector 

24from _pytest.nodes import Item 

25 

26if TYPE_CHECKING: 

27 from typing_extensions import Literal 

28 

29 _CaptureMethod = Literal["fd", "sys", "no", "tee-sys"] 

30 

31 

32def pytest_addoption(parser: Parser) -> None: 

33 group = parser.getgroup("general") 

34 group._addoption( 

35 "--capture", 

36 action="store", 

37 default="fd", 

38 metavar="method", 

39 choices=["fd", "sys", "no", "tee-sys"], 

40 help="per-test capturing method: one of fd|sys|no|tee-sys.", 

41 ) 

42 group._addoption( 

43 "-s", 

44 action="store_const", 

45 const="no", 

46 dest="capture", 

47 help="shortcut for --capture=no.", 

48 ) 

49 

50 

51def _colorama_workaround() -> None: 

52 """ 

53 Ensure colorama is imported so that it attaches to the correct stdio 

54 handles on Windows. 

55 

56 colorama uses the terminal on import time. So if something does the 

57 first import of colorama while I/O capture is active, colorama will 

58 fail in various ways. 

59 """ 

60 if sys.platform.startswith("win32"): 

61 try: 

62 import colorama # noqa: F401 

63 except ImportError: 

64 pass 

65 

66 

67def _readline_workaround() -> None: 

68 """ 

69 Ensure readline is imported so that it attaches to the correct stdio 

70 handles on Windows. 

71 

72 Pdb uses readline support where available--when not running from the Python 

73 prompt, the readline module is not imported until running the pdb REPL. If 

74 running pytest with the --pdb option this means the readline module is not 

75 imported until after I/O capture has been started. 

76 

77 This is a problem for pyreadline, which is often used to implement readline 

78 support on Windows, as it does not attach to the correct handles for stdout 

79 and/or stdin if they have been redirected by the FDCapture mechanism. This 

80 workaround ensures that readline is imported before I/O capture is setup so 

81 that it can attach to the actual stdin/out for the console. 

82 

83 See https://github.com/pytest-dev/pytest/pull/1281 

84 """ 

85 if sys.platform.startswith("win32"): 

86 try: 

87 import readline # noqa: F401 

88 except ImportError: 

89 pass 

90 

91 

92def _py36_windowsconsoleio_workaround(stream: TextIO) -> None: 

93 """ 

94 Python 3.6 implemented unicode console handling for Windows. This works 

95 by reading/writing to the raw console handle using 

96 ``{Read,Write}ConsoleW``. 

97 

98 The problem is that we are going to ``dup2`` over the stdio file 

99 descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the 

100 handles used by Python to write to the console. Though there is still some 

101 weirdness and the console handle seems to only be closed randomly and not 

102 on the first call to ``CloseHandle``, or maybe it gets reopened with the 

103 same handle value when we suspend capturing. 

104 

105 The workaround in this case will reopen stdio with a different fd which 

106 also means a different handle by replicating the logic in 

107 "Py_lifecycle.c:initstdio/create_stdio". 

108 

109 :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given 

110 here as parameter for unittesting purposes. 

111 

112 See https://github.com/pytest-dev/py/issues/103 

113 """ 

114 if ( 

115 not sys.platform.startswith("win32") 

116 or sys.version_info[:2] < (3, 6) 

117 or hasattr(sys, "pypy_version_info") 

118 ): 

119 return 

120 

121 # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666) 

122 if not hasattr(stream, "buffer"): 

123 return 

124 

125 buffered = hasattr(stream.buffer, "raw") 

126 raw_stdout = stream.buffer.raw if buffered else stream.buffer # type: ignore[attr-defined] 

127 

128 if not isinstance(raw_stdout, io._WindowsConsoleIO): # type: ignore[attr-defined] 

129 return 

130 

131 def _reopen_stdio(f, mode): 

132 if not buffered and mode[0] == "w": 

133 buffering = 0 

134 else: 

135 buffering = -1 

136 

137 return io.TextIOWrapper( 

138 open(os.dup(f.fileno()), mode, buffering), # type: ignore[arg-type] 

139 f.encoding, 

140 f.errors, 

141 f.newlines, 

142 f.line_buffering, 

143 ) 

144 

145 sys.stdin = _reopen_stdio(sys.stdin, "rb") 

146 sys.stdout = _reopen_stdio(sys.stdout, "wb") 

147 sys.stderr = _reopen_stdio(sys.stderr, "wb") 

148 

149 

150@pytest.hookimpl(hookwrapper=True) 

151def pytest_load_initial_conftests(early_config: Config): 

152 ns = early_config.known_args_namespace 

153 if ns.capture == "fd": 

154 _py36_windowsconsoleio_workaround(sys.stdout) 

155 _colorama_workaround() 

156 _readline_workaround() 

157 pluginmanager = early_config.pluginmanager 

158 capman = CaptureManager(ns.capture) 

159 pluginmanager.register(capman, "capturemanager") 

160 

161 # make sure that capturemanager is properly reset at final shutdown 

162 early_config.add_cleanup(capman.stop_global_capturing) 

163 

164 # finally trigger conftest loading but while capturing (issue93) 

165 capman.start_global_capturing() 

166 outcome = yield 

167 capman.suspend_global_capture() 

168 if outcome.excinfo is not None: 

169 out, err = capman.read_global_capture() 

170 sys.stdout.write(out) 

171 sys.stderr.write(err) 

172 

173 

174# IO Helpers. 

175 

176 

177class EncodedFile(io.TextIOWrapper): 

178 __slots__ = () 

179 

180 @property 

181 def name(self) -> str: 

182 # Ensure that file.name is a string. Workaround for a Python bug 

183 # fixed in >=3.7.4: https://bugs.python.org/issue36015 

184 return repr(self.buffer) 

185 

186 @property 

187 def mode(self) -> str: 

188 # TextIOWrapper doesn't expose a mode, but at least some of our 

189 # tests check it. 

190 return self.buffer.mode.replace("b", "") 

191 

192 

193class CaptureIO(io.TextIOWrapper): 

194 def __init__(self) -> None: 

195 super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True) 

196 

197 def getvalue(self) -> str: 

198 assert isinstance(self.buffer, io.BytesIO) 

199 return self.buffer.getvalue().decode("UTF-8") 

200 

201 

202class TeeCaptureIO(CaptureIO): 

203 def __init__(self, other: TextIO) -> None: 

204 self._other = other 

205 super().__init__() 

206 

207 def write(self, s: str) -> int: 

208 super().write(s) 

209 return self._other.write(s) 

210 

211 

212class DontReadFromInput: 

213 encoding = None 

214 

215 def read(self, *args): 

216 raise OSError( 

217 "pytest: reading from stdin while output is captured! Consider using `-s`." 

218 ) 

219 

220 readline = read 

221 readlines = read 

222 __next__ = read 

223 

224 def __iter__(self): 

225 return self 

226 

227 def fileno(self) -> int: 

228 raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") 

229 

230 def isatty(self) -> bool: 

231 return False 

232 

233 def close(self) -> None: 

234 pass 

235 

236 @property 

237 def buffer(self): 

238 return self 

239 

240 

241# Capture classes. 

242 

243 

244patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} 

245 

246 

247class NoCapture: 

248 EMPTY_BUFFER = None 

249 __init__ = start = done = suspend = resume = lambda *args: None 

250 

251 

252class SysCaptureBinary: 

253 

254 EMPTY_BUFFER = b"" 

255 

256 def __init__(self, fd: int, tmpfile=None, *, tee: bool = False) -> None: 

257 name = patchsysdict[fd] 

258 self._old = getattr(sys, name) 

259 self.name = name 

260 if tmpfile is None: 

261 if name == "stdin": 

262 tmpfile = DontReadFromInput() 

263 else: 

264 tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) 

265 self.tmpfile = tmpfile 

266 self._state = "initialized" 

267 

268 def repr(self, class_name: str) -> str: 

269 return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( 

270 class_name, 

271 self.name, 

272 hasattr(self, "_old") and repr(self._old) or "<UNSET>", 

273 self._state, 

274 self.tmpfile, 

275 ) 

276 

277 def __repr__(self) -> str: 

278 return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( 

279 self.__class__.__name__, 

280 self.name, 

281 hasattr(self, "_old") and repr(self._old) or "<UNSET>", 

282 self._state, 

283 self.tmpfile, 

284 ) 

285 

286 def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: 

287 assert ( 

288 self._state in states 

289 ), "cannot {} in state {!r}: expected one of {}".format( 

290 op, self._state, ", ".join(states) 

291 ) 

292 

293 def start(self) -> None: 

294 self._assert_state("start", ("initialized",)) 

295 setattr(sys, self.name, self.tmpfile) 

296 self._state = "started" 

297 

298 def snap(self): 

299 self._assert_state("snap", ("started", "suspended")) 

300 self.tmpfile.seek(0) 

301 res = self.tmpfile.buffer.read() 

302 self.tmpfile.seek(0) 

303 self.tmpfile.truncate() 

304 return res 

305 

306 def done(self) -> None: 

307 self._assert_state("done", ("initialized", "started", "suspended", "done")) 

308 if self._state == "done": 

309 return 

310 setattr(sys, self.name, self._old) 

311 del self._old 

312 self.tmpfile.close() 

313 self._state = "done" 

314 

315 def suspend(self) -> None: 

316 self._assert_state("suspend", ("started", "suspended")) 

317 setattr(sys, self.name, self._old) 

318 self._state = "suspended" 

319 

320 def resume(self) -> None: 

321 self._assert_state("resume", ("started", "suspended")) 

322 if self._state == "started": 

323 return 

324 setattr(sys, self.name, self.tmpfile) 

325 self._state = "started" 

326 

327 def writeorg(self, data) -> None: 

328 self._assert_state("writeorg", ("started", "suspended")) 

329 self._old.flush() 

330 self._old.buffer.write(data) 

331 self._old.buffer.flush() 

332 

333 

334class SysCapture(SysCaptureBinary): 

335 EMPTY_BUFFER = "" # type: ignore[assignment] 

336 

337 def snap(self): 

338 res = self.tmpfile.getvalue() 

339 self.tmpfile.seek(0) 

340 self.tmpfile.truncate() 

341 return res 

342 

343 def writeorg(self, data): 

344 self._assert_state("writeorg", ("started", "suspended")) 

345 self._old.write(data) 

346 self._old.flush() 

347 

348 

349class FDCaptureBinary: 

350 """Capture IO to/from a given os-level filedescriptor. 

351 

352 snap() produces `bytes` 

353 """ 

354 

355 EMPTY_BUFFER = b"" 

356 

357 def __init__(self, targetfd: int) -> None: 

358 self.targetfd = targetfd 

359 

360 try: 

361 os.fstat(targetfd) 

362 except OSError: 

363 # FD capturing is conceptually simple -- create a temporary file, 

364 # redirect the FD to it, redirect back when done. But when the 

365 # target FD is invalid it throws a wrench into this loveley scheme. 

366 # 

367 # Tests themselves shouldn't care if the FD is valid, FD capturing 

368 # should work regardless of external circumstances. So falling back 

369 # to just sys capturing is not a good option. 

370 # 

371 # Further complications are the need to support suspend() and the 

372 # possibility of FD reuse (e.g. the tmpfile getting the very same 

373 # target FD). The following approach is robust, I believe. 

374 self.targetfd_invalid = os.open( 

375 os.devnull, os.O_RDWR 

376 ) # type: Optional[int] 

377 os.dup2(self.targetfd_invalid, targetfd) 

378 else: 

379 self.targetfd_invalid = None 

380 self.targetfd_save = os.dup(targetfd) 

381 

382 if targetfd == 0: 

383 self.tmpfile = open(os.devnull) 

384 self.syscapture = SysCapture(targetfd) 

385 else: 

386 self.tmpfile = EncodedFile( 

387 # TODO: Remove type ignore, fixed in next mypy release. 

388 TemporaryFile(buffering=0), # type: ignore[arg-type] 

389 encoding="utf-8", 

390 errors="replace", 

391 newline="", 

392 write_through=True, 

393 ) 

394 if targetfd in patchsysdict: 

395 self.syscapture = SysCapture(targetfd, self.tmpfile) 

396 else: 

397 self.syscapture = NoCapture() 

398 

399 self._state = "initialized" 

400 

401 def __repr__(self) -> str: 

402 return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( 

403 self.__class__.__name__, 

404 self.targetfd, 

405 self.targetfd_save, 

406 self._state, 

407 self.tmpfile, 

408 ) 

409 

410 def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: 

411 assert ( 

412 self._state in states 

413 ), "cannot {} in state {!r}: expected one of {}".format( 

414 op, self._state, ", ".join(states) 

415 ) 

416 

417 def start(self) -> None: 

418 """ Start capturing on targetfd using memorized tmpfile. """ 

419 self._assert_state("start", ("initialized",)) 

420 os.dup2(self.tmpfile.fileno(), self.targetfd) 

421 self.syscapture.start() 

422 self._state = "started" 

423 

424 def snap(self): 

425 self._assert_state("snap", ("started", "suspended")) 

426 self.tmpfile.seek(0) 

427 res = self.tmpfile.buffer.read() 

428 self.tmpfile.seek(0) 

429 self.tmpfile.truncate() 

430 return res 

431 

432 def done(self) -> None: 

433 """ stop capturing, restore streams, return original capture file, 

434 seeked to position zero. """ 

435 self._assert_state("done", ("initialized", "started", "suspended", "done")) 

436 if self._state == "done": 

437 return 

438 os.dup2(self.targetfd_save, self.targetfd) 

439 os.close(self.targetfd_save) 

440 if self.targetfd_invalid is not None: 

441 if self.targetfd_invalid != self.targetfd: 

442 os.close(self.targetfd) 

443 os.close(self.targetfd_invalid) 

444 self.syscapture.done() 

445 self.tmpfile.close() 

446 self._state = "done" 

447 

448 def suspend(self) -> None: 

449 self._assert_state("suspend", ("started", "suspended")) 

450 if self._state == "suspended": 

451 return 

452 self.syscapture.suspend() 

453 os.dup2(self.targetfd_save, self.targetfd) 

454 self._state = "suspended" 

455 

456 def resume(self) -> None: 

457 self._assert_state("resume", ("started", "suspended")) 

458 if self._state == "started": 

459 return 

460 self.syscapture.resume() 

461 os.dup2(self.tmpfile.fileno(), self.targetfd) 

462 self._state = "started" 

463 

464 def writeorg(self, data): 

465 """ write to original file descriptor. """ 

466 self._assert_state("writeorg", ("started", "suspended")) 

467 os.write(self.targetfd_save, data) 

468 

469 

470class FDCapture(FDCaptureBinary): 

471 """Capture IO to/from a given os-level filedescriptor. 

472 

473 snap() produces text 

474 """ 

475 

476 # Ignore type because it doesn't match the type in the superclass (bytes). 

477 EMPTY_BUFFER = "" # type: ignore 

478 

479 def snap(self): 

480 self._assert_state("snap", ("started", "suspended")) 

481 self.tmpfile.seek(0) 

482 res = self.tmpfile.read() 

483 self.tmpfile.seek(0) 

484 self.tmpfile.truncate() 

485 return res 

486 

487 def writeorg(self, data): 

488 """ write to original file descriptor. """ 

489 super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream 

490 

491 

492# MultiCapture 

493 

494CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) 

495 

496 

497class MultiCapture: 

498 _state = None 

499 _in_suspended = False 

500 

501 def __init__(self, in_, out, err) -> None: 

502 self.in_ = in_ 

503 self.out = out 

504 self.err = err 

505 

506 def __repr__(self) -> str: 

507 return "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>".format( 

508 self.out, self.err, self.in_, self._state, self._in_suspended, 

509 ) 

510 

511 def start_capturing(self) -> None: 

512 self._state = "started" 

513 if self.in_: 

514 self.in_.start() 

515 if self.out: 

516 self.out.start() 

517 if self.err: 

518 self.err.start() 

519 

520 def pop_outerr_to_orig(self): 

521 """ pop current snapshot out/err capture and flush to orig streams. """ 

522 out, err = self.readouterr() 

523 if out: 

524 self.out.writeorg(out) 

525 if err: 

526 self.err.writeorg(err) 

527 return out, err 

528 

529 def suspend_capturing(self, in_: bool = False) -> None: 

530 self._state = "suspended" 

531 if self.out: 

532 self.out.suspend() 

533 if self.err: 

534 self.err.suspend() 

535 if in_ and self.in_: 

536 self.in_.suspend() 

537 self._in_suspended = True 

538 

539 def resume_capturing(self) -> None: 

540 self._state = "started" 

541 if self.out: 

542 self.out.resume() 

543 if self.err: 

544 self.err.resume() 

545 if self._in_suspended: 

546 self.in_.resume() 

547 self._in_suspended = False 

548 

549 def stop_capturing(self) -> None: 

550 """ stop capturing and reset capturing streams """ 

551 if self._state == "stopped": 

552 raise ValueError("was already stopped") 

553 self._state = "stopped" 

554 if self.out: 

555 self.out.done() 

556 if self.err: 

557 self.err.done() 

558 if self.in_: 

559 self.in_.done() 

560 

561 def is_started(self) -> bool: 

562 """Whether actively capturing -- not suspended or stopped.""" 

563 return self._state == "started" 

564 

565 def readouterr(self) -> CaptureResult: 

566 if self.out: 

567 out = self.out.snap() 

568 else: 

569 out = "" 

570 if self.err: 

571 err = self.err.snap() 

572 else: 

573 err = "" 

574 return CaptureResult(out, err) 

575 

576 

577def _get_multicapture(method: "_CaptureMethod") -> MultiCapture: 

578 if method == "fd": 

579 return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2)) 

580 elif method == "sys": 

581 return MultiCapture(in_=SysCapture(0), out=SysCapture(1), err=SysCapture(2)) 

582 elif method == "no": 

583 return MultiCapture(in_=None, out=None, err=None) 

584 elif method == "tee-sys": 

585 return MultiCapture( 

586 in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True) 

587 ) 

588 raise ValueError("unknown capturing method: {!r}".format(method)) 

589 

590 

591# CaptureManager and CaptureFixture 

592 

593 

594class CaptureManager: 

595 """ 

596 Capture plugin, manages that the appropriate capture method is enabled/disabled during collection and each 

597 test phase (setup, call, teardown). After each of those points, the captured output is obtained and 

598 attached to the collection/runtest report. 

599 

600 There are two levels of capture: 

601 * global: which is enabled by default and can be suppressed by the ``-s`` option. This is always enabled/disabled 

602 during collection and each test phase. 

603 * fixture: when a test function or one of its fixture depend on the ``capsys`` or ``capfd`` fixtures. In this 

604 case special handling is needed to ensure the fixtures take precedence over the global capture. 

605 """ 

606 

607 def __init__(self, method: "_CaptureMethod") -> None: 

608 self._method = method 

609 self._global_capturing = None # type: Optional[MultiCapture] 

610 self._capture_fixture = None # type: Optional[CaptureFixture] 

611 

612 def __repr__(self) -> str: 

613 return "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>".format( 

614 self._method, self._global_capturing, self._capture_fixture 

615 ) 

616 

617 def is_capturing(self) -> Union[str, bool]: 

618 if self.is_globally_capturing(): 

619 return "global" 

620 if self._capture_fixture: 

621 return "fixture %s" % self._capture_fixture.request.fixturename 

622 return False 

623 

624 # Global capturing control 

625 

626 def is_globally_capturing(self) -> bool: 

627 return self._method != "no" 

628 

629 def start_global_capturing(self) -> None: 

630 assert self._global_capturing is None 

631 self._global_capturing = _get_multicapture(self._method) 

632 self._global_capturing.start_capturing() 

633 

634 def stop_global_capturing(self) -> None: 

635 if self._global_capturing is not None: 

636 self._global_capturing.pop_outerr_to_orig() 

637 self._global_capturing.stop_capturing() 

638 self._global_capturing = None 

639 

640 def resume_global_capture(self) -> None: 

641 # During teardown of the python process, and on rare occasions, capture 

642 # attributes can be `None` while trying to resume global capture. 

643 if self._global_capturing is not None: 

644 self._global_capturing.resume_capturing() 

645 

646 def suspend_global_capture(self, in_: bool = False) -> None: 

647 if self._global_capturing is not None: 

648 self._global_capturing.suspend_capturing(in_=in_) 

649 

650 def suspend(self, in_: bool = False) -> None: 

651 # Need to undo local capsys-et-al if it exists before disabling global capture. 

652 self.suspend_fixture() 

653 self.suspend_global_capture(in_) 

654 

655 def resume(self) -> None: 

656 self.resume_global_capture() 

657 self.resume_fixture() 

658 

659 def read_global_capture(self): 

660 assert self._global_capturing is not None 

661 return self._global_capturing.readouterr() 

662 

663 # Fixture Control 

664 

665 def set_fixture(self, capture_fixture: "CaptureFixture") -> None: 

666 if self._capture_fixture: 

667 current_fixture = self._capture_fixture.request.fixturename 

668 requested_fixture = capture_fixture.request.fixturename 

669 capture_fixture.request.raiseerror( 

670 "cannot use {} and {} at the same time".format( 

671 requested_fixture, current_fixture 

672 ) 

673 ) 

674 self._capture_fixture = capture_fixture 

675 

676 def unset_fixture(self) -> None: 

677 self._capture_fixture = None 

678 

679 def activate_fixture(self) -> None: 

680 """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over 

681 the global capture. 

682 """ 

683 if self._capture_fixture: 

684 self._capture_fixture._start() 

685 

686 def deactivate_fixture(self) -> None: 

687 """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any.""" 

688 if self._capture_fixture: 

689 self._capture_fixture.close() 

690 

691 def suspend_fixture(self) -> None: 

692 if self._capture_fixture: 

693 self._capture_fixture._suspend() 

694 

695 def resume_fixture(self) -> None: 

696 if self._capture_fixture: 

697 self._capture_fixture._resume() 

698 

699 # Helper context managers 

700 

701 @contextlib.contextmanager 

702 def global_and_fixture_disabled(self) -> Generator[None, None, None]: 

703 """Context manager to temporarily disable global and current fixture capturing.""" 

704 do_fixture = self._capture_fixture and self._capture_fixture._is_started() 

705 if do_fixture: 

706 self.suspend_fixture() 

707 do_global = self._global_capturing and self._global_capturing.is_started() 

708 if do_global: 

709 self.suspend_global_capture() 

710 try: 

711 yield 

712 finally: 

713 if do_global: 

714 self.resume_global_capture() 

715 if do_fixture: 

716 self.resume_fixture() 

717 

718 @contextlib.contextmanager 

719 def item_capture(self, when: str, item: Item) -> Generator[None, None, None]: 

720 self.resume_global_capture() 

721 self.activate_fixture() 

722 try: 

723 yield 

724 finally: 

725 self.deactivate_fixture() 

726 self.suspend_global_capture(in_=False) 

727 

728 out, err = self.read_global_capture() 

729 item.add_report_section(when, "stdout", out) 

730 item.add_report_section(when, "stderr", err) 

731 

732 # Hooks 

733 

734 @pytest.hookimpl(hookwrapper=True) 

735 def pytest_make_collect_report(self, collector: Collector): 

736 if isinstance(collector, pytest.File): 

737 self.resume_global_capture() 

738 outcome = yield 

739 self.suspend_global_capture() 

740 out, err = self.read_global_capture() 

741 rep = outcome.get_result() 

742 if out: 

743 rep.sections.append(("Captured stdout", out)) 

744 if err: 

745 rep.sections.append(("Captured stderr", err)) 

746 else: 

747 yield 

748 

749 @pytest.hookimpl(hookwrapper=True) 

750 def pytest_runtest_setup(self, item: Item) -> Generator[None, None, None]: 

751 with self.item_capture("setup", item): 

752 yield 

753 

754 @pytest.hookimpl(hookwrapper=True) 

755 def pytest_runtest_call(self, item: Item) -> Generator[None, None, None]: 

756 with self.item_capture("call", item): 

757 yield 

758 

759 @pytest.hookimpl(hookwrapper=True) 

760 def pytest_runtest_teardown(self, item: Item) -> Generator[None, None, None]: 

761 with self.item_capture("teardown", item): 

762 yield 

763 

764 @pytest.hookimpl(tryfirst=True) 

765 def pytest_keyboard_interrupt(self) -> None: 

766 self.stop_global_capturing() 

767 

768 @pytest.hookimpl(tryfirst=True) 

769 def pytest_internalerror(self) -> None: 

770 self.stop_global_capturing() 

771 

772 

773class CaptureFixture: 

774 """ 

775 Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary` 

776 fixtures. 

777 """ 

778 

779 def __init__(self, captureclass, request: SubRequest) -> None: 

780 self.captureclass = captureclass 

781 self.request = request 

782 self._capture = None # type: Optional[MultiCapture] 

783 self._captured_out = self.captureclass.EMPTY_BUFFER 

784 self._captured_err = self.captureclass.EMPTY_BUFFER 

785 

786 def _start(self) -> None: 

787 if self._capture is None: 

788 self._capture = MultiCapture( 

789 in_=None, out=self.captureclass(1), err=self.captureclass(2), 

790 ) 

791 self._capture.start_capturing() 

792 

793 def close(self) -> None: 

794 if self._capture is not None: 

795 out, err = self._capture.pop_outerr_to_orig() 

796 self._captured_out += out 

797 self._captured_err += err 

798 self._capture.stop_capturing() 

799 self._capture = None 

800 

801 def readouterr(self): 

802 """Read and return the captured output so far, resetting the internal buffer. 

803 

804 :return: captured content as a namedtuple with ``out`` and ``err`` string attributes 

805 """ 

806 captured_out, captured_err = self._captured_out, self._captured_err 

807 if self._capture is not None: 

808 out, err = self._capture.readouterr() 

809 captured_out += out 

810 captured_err += err 

811 self._captured_out = self.captureclass.EMPTY_BUFFER 

812 self._captured_err = self.captureclass.EMPTY_BUFFER 

813 return CaptureResult(captured_out, captured_err) 

814 

815 def _suspend(self) -> None: 

816 """Suspends this fixture's own capturing temporarily.""" 

817 if self._capture is not None: 

818 self._capture.suspend_capturing() 

819 

820 def _resume(self) -> None: 

821 """Resumes this fixture's own capturing temporarily.""" 

822 if self._capture is not None: 

823 self._capture.resume_capturing() 

824 

825 def _is_started(self) -> bool: 

826 """Whether actively capturing -- not disabled or closed.""" 

827 if self._capture is not None: 

828 return self._capture.is_started() 

829 return False 

830 

831 @contextlib.contextmanager 

832 def disabled(self) -> Generator[None, None, None]: 

833 """Temporarily disables capture while inside the 'with' block.""" 

834 capmanager = self.request.config.pluginmanager.getplugin("capturemanager") 

835 with capmanager.global_and_fixture_disabled(): 

836 yield 

837 

838 

839# The fixtures. 

840 

841 

842@pytest.fixture 

843def capsys(request: SubRequest) -> Generator[CaptureFixture, None, None]: 

844 """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. 

845 

846 The captured output is made available via ``capsys.readouterr()`` method 

847 calls, which return a ``(out, err)`` namedtuple. 

848 ``out`` and ``err`` will be ``text`` objects. 

849 """ 

850 capman = request.config.pluginmanager.getplugin("capturemanager") 

851 capture_fixture = CaptureFixture(SysCapture, request) 

852 capman.set_fixture(capture_fixture) 

853 capture_fixture._start() 

854 yield capture_fixture 

855 capture_fixture.close() 

856 capman.unset_fixture() 

857 

858 

859@pytest.fixture 

860def capsysbinary(request: SubRequest) -> Generator[CaptureFixture, None, None]: 

861 """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``. 

862 

863 The captured output is made available via ``capsysbinary.readouterr()`` 

864 method calls, which return a ``(out, err)`` namedtuple. 

865 ``out`` and ``err`` will be ``bytes`` objects. 

866 """ 

867 capman = request.config.pluginmanager.getplugin("capturemanager") 

868 capture_fixture = CaptureFixture(SysCaptureBinary, request) 

869 capman.set_fixture(capture_fixture) 

870 capture_fixture._start() 

871 yield capture_fixture 

872 capture_fixture.close() 

873 capman.unset_fixture() 

874 

875 

876@pytest.fixture 

877def capfd(request: SubRequest) -> Generator[CaptureFixture, None, None]: 

878 """Enable text capturing of writes to file descriptors ``1`` and ``2``. 

879 

880 The captured output is made available via ``capfd.readouterr()`` method 

881 calls, which return a ``(out, err)`` namedtuple. 

882 ``out`` and ``err`` will be ``text`` objects. 

883 """ 

884 capman = request.config.pluginmanager.getplugin("capturemanager") 

885 capture_fixture = CaptureFixture(FDCapture, request) 

886 capman.set_fixture(capture_fixture) 

887 capture_fixture._start() 

888 yield capture_fixture 

889 capture_fixture.close() 

890 capman.unset_fixture() 

891 

892 

893@pytest.fixture 

894def capfdbinary(request: SubRequest) -> Generator[CaptureFixture, None, None]: 

895 """Enable bytes capturing of writes to file descriptors ``1`` and ``2``. 

896 

897 The captured output is made available via ``capfd.readouterr()`` method 

898 calls, which return a ``(out, err)`` namedtuple. 

899 ``out`` and ``err`` will be ``byte`` objects. 

900 """ 

901 capman = request.config.pluginmanager.getplugin("capturemanager") 

902 capture_fixture = CaptureFixture(FDCaptureBinary, request) 

903 capman.set_fixture(capture_fixture) 

904 capture_fixture._start() 

905 yield capture_fixture 

906 capture_fixture.close() 

907 capman.unset_fixture()