Source code for runtimepy.mixins.environment

"""
A module implementing a channel-environment class mixin.
"""

# built-in
from typing import Optional

# internal
from runtimepy import METRICS_NAME
from runtimepy.channel.environment import ChannelEnvironment
from runtimepy.metrics import ConnectionMetrics, PeriodicTaskMetrics
from runtimepy.metrics.channel import ChannelMetrics

# Passed as 'min_period_s' for every metrics channel registered in this
# module: one tenth of a second, i.e. 10 Hz.
METRICS_MIN_PERIOD_S = 1.0 / 10.0


class ChannelEnvironmentMixin:
    """
    A simple channel-environment mixin.

    Provides an ``env`` attribute and helper methods that register the
    fields of metrics objects as channels in that environment.
    """

    env: ChannelEnvironment

    def __init__(
        self, env: Optional[ChannelEnvironment] = None, **kwargs
    ) -> None:
        """
        Initialize this instance.

        :param env: Environment to adopt. When omitted, a new
            ``ChannelEnvironment`` is created from ``kwargs``.
        :param kwargs: Forwarded to ``ChannelEnvironment`` only when a new
            environment has to be created.
        """

        # An 'env' attribute that already exists on the instance is kept
        # as-is; both 'env' and 'kwargs' are ignored in that case.
        if not hasattr(self, "env"):
            if env is None:
                env = ChannelEnvironment(**kwargs)
            self.env = env

    def __hash__(self) -> int:
        """Get a hash for this instance."""

        # The hash follows the identity of the environment object, not the
        # identity of the instance holding it.
        return id(self.env)

    def register_task_metrics(
        self,
        metrics: PeriodicTaskMetrics,
        *names: str,
        namespace: str = METRICS_NAME,
    ) -> None:
        """
        Register periodic task metrics.

        :param metrics: Object whose attributes back the created channels.
        :param names: Additional names pushed after ``namespace``.
        :param namespace: First name pushed while the channels are created.
        """

        # Each channel is named after the 'metrics' attribute backing it.
        # Entries are registered in the order listed here.
        channels = (
            ("dispatches", "Dispatch call counter for this task."),
            ("rate_hz", "Measured dispatch rate in Hertz."),
            ("average_s", "An averaged dispatch time measurement."),
            ("max_s", "Maximum dispatch time measured."),
            ("min_s", "Minimum dispatch time measured."),
            ("overruns", "Dispatch time exceeding period counter."),
        )

        with self.env.names_pushed(namespace, *names):
            for name, description in channels:
                self.env.channel(
                    name,
                    getattr(metrics, name),
                    description=description,
                    min_period_s=METRICS_MIN_PERIOD_S,
                )

    def register_channel_metrics(
        self, name: str, channel: ChannelMetrics, verb: str
    ) -> None:
        """
        Register individual channel metrics.

        :param name: Name pushed while the channels are created.
        :param channel: Object whose attributes back the created channels.
        :param verb: Word interpolated into each channel description
            (e.g. "transmitted").
        """

        # (channel name, attribute of 'channel', description). Note that
        # the "messages_rate" channel is backed by the 'message_rate'
        # attribute, so names and attributes are listed separately.
        channels = (
            ("messages", "messages", f"Number of messages {verb}."),
            (
                "messages_rate",
                "message_rate",
                f"Messages per second {verb}.",
            ),
            ("bytes", "bytes", f"Number of bytes {verb}."),
            ("kbps", "kbps", f"Kilobits per second {verb}."),
        )

        with self.env.names_pushed(name):
            for channel_name, attribute, description in channels:
                self.env.channel(
                    channel_name,
                    getattr(channel, attribute),
                    description=description,
                    min_period_s=METRICS_MIN_PERIOD_S,
                )

    def register_connection_metrics(
        self,
        metrics: ConnectionMetrics,
        *names: str,
        namespace: str = METRICS_NAME,
    ) -> None:
        """
        Register connection metrics.

        :param metrics: Object providing the ``tx`` and ``rx`` channel
            metrics.
        :param names: Additional names pushed after ``namespace``.
        :param namespace: First name pushed while the channels are created.
        """

        # Transmit metrics are registered first, then receive metrics.
        directions = (
            ("tx", metrics.tx, "transmitted"),
            ("rx", metrics.rx, "received"),
        )

        with self.env.names_pushed(namespace, *names):
            for name, direction, verb in directions:
                self.register_channel_metrics(name, direction, verb)