Source code for runtimepy.mixins.logging

"""
A module implementing a logger-mixin extension.
"""

# built-in
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
import io
import logging
from typing import Any, AsyncIterator, Iterable, Optional

# third-party
import aiofiles
from vcorelib.logging import ListLogger, LoggerMixin, LoggerType
from vcorelib.paths import Pathlike, normalize

# internal
from runtimepy.channel.environment import ChannelEnvironment
from runtimepy.enum.registry import RuntimeIntEnum


[docs] class LogLevel(RuntimeIntEnum): """A runtime enumeration for log level.""" DEBUG = logging.DEBUG INFO = logging.INFO WARNING = logging.WARNING ERROR = logging.ERROR CRITICAL = logging.CRITICAL
[docs] @classmethod def id(cls) -> Optional[int]: """Override in sub-class to coerce enum id.""" return 2
LogLevellike = LogLevel | int | str
[docs] class LoggerMixinLevelControl(LoggerMixin): """A logger mixin that exposes a runtime-controllable level.""" def _log_level_changed(self, _: int, new_value: int) -> None: """Handle a change in log level.""" self.logger.setLevel(new_value) self.logger.log(new_value, "Log level updated to %d.", new_value)
[docs] def setup_level_channel( self, env: ChannelEnvironment, name: str = "log_level", initial: str = "info", description: str = "Text-log level filter for this environment.", ) -> None: """Add a commandable log-level channel to the environment.""" chan = env.int_channel( name, enum=LogLevel.register_enum(env.enums), commandable=True, description=description, ) # Set up change handler. chan[0].raw.register_callback(self._log_level_changed) # Set the initial log level. env.set(name, initial) del chan
LogPaths = Iterable[tuple[LogLevellike, Pathlike]] EXT_LOG_EXTRA = {"external": True}
[docs] def handle_safe_log( logger: LoggerType, level: int, data: str, safe_to_log: bool ) -> None: """handle a log filtering scenario.""" if safe_to_log: logger.log(level, data, extra=EXT_LOG_EXTRA) else: record = logging.LogRecord( logger.name, level, __file__, -1, data, (), None ) record.external = True for handler in logger.handlers: # type: ignore if isinstance(handler, ListLogger): handler.emit(record)
[docs] class FileWatcher: """A class implementing an interface for watching file contents.""" def __init__(self, level: LogLevellike, path: Pathlike) -> None: """Initialize this instance.""" self.level = LogLevel.normalize(level) self.path = path self.stream: Optional[Any] = None self.path_size: int = 0
[docs] async def get_size(self) -> int: """Get ctime for this path.""" return (await aiofiles.os.stat(self.path)).st_size # type: ignore
[docs] async def handle_open(self) -> None: """Handle opening the stream.""" if not self.stream and normalize(self.path).is_file(): self.stream = await aiofiles.open(self.path, mode="r") await self.stream.seek(0, io.SEEK_END) self.path_size = await self.get_size()
[docs] async def poll(self) -> AsyncIterator[tuple[LogLevel, str]]: """Poll stream contents.""" try: # Handle re-opening file if necessary. curr = await self.get_size() if curr < self.path_size: yield LogLevel.WARNING, f"'{self.path}' truncated, re-opening." await self.close() self.path_size = curr await self.handle_open() if self.stream: while line := (await self.stream.readline()).rstrip(): yield self.level, line.rstrip() except FileNotFoundError: await self.close()
[docs] async def close(self) -> None: """Close this stream.""" if self.stream: await self.stream.close() self.stream = None
[docs] @asynccontextmanager @staticmethod async def running( level: LogLevellike, path: Pathlike ) -> AsyncIterator["FileWatcher"]: """Handle closing stream.""" inst = FileWatcher(level, path) await inst.handle_open() try: yield inst finally: await inst.close()
[docs] class LogCaptureMixin: """A simple async file-reading interface.""" logger: LoggerType watchers: list[FileWatcher] # Set false to only forward to ListLogger handlers. Required for when the # system log / process-management logs are being forwarded (otherwise also # logging would lead to infinite spam). safe_to_log = True
[docs] async def init_log_capture( self, stack: AsyncExitStack, log_paths: LogPaths ) -> None: """Initialize this task with application information.""" self.watchers = [ await stack.enter_async_context(FileWatcher.running(level, path)) for level, path in log_paths ]
[docs] def log_line(self, level: int, data: str) -> None: """Log a line for output.""" handle_safe_log(self.logger, level, data, self.safe_to_log)
[docs] async def handle_watcher(self, watcher: FileWatcher) -> None: """Handle any file watcher lines.""" async for level, line in watcher.poll(): self.log_line(level, line)
[docs] async def dispatch_log_capture(self) -> None: """Get the next line from this log stream.""" await asyncio.gather(*(self.handle_watcher(x) for x in self.watchers))