Source code for runtimepy.metrics.channel
"""
A module implementing a channel-metrics interface.
"""
# third-party
from vcorelib.math import RateTracker as _RateTracker
from vcorelib.math import metrics_time_ns as _metrics_time_ns
# internal
from runtimepy.primitives import Float as _Float
from runtimepy.primitives import Uint32 as _Uint32
from runtimepy.primitives import Uint64 as _Uint64
METRICS_DEPTH = 50
[docs]
class ChannelMetrics:
"""Metrics for a network channel."""
def __init__(self) -> None:
"""Initialize this instance."""
self.messages = _Uint32(time_source=_metrics_time_ns)
self.message_rate = _Float(time_source=_metrics_time_ns)
self._message_rate_tracker = _RateTracker(
depth=METRICS_DEPTH, source=_metrics_time_ns
)
self.bytes = _Uint64(time_source=_metrics_time_ns)
self.kbps = _Float(time_source=_metrics_time_ns)
self._kbps_tracker = _RateTracker(
depth=METRICS_DEPTH, source=_metrics_time_ns
)
[docs]
def update(self, other: "ChannelMetrics") -> None:
"""Update values in this instance from values in another instance."""
self.messages.value = other.messages.value
self.message_rate.value = other.message_rate.value
self.bytes.value = other.bytes.value
self.kbps.value = other.kbps.value
def __str__(self) -> str:
"""Get metrics as a string."""
return " | ".join(
[
f"messages={self.messages.value}",
f"message_rate={self.message_rate.value:.2f}",
f"bytes={self.bytes.value}",
f"kbps={self.kbps.value:.2f}",
]
)
[docs]
def poll(self, time_ns: int = None) -> None:
"""Poll kbps tracking."""
self.kbps.value = self._kbps_tracker.poll(time_ns=time_ns)
self.message_rate.value = self._message_rate_tracker.poll(
time_ns=time_ns
)
[docs]
def increment(self, count: int, time_ns: int = None) -> None:
"""Update tracking."""
self.messages.value += 1
self.message_rate.value = self._message_rate_tracker(time_ns=time_ns)
self.bytes.value += count
# Multiply by 8 to get bits from bytes.
self.kbps.value = (
self._kbps_tracker(time_ns=time_ns, value=float(count) / 1000.0)
* 8
)
[docs]
def reset(self) -> None:
"""Reset metrics."""
self.messages.value = 0
self.bytes.value = 0
self.kbps.value = 0.0
self._kbps_tracker.reset()