"""Factory functions for building stogger components."""
import atexit
import logging
import tomllib
from logging.handlers import QueueHandler, QueueListener
from pathlib import Path
from queue import Queue
from typing import Any
import structlog
from .config import StoggerConfig
from .core import (
ConsoleFileRenderer,
JSONRenderer,
SelectRenderedString,
TranslationProcessor,
add_caller_info,
add_pid,
process_exc_info,
)
from .processors import build_timestamp_processor
# Get a logger for this module
log = structlog.get_logger(__name__)
[docs]
def build_shared_processors(config: StoggerConfig) -> list[Any]:
"""Builds processors that are shared between sync and async modes."""
if config.verbose:
log.debug(
"building-shared-processors",
translation_dir=str(config.translation_dir) if config.translation_dir else None,
)
processors = [
structlog.stdlib.add_log_level,
# Timestamp processor via central factory function
build_timestamp_processor(config),
add_pid,
add_caller_info,
process_exc_info,
]
if config.translation_dir:
try:
translation_file = config.translation_dir / f"{config.language}.toml"
if config.verbose:
log.debug(
"loading-translations",
file=str(translation_file),
language=config.language,
)
with translation_file.open("rb") as f:
translations = tomllib.load(f)
if config.verbose:
log.debug(
"translations-loaded",
translation_count=len(translations),
language=config.language,
)
processors.append(TranslationProcessor(translations))
except (OSError, tomllib.TOMLDecodeError):
log.warning(
"translation-load-failed",
file=str(translation_file),
_replace_msg="Failed to load translations from {file}",
)
# Add the final renderer
if config.log_format == "json":
processors.append(JSONRenderer())
else:
processors.append(
ConsoleFileRenderer(
format_config=config.format,
min_level="debug" if config.verbose else "info",
show_caller_info=config.show_caller_info,
), # ty: ignore[invalid-argument-type]
)
# Add SelectRenderedString to convert dict output to string for PrintLogger
processors.append(SelectRenderedString(key="console")) # ty: ignore[invalid-argument-type]
if config.verbose:
log.debug("shared-processors-built", processor_count=len(processors))
return processors
[docs]
def build_renderer(config: StoggerConfig) -> ConsoleFileRenderer | JSONRenderer:
"""Builds the final renderer based on the log format."""
log.debug(
"building-renderer",
format=config.log_format,
verbose=config.verbose,
show_caller_info=config.show_caller_info,
)
if config.log_format == "json":
renderer = JSONRenderer()
log.debug(
"json-renderer-created",
min_level="debug" if config.verbose else "info",
)
else:
# Use ConsoleFileRenderer with direct parameters
renderer = ConsoleFileRenderer(format_config=config.format)
log.debug(
"console-renderer-created",
min_level="debug" if config.verbose else "info",
)
return renderer
def _create_file_handler(logdir: str | Path, syslog_identifier: str) -> logging.FileHandler | None:
"""Attempts to create log directory and file handler. Returns None on failure."""
try:
logdir = Path(logdir)
logdir.mkdir(parents=True, exist_ok=True)
log_file = logdir / f"{syslog_identifier}.log"
log.debug("creating-file-handler", log_file=str(log_file))
file_handler = logging.FileHandler(log_file)
log.debug("file-logging-enabled", log_file=str(log_file))
return file_handler
except (OSError, PermissionError):
log.exception("file-logging-setup-failed", logdir=str(logdir))
return None
def _assign_formatters(
handlers: list[logging.Handler],
console_formatter: logging.Formatter,
file_formatter: logging.Formatter | None,
) -> None:
"""Assigns the correct formatter to each handler based on handler type."""
log.debug("assigning-formatters", handler_count=len(handlers))
for handler in handlers:
if isinstance(handler, logging.StreamHandler) and not isinstance(
handler,
logging.FileHandler,
):
handler.setFormatter(console_formatter)
elif isinstance(handler, logging.FileHandler):
handler.setFormatter(file_formatter)
else:
handler.setFormatter(console_formatter)
def _configure_async_logging(handlers: list[logging.Handler]) -> None:
"""Sets up QueueHandler/QueueListener with atexit cleanup."""
log.debug("enabling-async-logging", handler_count=len(handlers))
log_queue: Queue = Queue(-1)
queue_handler = QueueHandler(log_queue)
log.debug("starting-queue-listener", handler_count=len(handlers))
listener = QueueListener(log_queue, *handlers)
listener.start()
# Register cleanup handler to stop listener on exit
def cleanup_listener() -> None:
log.debug("stopping-queue-listener", reason="atexit")
listener.stop()
atexit.register(cleanup_listener)
root_logger = logging.getLogger()
root_logger.addHandler(queue_handler)
root_logger.setLevel(logging.DEBUG)
for handler in list(root_logger.handlers):
if handler is not queue_handler:
root_logger.removeHandler(handler)
log.debug("async-logging-configured", handler_count=len(handlers))
def _configure_sync_logging(handlers: list[logging.Handler]) -> None:
"""Configures synchronous logging via basicConfig."""
log.debug("configuring-sync-logging", handler_count=len(handlers))
logging.basicConfig(
level=logging.DEBUG,
handlers=handlers,
force=True, # Override existing config
)
log.debug("sync-logging-configured", handler_count=len(handlers))