Source code for runtimepy.net.arbiter.housekeeping

"""
A module implementing various housekeeping tasks for the connection-arbiter
runtime.
"""

# built-in
import asyncio
from typing import Awaitable

# internal
from runtimepy.mixins.async_command import AsyncCommandProcessingMixin
from runtimepy.net.arbiter.info import AppInfo as _AppInfo
from runtimepy.net.arbiter.task import ArbiterTask as _ArbiterTask
from runtimepy.net.arbiter.task import TaskFactory as _TaskFactory
from runtimepy.net.manager import ConnectionManager as _ConnectionManager
from runtimepy.primitives import AnyPrimitive, Bool

TASK_NAME = "housekeeping"


[docs] class ConnectionMetricsPoller(_ArbiterTask): """A class that periodically polls connection metrics.""" processors: list[Awaitable[None]] extra_channels: dict[str, AnyPrimitive] = {} def __init__( self, name: str, manager: _ConnectionManager, **kwargs, ) -> None: """Initialize this task.""" super().__init__(name, **kwargs) self.manager = manager self.processors = [] def _init_state(self) -> None: """Add channels to this instance's channel environment.""" # Channel control for polling connection metrics. self.poll_connection_metrics = Bool() self.env.channel( "poll_connection_metrics", self.poll_connection_metrics, commandable=True, description="Polls application connection metrics when true.", )
[docs] async def init(self, app: _AppInfo) -> None: """Initialize this task with application information.""" await super().init(app) # Register channels. if not self.env.finalized: for key, val in type(self).extra_channels.items(): self.env.channel(key, val)
[docs] async def dispatch(self) -> bool: """Dispatch an iteration of this task.""" if self.poll_connection_metrics: self.manager.poll_metrics() # Handle any incoming commands. for mapping in ( self.app.connections.values(), self.app.tasks.values(), self.app.structs.values(), ): for item in mapping: if isinstance(item, AsyncCommandProcessingMixin): self.processors.append(item.process_command_queue()) # Service connection tasks. The connection manager should probably do # this on its own at some point. self.processors += list(self.app.conn_manager.connection_tasks) if self.processors: await asyncio.gather(*self.processors) self.processors.clear() return True
[docs] async def init(app: _AppInfo) -> int: """Perform some initialization tasks.""" for task in app.search_tasks(ConnectionMetricsPoller): task.poll_connection_metrics.value = app.config_param( "poll_connection_metrics", False ) return 0
[docs] class ConnectionMetricsLogger(_ArbiterTask): """A task for logging metrics.""" app: _AppInfo def _log(self) -> None: """Log metrics to console.""" for name, conn in self.app.connections.items(): conn.log_metrics(label=name)
[docs] async def init(self, app: _AppInfo) -> None: """Initialize this task with application information.""" self.app = app self._log()
[docs] async def dispatch(self) -> bool: """Dispatch an iteration of this task.""" self._log() return True
[docs] class ConnectionMetricsLoggerFactory(_TaskFactory[ConnectionMetricsLogger]): """A factory for the connection-metrics logger.""" kind = ConnectionMetricsLogger
[docs] def housekeeping( manager: _ConnectionManager, period_s: float = 0.1, poll_connection_metrics: bool = True, ) -> ConnectionMetricsPoller: """Create a metrics-polling task.""" task = ConnectionMetricsPoller(TASK_NAME, manager, period_s=period_s) task.poll_connection_metrics.value = poll_connection_metrics return task