Source code for leasepool._process_logging
from __future__ import annotations
import logging
from collections.abc import Callable
from dataclasses import dataclass
from logging.handlers import QueueHandler, QueueListener
from typing import Any
[docs]
@dataclass(frozen=True, slots=True)
class ProcessLoggingConfig:
"""Configuration for forwarding ProcessPoolExecutor worker logs.
This is intentionally opt-in. Normal leasepool logs are emitted in the parent
process through normal Python loggers and do not need this bridge.
Attributes:
enabled: Enable child-process log forwarding.
level: Minimum level configured on the child-process root logger.
target_logger: Parent-process logger that receives records from workers.
If omitted, LeasedExecutorManager's logger is used.
clear_child_handlers: Remove inherited/preconfigured child handlers before
installing the queue handler. This avoids duplicate child logs after
fork and prevents children from writing directly to stderr.
"""
enabled: bool = False
level: int | str = logging.INFO
target_logger: logging.Logger | None = None
clear_child_handlers: bool = True
[docs]
def coerce_log_level(level: int | str) -> int:
"""Return a numeric logging level from an int or standard level name."""
if isinstance(level, str):
try:
resolved = logging.getLevelName(level.upper())
if isinstance(resolved, int):
return resolved
raise ValueError(f"Unknown logging level: {level!r}")
except Exception as e:
raise ValueError(f"Invalid logging level: {level!r}") from e
return int(level)
[docs]
class LoggerForwardingHandler(logging.Handler):
"""QueueListener target that forwards records into a parent logger.
QueueListener normally writes records directly to concrete handlers. This
handler instead re-enters the parent's logger hierarchy, so the application
keeps control over formatters, handlers, filters, propagation, and levels.
"""
[docs]
def __init__(self, target_logger: logging.Logger):
"""Initialize the handler with a target logger.
Args:
target_logger (logging.Logger): The parent logger to which records will be forwarded.
"""
super().__init__(level=logging.NOTSET)
self._target_logger = target_logger
[docs]
def emit(self, record: logging.LogRecord) -> None:
"""Forward a log record to the target logger.
Args:
record (logging.LogRecord): The log record to be forwarded.
"""
if not self._target_logger.isEnabledFor(record.levelno):
return
self._target_logger.handle(record)
[docs]
def process_worker_initializer(
log_queue: Any | None,
level: int | str,
clear_existing_handlers: bool,
user_initializer: Callable[..., Any] | None,
user_initargs: tuple[Any, ...],
) -> None:
"""ProcessPoolExecutor initializer used by leasepool.
It first configures child-process logging, then calls the user's original
initializer, if one was supplied to ProcessPoolExecutor.
Args:
log_queue: The multiprocessing queue to which log records will be sent. If None, logging is not configured.
level: Minimum logging level for the worker process root logger.
clear_existing_handlers: Whether to remove existing handlers from the root logger before adding the QueueHandler.
user_initializer: The original initializer function supplied by the user to ProcessPoolExecutor, if any.
user_initargs: The original initializer arguments supplied by the user to ProcessPoolExecutor, if any.
"""
if log_queue is not None:
configure_process_worker_logging(
log_queue,
level=level,
clear_existing_handlers=clear_existing_handlers,
)
if user_initializer is not None:
user_initializer(*user_initargs)
[docs]
def build_queue_listener(
*,
log_queue: Any,
target_logger: logging.Logger,
) -> QueueListener:
"""Create the parent-side listener for child-process log records.
Args:
log_queue: The multiprocessing queue from which log records will be received.
target_logger: The parent logger to which received log records will be forwarded.
"""
return QueueListener(
log_queue,
LoggerForwardingHandler(target_logger),
respect_handler_level=True,
)