Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" Access and control log capturing. """ 

2import logging 

3import os 

4import re 

5import sys 

6from contextlib import contextmanager 

7from io import StringIO 

8from typing import AbstractSet 

9from typing import Dict 

10from typing import Generator 

11from typing import List 

12from typing import Mapping 

13from typing import Optional 

14from typing import Tuple 

15from typing import TypeVar 

16from typing import Union 

17 

18import pytest 

19from _pytest import nodes 

20from _pytest._io import TerminalWriter 

21from _pytest.capture import CaptureManager 

22from _pytest.compat import nullcontext 

23from _pytest.config import _strtobool 

24from _pytest.config import Config 

25from _pytest.config import create_terminal_writer 

26from _pytest.config.argparsing import Parser 

27from _pytest.fixtures import FixtureRequest 

28from _pytest.main import Session 

29from _pytest.pathlib import Path 

30from _pytest.store import StoreKey 

31from _pytest.terminal import TerminalReporter 

32 

33 

# Default record layout: padded level name, logger name, source file and
# line number, then the message.
DEFAULT_LOG_FORMAT = "%(levelname)-8s %(name)s:%(filename)s:%(lineno)d %(message)s"
# Default timestamp layout (hours:minutes:seconds).
DEFAULT_LOG_DATE_FORMAT = "%H:%M:%S"
# Matches ANSI "ESC [ <digits and semicolons> m" escape sequences.
_ANSI_ESCAPE_SEQ = re.compile(r"\x1b\[[\d;]+m")
# Store key under which the capture handler is kept on a test item.
caplog_handler_key = StoreKey["LogCaptureHandler"]()
# Store key for the per-item mapping of phase name -> captured records.
caplog_records_key = StoreKey[Dict[str, List[logging.LogRecord]]]()

39 

40 

def _remove_ansi_escape_sequences(text: str) -> str:
    """Return ``text`` with every ``_ANSI_ESCAPE_SEQ`` match removed."""
    stripped = _ANSI_ESCAPE_SEQ.sub("", text)
    return stripped

43 

44 

class ColoredLevelFormatter(logging.Formatter):
    """
    Colorize the %(levelname)..s part of the log format passed to __init__.

    For every level listed in ``LOGLEVEL_COLOROPTS`` a variant of the format
    string is precomputed in which the ``%(levelname)...s`` placeholder is
    replaced by the already formatted and marked-up level name.
    ``format()`` then selects the variant matching the record's level.
    """

    # Markup options handed to ``terminalwriter.markup`` for each level.
    LOGLEVEL_COLOROPTS = {
        logging.CRITICAL: {"red"},
        logging.ERROR: {"red", "bold"},
        logging.WARNING: {"yellow"},
        logging.WARN: {"yellow"},
        logging.INFO: {"green"},
        logging.DEBUG: {"purple"},
        logging.NOTSET: set(),
    }  # type: Mapping[int, AbstractSet[str]]
    # Matches the levelname placeholder including an optional flag, width
    # and precision, e.g. "%(levelname)s", "%(levelname)-8s" or
    # "%(levelname)-8.8s".
    LEVELNAME_FMT_REGEX = re.compile(r"%\(levelname\)([+-.]?\d*(?:\.\d+)?s)")

    def __init__(self, terminalwriter: TerminalWriter, *args, **kwargs) -> None:
        """Build the per-level format strings.

        :param terminalwriter:
            Object whose ``markup(text, **opts)`` method is used to wrap the
            formatted level name.
        :param args: Forwarded unchanged to ``logging.Formatter.__init__``.
        :param kwargs: Forwarded unchanged to ``logging.Formatter.__init__``.
        """
        super().__init__(*args, **kwargs)
        self._original_fmt = self._style._fmt
        self._level_to_fmt_mapping = {}  # type: Dict[int, str]

        assert self._fmt is not None
        levelname_fmt_match = self.LEVELNAME_FMT_REGEX.search(self._fmt)
        if not levelname_fmt_match:
            # Nothing to colorize; format() will always use the original.
            return
        levelname_fmt = levelname_fmt_match.group()

        for level, color_opts in self.LOGLEVEL_COLOROPTS.items():
            formatted_levelname = levelname_fmt % {
                "levelname": logging.getLevelName(level)
            }

            # add ANSI escape sequences around the formatted levelname
            color_kwargs = {name: True for name in color_opts}
            colorized_formatted_levelname = terminalwriter.markup(
                formatted_levelname, **color_kwargs
            )
            # Use a callable replacement so the colorized text is inserted
            # literally instead of being parsed as a regex replacement
            # template (backslashes / group references).
            self._level_to_fmt_mapping[level] = self.LEVELNAME_FMT_REGEX.sub(
                lambda _match, text=colorized_formatted_levelname: text, self._fmt
            )

    def format(self, record: logging.LogRecord) -> str:
        """Format ``record`` using the format string precomputed for its level.

        Levels without a precomputed variant use the original format string.
        """
        fmt = self._level_to_fmt_mapping.get(record.levelno, self._original_fmt)
        self._style._fmt = fmt
        return super().format(record)

90 

91 

class PercentStyleMultiline(logging.PercentStyle):
    """A %-style log format that can indent continuation lines.

    When a record's message spans several lines and auto-indent is active,
    only the first line is run through the format string; the remaining
    lines are appended beneath it, each prefixed with spaces.
    """

    def __init__(self, fmt: str, auto_indent: Union[int, str, bool, None]) -> None:
        super().__init__(fmt)
        self._auto_indent = self._get_auto_indent(auto_indent)

    @staticmethod
    def _update_message(
        record_dict: Dict[str, object], message: str
    ) -> Dict[str, object]:
        """Return a shallow copy of ``record_dict`` with "message" replaced."""
        patched = dict(record_dict)
        patched["message"] = message
        return patched

    @staticmethod
    def _get_auto_indent(auto_indent_option: Union[int, str, bool, None]) -> int:
        """Translate an auto-indent option into an indentation value.

        The option can come from ``extra={"auto_indent": [value]}`` on a
        logging call, from the ``--log-auto-indent [value]`` command line
        flag or from the ``log_auto_indent [value]`` config option.

        Auto-indent is off by default.

        The boolean True, or a string accepted as true by ``_strtobool``,
        switches automatic indentation on. The boolean False, the int 0 or
        a string accepted as false switches it off. An integer, given
        either as int or in str form, is returned as is and fixes the
        indentation position.

        Anything else is silently treated as the default.

        :param any auto_indent_option: User specified option for indentation
            from command line, config or extra kwarg. Accepts int, bool or str.

        :returns: indentation value, which can be
            -1 (automatically determine indentation) or
            0 (auto-indent turned off) or
            >0 (explicitly set indentation position).
        """
        # bool has to be tested before int because bool is a subclass of int.
        if isinstance(auto_indent_option, bool):
            return -1 if auto_indent_option else 0
        if isinstance(auto_indent_option, int):
            return int(auto_indent_option)
        if isinstance(auto_indent_option, str):
            # First try a numeric string, then a boolean-like string.
            try:
                return int(auto_indent_option)
            except ValueError:
                pass
            try:
                if _strtobool(auto_indent_option):
                    return -1
            except ValueError:
                return 0
        # None, unsupported types and false-like strings all end up here.
        return 0

    def format(self, record: logging.LogRecord) -> str:
        """Format ``record``, indenting continuation lines when enabled."""
        message = record.message
        if "\n" not in message:
            return self._fmt % record.__dict__

        if hasattr(record, "auto_indent"):
            # passed in from the "extra={}" kwarg on the call to logging.log()
            indent_setting = self._get_auto_indent(record.auto_indent)  # type: ignore[attr-defined]
        else:
            indent_setting = self._auto_indent

        if not indent_setting:
            return self._fmt % record.__dict__

        lines = message.splitlines()
        first_formatted = self._fmt % self._update_message(record.__dict__, lines[0])

        if indent_setting < 0:
            # Align with the column where the first message line starts in
            # the formatted output, ignoring ANSI escape sequences.
            indentation = _remove_ansi_escape_sequences(first_formatted).find(lines[0])
        else:
            # optimizes logging by allowing a fixed indentation
            indentation = indent_setting

        lines[0] = first_formatted
        separator = "\n" + " " * indentation
        return separator.join(lines)

186 

187 

def get_option_ini(config: Config, *names: str):
    """Return the first truthy value found for ``names``.

    Each name is looked up with ``config.getoption``; when that yields
    ``None`` the ini value from ``config.getini`` is used instead. Names are
    tried in order and ``None`` is returned when none gives a truthy value.
    """
    for option_name in names:
        value = config.getoption(option_name)  # 'default' arg won't work as expected
        if value is None:
            value = config.getini(option_name)
        if value:
            return value
    return None

195 

196 

def pytest_addoption(parser: Parser) -> None:
    """Add options to control log capturing."""
    group = parser.getgroup("logging")

    def add_option_ini(option, dest, default=None, type=None, **kwargs):
        # Every command line flag gets an ini setting of the same name
        # (``dest``); the ini entry is registered first.
        ini_help = "default value for " + option
        parser.addini(dest, default=default, type=type, help=ini_help)
        group.addoption(option, dest=dest, **kwargs)

    format_help = "log format as used by the logging module."
    date_format_help = "log date format as used by the logging module."

    add_option_ini(
        "--log-level",
        dest="log_level",
        default=None,
        metavar="LEVEL",
        help=(
            "level of messages to catch/display.\n"
            "Not set by default, so it depends on the root/parent log handler's"
            ' effective level, where it is "WARNING" by default.'
        ),
    )

    # (flag, dest, ini default, help) registered before the "log_cli" ini key.
    leading_options = (
        ("--log-format", "log_format", DEFAULT_LOG_FORMAT, format_help),
        (
            "--log-date-format",
            "log_date_format",
            DEFAULT_LOG_DATE_FORMAT,
            date_format_help,
        ),
    )
    for option, dest, default, help_text in leading_options:
        add_option_ini(option, dest=dest, default=default, help=help_text)

    # "log_cli" is an ini-only setting; it has no command line flag.
    parser.addini(
        "log_cli",
        default=False,
        type="bool",
        help='enable log display during test run (also known as "live logging").',
    )

    # (flag, dest, ini default, help) registered after the "log_cli" ini key.
    trailing_options = (
        ("--log-cli-level", "log_cli_level", None, "cli logging level."),
        ("--log-cli-format", "log_cli_format", None, format_help),
        ("--log-cli-date-format", "log_cli_date_format", None, date_format_help),
        (
            "--log-file",
            "log_file",
            None,
            "path to a file when logging will be written to.",
        ),
        ("--log-file-level", "log_file_level", None, "log file logging level."),
        ("--log-file-format", "log_file_format", DEFAULT_LOG_FORMAT, format_help),
        (
            "--log-file-date-format",
            "log_file_date_format",
            DEFAULT_LOG_DATE_FORMAT,
            date_format_help,
        ),
        (
            "--log-auto-indent",
            "log_auto_indent",
            None,
            "Auto-indent multiline messages passed to the logging module. Accepts true|on, false|off or an integer.",
        ),
    )
    for option, dest, default, help_text in trailing_options:
        add_option_ini(option, dest=dest, default=default, help=help_text)

281 

282 

_HandlerType = TypeVar("_HandlerType", bound=logging.Handler)


# Not using @contextmanager for performance reasons.
class catching_logs:
    """Context manager that installs a handler on the root logger.

    On entry the handler is added to the root logger. When a level was
    given, the handler is set to it and the root logger is lowered to it if
    needed. On exit the root logger's level is put back and the handler is
    removed.
    """

    __slots__ = ("handler", "level", "orig_level")

    def __init__(self, handler: _HandlerType, level: Optional[int] = None) -> None:
        self.handler = handler
        self.level = level

    def __enter__(self):
        root = logging.getLogger()
        requested = self.level
        if requested is not None:
            self.handler.setLevel(requested)
        root.addHandler(self.handler)
        if requested is not None:
            # Remember the root level so __exit__ can restore it, and never
            # raise it: only lower it when the requested level is lower.
            self.orig_level = root.level
            root.setLevel(min(self.orig_level, requested))
        return self.handler

    def __exit__(self, type, value, traceback):
        root = logging.getLogger()
        if self.level is not None:
            root.setLevel(self.orig_level)
        root.removeHandler(self.handler)

311 

312 

class LogCaptureHandler(logging.StreamHandler):
    """Stream handler that keeps each record as well as the formatted text.

    The formatted output goes to an in-memory ``StringIO`` stream and the
    records themselves are collected in ``records``.
    """

    stream = None  # type: StringIO

    def __init__(self) -> None:
        """Create the handler with an empty in-memory stream."""
        buffer = StringIO()
        super().__init__(buffer)
        self.records = []  # type: List[logging.LogRecord]

    def emit(self, record: logging.LogRecord) -> None:
        """Store ``record``, then let the stream handler write its text."""
        self.records.append(record)
        super().emit(record)

    def reset(self) -> None:
        """Start over with a new record list and a new stream."""
        self.records = []
        self.stream = StringIO()

    def handleError(self, record: logging.LogRecord) -> None:
        """Re-raise the exception being handled if raiseExceptions is set."""
        if not logging.raiseExceptions:
            return
        # Fail the test if the log message is bad (emit failed).
        # The default behavior of logging is to print "Logging error"
        # to stderr with the call stack and some extra details.
        # pytest wants to make such mistakes visible during testing.
        raise

339 

340 

class LogCaptureFixture:
    """Provides access and control of log capturing."""

    def __init__(self, item: nodes.Node) -> None:
        """Creates a new funcarg.

        :param item: node whose ``_store`` holds the capture handler and the
            per-phase record lists.
        """
        self._item = item
        # handler level before the first set_level() call (None if never set)
        self._initial_handler_level = None  # type: Optional[int]
        # dict of log name -> log level
        self._initial_logger_levels = {}  # type: Dict[Optional[str], int]

    def _finalize(self) -> None:
        """Finalizes the fixture.

        This restores the log levels changed by :meth:`set_level`.
        """
        # restore log levels
        if self._initial_handler_level is not None:
            self.handler.setLevel(self._initial_handler_level)
        for logger_name, level in self._initial_logger_levels.items():
            logger = logging.getLogger(logger_name)
            logger.setLevel(level)

    @property
    def handler(self) -> LogCaptureHandler:
        """
        :rtype: LogCaptureHandler
        """
        return self._item._store[caplog_handler_key]

    def get_records(self, when: str) -> List[logging.LogRecord]:
        """
        Get the logging records for one of the possible test phases.

        :param str when:
            Which test phase to obtain the records from. Valid values are: "setup", "call" and "teardown".

        :rtype: List[logging.LogRecord]
        :return: the list of captured records at the given stage

        .. versionadded:: 3.4
        """
        return self._item._store[caplog_records_key].get(when, [])

    @property
    def text(self) -> str:
        """Returns the formatted log text."""
        return _remove_ansi_escape_sequences(self.handler.stream.getvalue())

    @property
    def records(self) -> List[logging.LogRecord]:
        """Returns the list of log records."""
        return self.handler.records

    @property
    def record_tuples(self) -> List[Tuple[str, int, str]]:
        """Returns a list of a stripped down version of log records intended
        for use in assertion comparison.

        The format of the tuple is:

            (logger_name, log_level, message)
        """
        return [(r.name, r.levelno, r.getMessage()) for r in self.records]

    @property
    def messages(self) -> List[str]:
        """Returns a list of format-interpolated log messages.

        Unlike 'records', which contains the format string and parameters for interpolation, log messages in this list
        are all interpolated.
        Unlike 'text', which contains the output from the handler, log messages in this list are unadorned with
        levels, timestamps, etc, making exact comparisons more reliable.

        Note that traceback or stack info (from :func:`logging.exception` or the `exc_info` or `stack_info` arguments
        to the logging functions) is not included, as this is added by the formatter in the handler.

        .. versionadded:: 3.7
        """
        return [r.getMessage() for r in self.records]

    def clear(self) -> None:
        """Reset the list of log records and the captured log text."""
        # Empty the record list in place instead of rebinding it: the same
        # list object is registered for the current phase under
        # caplog_records_key, so rebinding would leave get_records()
        # returning stale records and missing every later one.
        self.handler.records.clear()
        self.handler.stream = StringIO()

    def set_level(self, level: Union[int, str], logger: Optional[str] = None) -> None:
        """Sets the level for capturing of logs. The level will be restored to its previous value at the end of
        the test.

        :param level: the level to set, as accepted by ``logging.Logger.setLevel``.
        :param str logger: the logger to update the level. If not given, the root logger level is updated.

        .. versionchanged:: 3.4
            The levels of the loggers changed by this function will be restored to their initial values at the
            end of the test.
        """
        logger_obj = logging.getLogger(logger)
        # save the original log-level to restore it during teardown
        # (setdefault keeps the first recorded value across repeated calls)
        self._initial_logger_levels.setdefault(logger, logger_obj.level)
        logger_obj.setLevel(level)
        if self._initial_handler_level is None:
            self._initial_handler_level = self.handler.level
        self.handler.setLevel(level)

    @contextmanager
    def at_level(
        self, level: Union[int, str], logger: Optional[str] = None
    ) -> Generator[None, None, None]:
        """Context manager that sets the level for capturing of logs. After the end of the 'with' statement the
        level is restored to its original value.

        :param level: the level to set, as accepted by ``logging.Logger.setLevel``.
        :param str logger: the logger to update the level. If not given, the root logger level is updated.
        """
        logger_obj = logging.getLogger(logger)
        orig_level = logger_obj.level
        logger_obj.setLevel(level)
        handler_orig_level = self.handler.level
        self.handler.setLevel(level)
        try:
            yield
        finally:
            logger_obj.setLevel(orig_level)
            self.handler.setLevel(handler_orig_level)

464 

465 

@pytest.fixture
def caplog(request: FixtureRequest) -> Generator[LogCaptureFixture, None, None]:
    """Access and control log capturing.

    Captured logs are available through the following properties/methods::

    * caplog.messages        -> list of format-interpolated log messages
    * caplog.text            -> string containing formatted log output
    * caplog.records         -> list of logging.LogRecord instances
    * caplog.record_tuples   -> list of (logger_name, level, message) tuples
    * caplog.clear()         -> clear captured records and formatted log output string
    """
    capture_fixture = LogCaptureFixture(request.node)
    yield capture_fixture
    # Put back the levels changed through set_level().
    capture_fixture._finalize()

481 

482 

def get_log_level_for_setting(config: Config, *setting_names: str) -> Optional[int]:
    """Resolve the first configured setting in ``setting_names`` to a level.

    Each setting is read with ``config.getoption`` and, when that gives
    ``None``, with ``config.getini``. The first truthy value is used.

    :returns: the numeric level, or ``None`` when no setting has a value.
    :raises pytest.UsageError: when the value is neither the name of an
        attribute of the ``logging`` module convertible to ``int`` nor
        itself convertible to ``int``.
    """
    for setting_name in setting_names:
        configured = config.getoption(setting_name)
        if configured is None:
            configured = config.getini(setting_name)
        if configured:
            break
    else:
        # None of the settings carries a value.
        return None

    log_level = configured.upper() if isinstance(configured, str) else configured
    try:
        # Level names are looked up on the logging module; anything else is
        # handed to int() as is.
        resolved = getattr(logging, log_level, log_level)
        return int(resolved)
    except ValueError as e:
        # Python logging does not recognise this as a logging level
        raise pytest.UsageError(
            "'{}' is not recognized as a logging level name for "
            "'{}'. Please consider passing the "
            "logging level num instead.".format(log_level, setting_name)
        ) from e

504 

505 

# run after terminalreporter/capturemanager are configured
@pytest.hookimpl(trylast=True)
def pytest_configure(config: Config) -> None:
    """Create the logging plugin and register it as "logging-plugin"."""
    plugin = LoggingPlugin(config)
    config.pluginmanager.register(plugin, "logging-plugin")

510 

511 

class LoggingPlugin:
    """Attaches to the logging module and captures log messages for each test.

    Four handlers are managed: one backing the ``caplog`` fixture, one whose
    text is added to the test report, one writing to the log file and one
    for live (CLI) logging.
    """

    def __init__(self, config: Config) -> None:
        """Creates a new plugin to capture log messages.

        The formatter can be safely shared across all handlers so
        create a single one for the entire test session here.
        """
        self._config = config

        # Report logging.
        self.formatter = self._create_formatter(
            get_option_ini(config, "log_format"),
            get_option_ini(config, "log_date_format"),
            get_option_ini(config, "log_auto_indent"),
        )
        self.log_level = get_log_level_for_setting(config, "log_level")
        self.caplog_handler = LogCaptureHandler()
        self.caplog_handler.setFormatter(self.formatter)
        self.report_handler = LogCaptureHandler()
        self.report_handler.setFormatter(self.formatter)

        # File logging.
        self.log_file_level = get_log_level_for_setting(config, "log_file_level")
        log_file = get_option_ini(config, "log_file") or os.devnull
        if log_file != os.devnull:
            directory = os.path.dirname(os.path.abspath(log_file))
            # exist_ok avoids failing when another process creates the
            # directory between a separate existence check and the creation.
            os.makedirs(directory, exist_ok=True)

        self.log_file_handler = _FileHandler(log_file, mode="w", encoding="UTF-8")
        log_file_format = get_option_ini(config, "log_file_format", "log_format")
        log_file_date_format = get_option_ini(
            config, "log_file_date_format", "log_date_format"
        )

        log_file_formatter = logging.Formatter(
            log_file_format, datefmt=log_file_date_format
        )
        self.log_file_handler.setFormatter(log_file_formatter)

        # CLI/live logging.
        self.log_cli_level = get_log_level_for_setting(
            config, "log_cli_level", "log_level"
        )
        if self._log_cli_enabled():
            terminal_reporter = config.pluginmanager.get_plugin("terminalreporter")
            capture_manager = config.pluginmanager.get_plugin("capturemanager")
            # if capturemanager plugin is disabled, live logging still works.
            self.log_cli_handler = _LiveLoggingStreamHandler(
                terminal_reporter, capture_manager
            )  # type: Union[_LiveLoggingStreamHandler, _LiveLoggingNullHandler]
        else:
            self.log_cli_handler = _LiveLoggingNullHandler()
        log_cli_formatter = self._create_formatter(
            get_option_ini(config, "log_cli_format", "log_format"),
            get_option_ini(config, "log_cli_date_format", "log_date_format"),
            get_option_ini(config, "log_auto_indent"),
        )
        self.log_cli_handler.setFormatter(log_cli_formatter)

    def _create_formatter(self, log_format, log_date_format, auto_indent):
        """Build a formatter for ``log_format`` with multiline support.

        A ``ColoredLevelFormatter`` is used when the "color" option is not
        "no" and the format contains a levelname placeholder; otherwise a
        plain ``logging.Formatter``. In both cases the formatter's style is
        replaced by a ``PercentStyleMultiline``.
        """
        # color option doesn't exist if terminal plugin is disabled
        color = getattr(self._config.option, "color", "no")
        if color != "no" and ColoredLevelFormatter.LEVELNAME_FMT_REGEX.search(
            log_format
        ):
            formatter = ColoredLevelFormatter(
                create_terminal_writer(self._config), log_format, log_date_format
            )  # type: logging.Formatter
        else:
            formatter = logging.Formatter(log_format, log_date_format)

        formatter._style = PercentStyleMultiline(
            formatter._style._fmt, auto_indent=auto_indent
        )

        return formatter

    def set_log_path(self, fname: str) -> None:
        """Public method, which can set filename parameter for
        Logging.FileHandler(). Also creates parent directory if
        it does not exist.

        Relative paths are resolved against ``config.rootdir``. The stream
        previously used by the file handler is closed.

        .. warning::
            Please consider this an experimental API.
        """
        fpath = Path(fname)

        if not fpath.is_absolute():
            fpath = Path(str(self._config.rootdir), fpath)

        if not fpath.parent.exists():
            fpath.parent.mkdir(exist_ok=True, parents=True)

        stream = fpath.open(mode="w", encoding="UTF-8")
        if sys.version_info >= (3, 7):
            old_stream = self.log_file_handler.setStream(stream)
        else:
            # Without setStream(), swap the stream by hand under the
            # handler's lock.
            old_stream = self.log_file_handler.stream
            self.log_file_handler.acquire()
            try:
                self.log_file_handler.flush()
                self.log_file_handler.stream = stream
            finally:
                self.log_file_handler.release()
        if old_stream:
            old_stream.close()

    def _log_cli_enabled(self):
        """Return whether live logging is enabled."""
        enabled = self._config.getoption(
            "--log-cli-level"
        ) is not None or self._config.getini("log_cli")
        if not enabled:
            return False

        terminal_reporter = self._config.pluginmanager.get_plugin("terminalreporter")
        if terminal_reporter is None:
            # terminal reporter is disabled e.g. by pytest-xdist.
            return False

        return True

    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
    def pytest_sessionstart(self) -> Generator[None, None, None]:
        """Keep the live and file handlers installed during session start."""
        self.log_cli_handler.set_when("sessionstart")

        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                yield

    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
    def pytest_collection(self) -> Generator[None, None, None]:
        """Keep the live and file handlers installed during collection."""
        self.log_cli_handler.set_when("collection")

        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                yield

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtestloop(self, session: Session) -> Generator[None, None, None]:
        """Runs all collected test items."""

        if session.config.option.collectonly:
            yield
            return

        if self._log_cli_enabled() and self._config.getoption("verbose") < 1:
            # setting verbose flag is needed to avoid messy test progress output
            self._config.option.verbose = 1

        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                yield  # run all the tests

    @pytest.hookimpl
    def pytest_runtest_logstart(self) -> None:
        self.log_cli_handler.reset()
        self.log_cli_handler.set_when("start")

    @pytest.hookimpl
    def pytest_runtest_logreport(self) -> None:
        self.log_cli_handler.set_when("logreport")

    def _runtest_for(self, item: nodes.Item, when: str) -> Generator[None, None, None]:
        """Implements the internals of pytest_runtest_xxx() hook."""
        with catching_logs(
            self.caplog_handler, level=self.log_level,
        ) as caplog_handler, catching_logs(
            self.report_handler, level=self.log_level,
        ) as report_handler:
            # Each phase starts with a new record list and a new stream; the
            # new list is what gets registered for ``when``.
            caplog_handler.reset()
            report_handler.reset()
            item._store[caplog_records_key][when] = caplog_handler.records
            item._store[caplog_handler_key] = caplog_handler

            yield

            # Attach the text captured during this phase to the report.
            log = report_handler.stream.getvalue().strip()
            item.add_report_section(when, "log", log)

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_setup(self, item: nodes.Item) -> Generator[None, None, None]:
        self.log_cli_handler.set_when("setup")

        # Start the item with an empty phase -> records mapping.
        empty = {}  # type: Dict[str, List[logging.LogRecord]]
        item._store[caplog_records_key] = empty
        yield from self._runtest_for(item, "setup")

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_call(self, item: nodes.Item) -> Generator[None, None, None]:
        self.log_cli_handler.set_when("call")

        yield from self._runtest_for(item, "call")

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_teardown(self, item: nodes.Item) -> Generator[None, None, None]:
        self.log_cli_handler.set_when("teardown")

        yield from self._runtest_for(item, "teardown")
        # Teardown is the last phase; drop the per-item capture state.
        del item._store[caplog_records_key]
        del item._store[caplog_handler_key]

    @pytest.hookimpl
    def pytest_runtest_logfinish(self) -> None:
        self.log_cli_handler.set_when("finish")

    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
    def pytest_sessionfinish(self) -> Generator[None, None, None]:
        """Keep the live and file handlers installed during session finish."""
        self.log_cli_handler.set_when("sessionfinish")

        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                yield

    @pytest.hookimpl
    def pytest_unconfigure(self) -> None:
        # Close the FileHandler explicitly.
        # (logging.shutdown might have lost the weakref?!)
        self.log_file_handler.close()

735 

736 

class _FileHandler(logging.FileHandler):
    """
    File handler variant whose error hook does nothing.
    """

    def handleError(self, record: logging.LogRecord) -> None:
        """Ignore the error."""
        # Handled by LogCaptureHandler.
        return None

745 

746 

class _LiveLoggingStreamHandler(logging.StreamHandler):
    """
    Stream handler behind the live logging feature. It writes a newline
    ahead of the first log message of each test and a section header the
    first time a record is emitted in a given phase.

    During live logging we must also explicitly disable stdout/stderr capturing otherwise it will get captured
    and won't appear in the terminal.
    """

    # Officially stream needs to be a IO[str], but TerminalReporter
    # isn't. So force it.
    stream = None  # type: TerminalReporter # type: ignore

    def __init__(
        self,
        terminal_reporter: TerminalReporter,
        capture_manager: Optional[CaptureManager],
    ) -> None:
        """
        :param _pytest.terminal.TerminalReporter terminal_reporter:
            object the handler writes to.
        :param _pytest.capture.CaptureManager capture_manager:
            when given, capturing is disabled through it around each emit.
        """
        super().__init__(stream=terminal_reporter)  # type: ignore[arg-type]
        self.capture_manager = capture_manager
        self.reset()
        self.set_when(None)
        self._test_outcome_written = False

    def reset(self) -> None:
        """Reset the handler; should be called before the start of each test"""
        self._first_record_emitted = False

    def set_when(self, when: Optional[str]) -> None:
        """Prepares for the given test phase (setup/call/teardown)"""
        self._when = when
        # A new phase gets its own section header.
        self._section_name_shown = False
        if when == "start":
            self._test_outcome_written = False

    def emit(self, record: logging.LogRecord) -> None:
        """Write ``record`` to the terminal with capturing switched off."""
        if self.capture_manager:
            capture_off = self.capture_manager.global_and_fixture_disabled()
        else:
            capture_off = nullcontext()

        with capture_off:
            if not self._first_record_emitted:
                self.stream.write("\n")
                self._first_record_emitted = True
            elif (
                self._when in ("teardown", "finish")
                and not self._test_outcome_written
            ):
                # One extra newline, once per test, for the first record in
                # the teardown or finish phase.
                self._test_outcome_written = True
                self.stream.write("\n")
            if self._when and not self._section_name_shown:
                self.stream.section("live log " + self._when, sep="-", bold=True)
                self._section_name_shown = True
            super().emit(record)

    def handleError(self, record: logging.LogRecord) -> None:
        """Ignore the error."""
        # Handled by LogCaptureHandler.
        return None

808 

809 

class _LiveLoggingNullHandler(logging.NullHandler):
    """Stand-in handler used when live logging is disabled.

    It offers the ``reset`` and ``set_when`` methods of the live logging
    handler, each doing nothing.
    """

    def reset(self) -> None:
        """Do nothing."""
        return None

    def set_when(self, when: str) -> None:
        """Do nothing; ``when`` is ignored."""
        return None

    def handleError(self, record: logging.LogRecord) -> None:
        """Ignore the error."""
        # Handled by LogCaptureHandler.
        return None