Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/_pytest/capture.py : 17%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2per-test stdout/stderr capturing mechanism.
4"""
5import collections
6import contextlib
7import io
8import os
9import sys
10from io import UnsupportedOperation
11from tempfile import TemporaryFile
12from typing import Generator
13from typing import Optional
14from typing import TextIO
15from typing import Tuple
16from typing import Union
18import pytest
19from _pytest.compat import TYPE_CHECKING
20from _pytest.config import Config
21from _pytest.config.argparsing import Parser
22from _pytest.fixtures import SubRequest
23from _pytest.nodes import Collector
24from _pytest.nodes import Item
if TYPE_CHECKING:
    # Only needed by static type checkers; guarded so it is skipped otherwise.
    from typing_extensions import Literal

    # The capture method names accepted by the ``--capture`` option.
    _CaptureMethod = Literal["fd", "sys", "no", "tee-sys"]
def pytest_addoption(parser: Parser) -> None:
    """Register ``--capture`` and its ``-s`` shortcut in the "general" option group."""
    general = parser.getgroup("general")
    capture_methods = ["fd", "sys", "no", "tee-sys"]
    general._addoption(
        "--capture",
        action="store",
        default="fd",
        metavar="method",
        choices=capture_methods,
        help="per-test capturing method: one of fd|sys|no|tee-sys.",
    )
    # ``-s`` stores the constant "no" into the same ``capture`` destination.
    general._addoption(
        "-s",
        action="store_const",
        const="no",
        dest="capture",
        help="shortcut for --capture=no.",
    )
51def _colorama_workaround() -> None:
52 """
53 Ensure colorama is imported so that it attaches to the correct stdio
54 handles on Windows.
56 colorama uses the terminal on import time. So if something does the
57 first import of colorama while I/O capture is active, colorama will
58 fail in various ways.
59 """
60 if sys.platform.startswith("win32"):
61 try:
62 import colorama # noqa: F401
63 except ImportError:
64 pass
67def _readline_workaround() -> None:
68 """
69 Ensure readline is imported so that it attaches to the correct stdio
70 handles on Windows.
72 Pdb uses readline support where available--when not running from the Python
73 prompt, the readline module is not imported until running the pdb REPL. If
74 running pytest with the --pdb option this means the readline module is not
75 imported until after I/O capture has been started.
77 This is a problem for pyreadline, which is often used to implement readline
78 support on Windows, as it does not attach to the correct handles for stdout
79 and/or stdin if they have been redirected by the FDCapture mechanism. This
80 workaround ensures that readline is imported before I/O capture is setup so
81 that it can attach to the actual stdin/out for the console.
83 See https://github.com/pytest-dev/pytest/pull/1281
84 """
85 if sys.platform.startswith("win32"):
86 try:
87 import readline # noqa: F401
88 except ImportError:
89 pass
92def _py36_windowsconsoleio_workaround(stream: TextIO) -> None:
93 """
94 Python 3.6 implemented unicode console handling for Windows. This works
95 by reading/writing to the raw console handle using
96 ``{Read,Write}ConsoleW``.
98 The problem is that we are going to ``dup2`` over the stdio file
99 descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the
100 handles used by Python to write to the console. Though there is still some
101 weirdness and the console handle seems to only be closed randomly and not
102 on the first call to ``CloseHandle``, or maybe it gets reopened with the
103 same handle value when we suspend capturing.
105 The workaround in this case will reopen stdio with a different fd which
106 also means a different handle by replicating the logic in
107 "Py_lifecycle.c:initstdio/create_stdio".
109 :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given
110 here as parameter for unittesting purposes.
112 See https://github.com/pytest-dev/py/issues/103
113 """
114 if (
115 not sys.platform.startswith("win32")
116 or sys.version_info[:2] < (3, 6)
117 or hasattr(sys, "pypy_version_info")
118 ):
119 return
121 # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666)
122 if not hasattr(stream, "buffer"):
123 return
125 buffered = hasattr(stream.buffer, "raw")
126 raw_stdout = stream.buffer.raw if buffered else stream.buffer # type: ignore[attr-defined]
128 if not isinstance(raw_stdout, io._WindowsConsoleIO): # type: ignore[attr-defined]
129 return
131 def _reopen_stdio(f, mode):
132 if not buffered and mode[0] == "w":
133 buffering = 0
134 else:
135 buffering = -1
137 return io.TextIOWrapper(
138 open(os.dup(f.fileno()), mode, buffering), # type: ignore[arg-type]
139 f.encoding,
140 f.errors,
141 f.newlines,
142 f.line_buffering,
143 )
145 sys.stdin = _reopen_stdio(sys.stdin, "rb")
146 sys.stdout = _reopen_stdio(sys.stdout, "wb")
147 sys.stderr = _reopen_stdio(sys.stderr, "wb")
@pytest.hookimpl(hookwrapper=True)
def pytest_load_initial_conftests(early_config: Config):
    """Install the capture manager and capture output while the wrapped hook runs.

    If the wrapped hook raised, the output captured so far is written to
    ``sys.stdout``/``sys.stderr`` after global capturing has been suspended.
    """
    namespace = early_config.known_args_namespace
    if namespace.capture == "fd":
        _py36_windowsconsoleio_workaround(sys.stdout)
    _colorama_workaround()
    _readline_workaround()

    plugins = early_config.pluginmanager
    capture_manager = CaptureManager(namespace.capture)
    plugins.register(capture_manager, "capturemanager")

    # make sure that capturemanager is properly reset at final shutdown
    early_config.add_cleanup(capture_manager.stop_global_capturing)

    # finally trigger conftest loading but while capturing (issue93)
    capture_manager.start_global_capturing()
    outcome = yield
    capture_manager.suspend_global_capture()
    if outcome.excinfo is None:
        return
    captured_out, captured_err = capture_manager.read_global_capture()
    sys.stdout.write(captured_out)
    sys.stderr.write(captured_err)
174# IO Helpers.
class EncodedFile(io.TextIOWrapper):
    """Text wrapper exposing a string ``name`` and a text-style ``mode``."""

    __slots__ = ()

    @property
    def name(self) -> str:
        # Report the repr of the wrapped buffer so that file.name is always
        # a string.  Workaround for a Python bug fixed in >=3.7.4:
        # https://bugs.python.org/issue36015
        return "{!r}".format(self.buffer)

    @property
    def mode(self) -> str:
        # TextIOWrapper doesn't expose a mode, but at least some of our
        # tests check it: reuse the buffer's mode without the binary flag.
        binary_mode = self.buffer.mode
        return binary_mode.replace("b", "")
class CaptureIO(io.TextIOWrapper):
    """In-memory UTF-8 text stream whose written content can be read back."""

    def __init__(self) -> None:
        backing = io.BytesIO()
        super().__init__(backing, encoding="UTF-8", newline="", write_through=True)

    def getvalue(self) -> str:
        """Return everything held by the underlying bytes buffer, decoded as UTF-8."""
        raw = self.buffer
        assert isinstance(raw, io.BytesIO)
        return raw.getvalue().decode("UTF-8")
class TeeCaptureIO(CaptureIO):
    """``CaptureIO`` that also forwards every write to another text stream."""

    def __init__(self, other: TextIO) -> None:
        self._other = other
        super().__init__()

    def write(self, s: str) -> int:
        """Record ``s`` locally, then write it to the other stream and return that result."""
        super().write(s)
        forwarded = self._other.write(s)
        return forwarded
class DontReadFromInput:
    """Stand-in for stdin whose read operations all raise ``OSError``."""

    encoding = None

    def read(self, *args):
        message = (
            "pytest: reading from stdin while output is captured! Consider using `-s`."
        )
        raise OSError(message)

    # Every way of pulling data out of the stream fails the same way.
    readline = readlines = __next__ = read

    def __iter__(self):
        return self

    def fileno(self) -> int:
        raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()")

    def isatty(self) -> bool:
        return False

    def close(self) -> None:
        return None

    @property
    def buffer(self):
        # The object acts as its own binary buffer.
        return self
241# Capture classes.
# Maps the file descriptor numbers 0, 1 and 2 to the name of the ``sys``
# attribute that the capture classes patch for that descriptor.
patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
class NoCapture:
    """Capture stand-in whose lifecycle methods accept any arguments and do nothing."""

    EMPTY_BUFFER = None
    # All lifecycle entry points share one no-op callable that returns None.
    __init__ = start = done = suspend = resume = lambda *args: None
class SysCaptureBinary:
    """Capture one of ``sys.stdin``/``sys.stdout``/``sys.stderr`` by swapping the attribute.

    snap() produces `bytes`
    """

    EMPTY_BUFFER = b""

    def __init__(self, fd: int, tmpfile=None, *, tee: bool = False) -> None:
        """Remember the current ``sys`` stream for ``fd`` and pick its replacement.

        :param fd: 0, 1 or 2; selects stdin, stdout or stderr via ``patchsysdict``.
        :param tmpfile: replacement stream; when omitted, stdin gets a
            ``DontReadFromInput`` and the output streams get a ``CaptureIO``.
        :param tee: when true (and no ``tmpfile`` is given), output streams
            get a ``TeeCaptureIO`` wrapping the original stream instead.
        """
        name = patchsysdict[fd]
        self._old = getattr(sys, name)
        self.name = name
        if tmpfile is None:
            if name == "stdin":
                tmpfile = DontReadFromInput()
            else:
                tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old)
        self.tmpfile = tmpfile
        self._state = "initialized"

    def repr(self, class_name: str) -> str:
        """Return the debug representation, labelled with ``class_name``."""
        # ``done()`` deletes ``_old``, so it may legitimately be missing.  A
        # conditional expression is used (rather than ``and``/``or``) so that
        # a stream with an empty repr is not misreported as unset.
        old = repr(self._old) if hasattr(self, "_old") else "<UNSET>"
        return "<{} {} _old={} _state={!r} tmpfile={!r}>".format(
            class_name, self.name, old, self._state, self.tmpfile,
        )

    def __repr__(self) -> str:
        return self.repr(self.__class__.__name__)

    def _assert_state(self, op: str, states: Tuple[str, ...]) -> None:
        """Assert that the current state is one of ``states`` before doing ``op``."""
        assert (
            self._state in states
        ), "cannot {} in state {!r}: expected one of {}".format(
            op, self._state, ", ".join(states)
        )

    def start(self) -> None:
        """Install the replacement stream on ``sys``."""
        self._assert_state("start", ("initialized",))
        setattr(sys, self.name, self.tmpfile)
        self._state = "started"

    def snap(self):
        """Return the bytes captured so far and empty the replacement stream."""
        self._assert_state("snap", ("started", "suspended"))
        self.tmpfile.seek(0)
        res = self.tmpfile.buffer.read()
        self.tmpfile.seek(0)
        self.tmpfile.truncate()
        return res

    def done(self) -> None:
        """Restore the original stream and close the replacement; repeated calls are no-ops."""
        self._assert_state("done", ("initialized", "started", "suspended", "done"))
        if self._state == "done":
            return
        setattr(sys, self.name, self._old)
        del self._old
        self.tmpfile.close()
        self._state = "done"

    def suspend(self) -> None:
        """Put the original stream back on ``sys`` without discarding the replacement."""
        self._assert_state("suspend", ("started", "suspended"))
        setattr(sys, self.name, self._old)
        self._state = "suspended"

    def resume(self) -> None:
        """Reinstall the replacement stream after a ``suspend()``."""
        self._assert_state("resume", ("started", "suspended"))
        if self._state == "started":
            return
        setattr(sys, self.name, self.tmpfile)
        self._state = "started"

    def writeorg(self, data) -> None:
        """Write ``data`` to the binary buffer of the original stream and flush it."""
        self._assert_state("writeorg", ("started", "suspended"))
        self._old.flush()
        self._old.buffer.write(data)
        self._old.buffer.flush()
class SysCapture(SysCaptureBinary):
    """Text flavour of ``SysCaptureBinary``.

    snap() produces text
    """

    EMPTY_BUFFER = ""  # type: ignore[assignment]

    def snap(self):
        """Return the text captured so far and empty the replacement stream."""
        # Same state guard as every other ``snap`` implementation in this module.
        self._assert_state("snap", ("started", "suspended"))
        res = self.tmpfile.getvalue()
        self.tmpfile.seek(0)
        self.tmpfile.truncate()
        return res

    def writeorg(self, data):
        """Write the text ``data`` to the original stream and flush it."""
        self._assert_state("writeorg", ("started", "suspended"))
        self._old.write(data)
        self._old.flush()
class FDCaptureBinary:
    """Capture IO to/from a given os-level filedescriptor.

    snap() produces `bytes`
    """

    EMPTY_BUFFER = b""

    def __init__(self, targetfd: int) -> None:
        """Prepare capturing of ``targetfd`` without redirecting it yet.

        A duplicate of ``targetfd`` is saved so it can be restored later, and
        the temporary file that will receive the captured data is created.
        """
        self.targetfd = targetfd

        try:
            os.fstat(targetfd)
        except OSError:
            # FD capturing is conceptually simple -- create a temporary file,
            # redirect the FD to it, redirect back when done. But when the
            # target FD is invalid it throws a wrench into this lovely scheme.
            #
            # Tests themselves shouldn't care if the FD is valid, FD capturing
            # should work regardless of external circumstances. So falling back
            # to just sys capturing is not a good option.
            #
            # Further complications are the need to support suspend() and the
            # possibility of FD reuse (e.g. the tmpfile getting the very same
            # target FD). The following approach is robust, I believe.
            self.targetfd_invalid = os.open(
                os.devnull, os.O_RDWR
            )  # type: Optional[int]
            # Make targetfd valid by pointing it at the devnull placeholder.
            os.dup2(self.targetfd_invalid, targetfd)
        else:
            self.targetfd_invalid = None
        # Saved duplicate used to restore targetfd in suspend() and done().
        self.targetfd_save = os.dup(targetfd)

        if targetfd == 0:
            # stdin: redirect to devnull and patch sys.stdin via SysCapture.
            self.tmpfile = open(os.devnull)
            self.syscapture = SysCapture(targetfd)
        else:
            self.tmpfile = EncodedFile(
                # TODO: Remove type ignore, fixed in next mypy release.
                TemporaryFile(buffering=0),  # type: ignore[arg-type]
                encoding="utf-8",
                errors="replace",
                newline="",
                write_through=True,
            )
            # For fds 1 and 2 the matching sys stream is patched to write into
            # the same temporary file; other fds have no sys-level counterpart.
            if targetfd in patchsysdict:
                self.syscapture = SysCapture(targetfd, self.tmpfile)
            else:
                self.syscapture = NoCapture()

        self._state = "initialized"

    def __repr__(self) -> str:
        return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format(
            self.__class__.__name__,
            self.targetfd,
            self.targetfd_save,
            self._state,
            self.tmpfile,
        )

    def _assert_state(self, op: str, states: Tuple[str, ...]) -> None:
        """Assert that the current state is one of ``states`` before doing ``op``."""
        assert (
            self._state in states
        ), "cannot {} in state {!r}: expected one of {}".format(
            op, self._state, ", ".join(states)
        )

    def start(self) -> None:
        """ Start capturing on targetfd using memorized tmpfile. """
        self._assert_state("start", ("initialized",))
        # Redirect the fd first, then start the sys-level capture.
        os.dup2(self.tmpfile.fileno(), self.targetfd)
        self.syscapture.start()
        self._state = "started"

    def snap(self):
        """Return the bytes captured so far and truncate the temporary file."""
        self._assert_state("snap", ("started", "suspended"))
        self.tmpfile.seek(0)
        res = self.tmpfile.buffer.read()
        self.tmpfile.seek(0)
        self.tmpfile.truncate()
        return res

    def done(self) -> None:
        """Stop capturing, restore targetfd and close the temporary file.

        Calling it again once the state is "done" is a no-op.
        """
        self._assert_state("done", ("initialized", "started", "suspended", "done"))
        if self._state == "done":
            return
        os.dup2(self.targetfd_save, self.targetfd)
        os.close(self.targetfd_save)
        if self.targetfd_invalid is not None:
            # targetfd was invalid at construction time: close it again
            # (unless the placeholder has the same fd number, in which case
            # the close below covers it) and close the devnull placeholder.
            if self.targetfd_invalid != self.targetfd:
                os.close(self.targetfd)
            os.close(self.targetfd_invalid)
        self.syscapture.done()
        self.tmpfile.close()
        self._state = "done"

    def suspend(self) -> None:
        """Temporarily point targetfd back at the saved duplicate; no-op if already suspended."""
        self._assert_state("suspend", ("started", "suspended"))
        if self._state == "suspended":
            return
        self.syscapture.suspend()
        os.dup2(self.targetfd_save, self.targetfd)
        self._state = "suspended"

    def resume(self) -> None:
        """Redirect targetfd to the temporary file again; no-op if already started."""
        self._assert_state("resume", ("started", "suspended"))
        if self._state == "started":
            return
        self.syscapture.resume()
        os.dup2(self.tmpfile.fileno(), self.targetfd)
        self._state = "started"

    def writeorg(self, data):
        """ write to original file descriptor. """
        self._assert_state("writeorg", ("started", "suspended"))
        os.write(self.targetfd_save, data)
class FDCapture(FDCaptureBinary):
    """Text flavour of the os-level file descriptor capture.

    snap() produces text
    """

    # Ignore type because it doesn't match the type in the superclass (bytes).
    EMPTY_BUFFER = ""  # type: ignore

    def snap(self):
        """Return the text captured so far and truncate the temporary file."""
        self._assert_state("snap", ("started", "suspended"))
        capture_file = self.tmpfile
        capture_file.seek(0)
        text = capture_file.read()
        capture_file.seek(0)
        capture_file.truncate()
        return text

    def writeorg(self, data):
        """ write to original file descriptor. """
        encoded = data.encode("utf-8")  # XXX use encoding of original stream
        super().writeorg(encoded)
492# MultiCapture
# Pair of captured outputs: ``out`` first, ``err`` second.
CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"])
class MultiCapture:
    """Drive up to three capture objects (stdin, stdout, stderr) as one unit.

    Any of the three may be falsy (e.g. ``None``), in which case it is skipped.
    """

    _state = None
    _in_suspended = False

    def __init__(self, in_, out, err) -> None:
        self.in_, self.out, self.err = in_, out, err

    def __repr__(self) -> str:
        template = "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>"
        return template.format(
            self.out, self.err, self.in_, self._state, self._in_suspended,
        )

    def start_capturing(self) -> None:
        """Start every configured capture: stdin first, then stdout, then stderr."""
        self._state = "started"
        for capture in (self.in_, self.out, self.err):
            if capture:
                capture.start()

    def pop_outerr_to_orig(self):
        """ pop current snapshot out/err capture and flush to orig streams. """
        out, err = self.readouterr()
        for capture, data in ((self.out, out), (self.err, err)):
            if data:
                capture.writeorg(data)
        return out, err

    def suspend_capturing(self, in_: bool = False) -> None:
        """Suspend stdout/stderr capturing, and stdin capturing too when ``in_`` is true."""
        self._state = "suspended"
        for capture in (self.out, self.err):
            if capture:
                capture.suspend()
        if in_ and self.in_:
            self.in_.suspend()
            # Remembered so that resume_capturing() knows to resume stdin.
            self._in_suspended = True

    def resume_capturing(self) -> None:
        """Resume stdout/stderr capturing, plus stdin if it had been suspended."""
        self._state = "started"
        for capture in (self.out, self.err):
            if capture:
                capture.resume()
        if self._in_suspended:
            self.in_.resume()
            self._in_suspended = False

    def stop_capturing(self) -> None:
        """ stop capturing and reset capturing streams """
        if self._state == "stopped":
            raise ValueError("was already stopped")
        self._state = "stopped"
        for capture in (self.out, self.err, self.in_):
            if capture:
                capture.done()

    def is_started(self) -> bool:
        """Whether actively capturing -- not suspended or stopped."""
        return self._state == "started"

    def readouterr(self) -> CaptureResult:
        """Snapshot stdout and stderr; a missing capture contributes ``""``."""
        out = self.out.snap() if self.out else ""
        err = self.err.snap() if self.err else ""
        return CaptureResult(out, err)
def _get_multicapture(method: "_CaptureMethod") -> MultiCapture:
    """Build the ``MultiCapture`` matching a ``--capture`` method name.

    :raises ValueError: if ``method`` is not one of the known method names.
    """
    if method == "fd":
        return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2))
    if method == "sys":
        return MultiCapture(in_=SysCapture(0), out=SysCapture(1), err=SysCapture(2))
    if method == "no":
        return MultiCapture(in_=None, out=None, err=None)
    if method == "tee-sys":
        tee_out = SysCapture(1, tee=True)
        tee_err = SysCapture(2, tee=True)
        return MultiCapture(in_=None, out=tee_out, err=tee_err)
    raise ValueError("unknown capturing method: {!r}".format(method))
591# CaptureManager and CaptureFixture
class CaptureManager:
    """
    Capture plugin that enables/disables the appropriate capture method
    during collection and each test phase (setup, call, teardown).  After
    each of those points the captured output is obtained and attached to
    the collection/runtest report.

    There are two levels of capture:
    * global: enabled by default and can be suppressed by the ``-s``
      option.  This is always enabled/disabled during collection and each
      test phase.
    * fixture: when a test function or one of its fixtures depends on the
      ``capsys`` or ``capfd`` fixtures.  In this case special handling is
      needed to ensure the fixtures take precedence over the global capture.
    """

    def __init__(self, method: "_CaptureMethod") -> None:
        self._method = method
        self._global_capturing = None  # type: Optional[MultiCapture]
        self._capture_fixture = None  # type: Optional[CaptureFixture]

    def __repr__(self) -> str:
        template = "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>"
        return template.format(
            self._method, self._global_capturing, self._capture_fixture
        )

    def is_capturing(self) -> Union[str, bool]:
        """Return "global", "fixture <name>", or ``False`` when nothing captures."""
        if self.is_globally_capturing():
            return "global"
        fixture = self._capture_fixture
        if not fixture:
            return False
        return "fixture %s" % fixture.request.fixturename

    # Global capturing control

    def is_globally_capturing(self) -> bool:
        return self._method != "no"

    def start_global_capturing(self) -> None:
        assert self._global_capturing is None
        multicapture = _get_multicapture(self._method)
        self._global_capturing = multicapture
        multicapture.start_capturing()

    def stop_global_capturing(self) -> None:
        multicapture = self._global_capturing
        if multicapture is None:
            return
        multicapture.pop_outerr_to_orig()
        multicapture.stop_capturing()
        self._global_capturing = None

    def resume_global_capture(self) -> None:
        # During teardown of the python process, and on rare occasions, capture
        # attributes can be `None` while trying to resume global capture.
        multicapture = self._global_capturing
        if multicapture is None:
            return
        multicapture.resume_capturing()

    def suspend_global_capture(self, in_: bool = False) -> None:
        multicapture = self._global_capturing
        if multicapture is None:
            return
        multicapture.suspend_capturing(in_=in_)

    def suspend(self, in_: bool = False) -> None:
        # Need to undo local capsys-et-al if it exists before disabling global capture.
        self.suspend_fixture()
        self.suspend_global_capture(in_)

    def resume(self) -> None:
        self.resume_global_capture()
        self.resume_fixture()

    def read_global_capture(self):
        multicapture = self._global_capturing
        assert multicapture is not None
        return multicapture.readouterr()

    # Fixture Control

    def set_fixture(self, capture_fixture: "CaptureFixture") -> None:
        """Register the active capture fixture; reports an error if one is already set."""
        active = self._capture_fixture
        if active:
            active_name = active.request.fixturename
            new_name = capture_fixture.request.fixturename
            message = "cannot use {} and {} at the same time".format(
                new_name, active_name
            )
            capture_fixture.request.raiseerror(message)
        self._capture_fixture = capture_fixture

    def unset_fixture(self) -> None:
        self._capture_fixture = None

    def activate_fixture(self) -> None:
        """If the current item is using ``capsys`` or ``capfd``, activate them
        so they take precedence over the global capture.
        """
        fixture = self._capture_fixture
        if fixture:
            fixture._start()

    def deactivate_fixture(self) -> None:
        """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any."""
        fixture = self._capture_fixture
        if fixture:
            fixture.close()

    def suspend_fixture(self) -> None:
        fixture = self._capture_fixture
        if fixture:
            fixture._suspend()

    def resume_fixture(self) -> None:
        fixture = self._capture_fixture
        if fixture:
            fixture._resume()

    # Helper context managers

    @contextlib.contextmanager
    def global_and_fixture_disabled(self) -> Generator[None, None, None]:
        """Context manager to temporarily disable global and current fixture capturing."""
        # Only what was actually running on entry is suspended and resumed.
        fixture_was_started = (
            self._capture_fixture and self._capture_fixture._is_started()
        )
        if fixture_was_started:
            self.suspend_fixture()
        global_was_started = (
            self._global_capturing and self._global_capturing.is_started()
        )
        if global_was_started:
            self.suspend_global_capture()
        try:
            yield
        finally:
            if global_was_started:
                self.resume_global_capture()
            if fixture_was_started:
                self.resume_fixture()

    @contextlib.contextmanager
    def item_capture(self, when: str, item: Item) -> Generator[None, None, None]:
        """Capture one test phase and attach the output to ``item`` as report sections."""
        self.resume_global_capture()
        self.activate_fixture()
        try:
            yield
        finally:
            self.deactivate_fixture()
            self.suspend_global_capture(in_=False)

        captured_out, captured_err = self.read_global_capture()
        item.add_report_section(when, "stdout", captured_out)
        item.add_report_section(when, "stderr", captured_err)

    # Hooks

    @pytest.hookimpl(hookwrapper=True)
    def pytest_make_collect_report(self, collector: Collector):
        # Only the collection of files is captured.
        if not isinstance(collector, pytest.File):
            yield
            return
        self.resume_global_capture()
        outcome = yield
        self.suspend_global_capture()
        captured_out, captured_err = self.read_global_capture()
        report = outcome.get_result()
        titled = (
            ("Captured stdout", captured_out),
            ("Captured stderr", captured_err),
        )
        for title, text in titled:
            if text:
                report.sections.append((title, text))

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_setup(self, item: Item) -> Generator[None, None, None]:
        with self.item_capture("setup", item):
            yield

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_call(self, item: Item) -> Generator[None, None, None]:
        with self.item_capture("call", item):
            yield

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_teardown(self, item: Item) -> Generator[None, None, None]:
        with self.item_capture("teardown", item):
            yield

    @pytest.hookimpl(tryfirst=True)
    def pytest_keyboard_interrupt(self) -> None:
        self.stop_global_capturing()

    @pytest.hookimpl(tryfirst=True)
    def pytest_internalerror(self) -> None:
        self.stop_global_capturing()
class CaptureFixture:
    """
    Object returned by the :py:func:`capsys`, :py:func:`capsysbinary`,
    :py:func:`capfd` and :py:func:`capfdbinary` fixtures.
    """

    def __init__(self, captureclass, request: SubRequest) -> None:
        self.captureclass = captureclass
        self.request = request
        self._capture = None  # type: Optional[MultiCapture]
        empty = self.captureclass.EMPTY_BUFFER
        self._captured_out = empty
        self._captured_err = empty

    def _start(self) -> None:
        """Create and start this fixture's capture, unless one already exists."""
        if self._capture is not None:
            return
        self._capture = MultiCapture(
            in_=None, out=self.captureclass(1), err=self.captureclass(2),
        )
        self._capture.start_capturing()

    def close(self) -> None:
        """Stop capturing, keeping any output not read yet for a later ``readouterr()``."""
        capture = self._capture
        if capture is None:
            return
        out, err = capture.pop_outerr_to_orig()
        self._captured_out += out
        self._captured_err += err
        capture.stop_capturing()
        self._capture = None

    def readouterr(self):
        """Read and return the captured output so far, resetting the internal buffer.

        :return: captured content as a namedtuple with ``out`` and ``err`` string attributes
        """
        total_out = self._captured_out
        total_err = self._captured_err
        if self._capture is not None:
            fresh_out, fresh_err = self._capture.readouterr()
            total_out += fresh_out
            total_err += fresh_err
        empty = self.captureclass.EMPTY_BUFFER
        self._captured_out = empty
        self._captured_err = empty
        return CaptureResult(total_out, total_err)

    def _suspend(self) -> None:
        """Suspends this fixture's own capturing temporarily."""
        capture = self._capture
        if capture is not None:
            capture.suspend_capturing()

    def _resume(self) -> None:
        """Resumes this fixture's own capturing after a ``_suspend()``."""
        capture = self._capture
        if capture is not None:
            capture.resume_capturing()

    def _is_started(self) -> bool:
        """Whether actively capturing -- not disabled or closed."""
        capture = self._capture
        if capture is None:
            return False
        return capture.is_started()

    @contextlib.contextmanager
    def disabled(self) -> Generator[None, None, None]:
        """Temporarily disables capture while inside the 'with' block."""
        plugins = self.request.config.pluginmanager
        manager = plugins.getplugin("capturemanager")
        with manager.global_and_fixture_disabled():
            yield
839# The fixtures.
@pytest.fixture
def capsys(request: SubRequest) -> Generator[CaptureFixture, None, None]:
    """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``.

    The captured output is made available via ``capsys.readouterr()`` method
    calls, which return a ``(out, err)`` namedtuple.
    ``out`` and ``err`` will be ``text`` objects.
    """
    manager = request.config.pluginmanager.getplugin("capturemanager")
    fixture = CaptureFixture(SysCapture, request)
    manager.set_fixture(fixture)
    fixture._start()
    yield fixture
    # Teardown: stop capturing, then deregister from the manager.
    fixture.close()
    manager.unset_fixture()
@pytest.fixture
def capsysbinary(request: SubRequest) -> Generator[CaptureFixture, None, None]:
    """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``.

    The captured output is made available via ``capsysbinary.readouterr()``
    method calls, which return a ``(out, err)`` namedtuple.
    ``out`` and ``err`` will be ``bytes`` objects.
    """
    manager = request.config.pluginmanager.getplugin("capturemanager")
    fixture = CaptureFixture(SysCaptureBinary, request)
    manager.set_fixture(fixture)
    fixture._start()
    yield fixture
    # Teardown: stop capturing, then deregister from the manager.
    fixture.close()
    manager.unset_fixture()
@pytest.fixture
def capfd(request: SubRequest) -> Generator[CaptureFixture, None, None]:
    """Enable text capturing of writes to file descriptors ``1`` and ``2``.

    The captured output is made available via ``capfd.readouterr()`` method
    calls, which return a ``(out, err)`` namedtuple.
    ``out`` and ``err`` will be ``text`` objects.
    """
    manager = request.config.pluginmanager.getplugin("capturemanager")
    fixture = CaptureFixture(FDCapture, request)
    manager.set_fixture(fixture)
    fixture._start()
    yield fixture
    # Teardown: stop capturing, then deregister from the manager.
    fixture.close()
    manager.unset_fixture()
@pytest.fixture
def capfdbinary(request: SubRequest) -> Generator[CaptureFixture, None, None]:
    """Enable bytes capturing of writes to file descriptors ``1`` and ``2``.

    The captured output is made available via ``capfdbinary.readouterr()``
    method calls, which return a ``(out, err)`` namedtuple.
    ``out`` and ``err`` will be ``bytes`` objects.
    """
    manager = request.config.pluginmanager.getplugin("capturemanager")
    fixture = CaptureFixture(FDCaptureBinary, request)
    manager.set_fixture(fixture)
    fixture._start()
    yield fixture
    # Teardown: stop capturing, then deregister from the manager.
    fixture.close()
    manager.unset_fixture()