Source code for runtimepy.metrics.connection

"""
A module implementing a connection-metrics interface.
"""

# built-in
from typing import Optional

# internal
from runtimepy.metrics.channel import ChannelMetrics


class ConnectionMetrics:
    """
    Metrics for a network connection.

    Holds two independent ``ChannelMetrics`` instances, ``tx`` and ``rx``,
    and forwards every operation to both of them (``tx`` first, then ``rx``).
    """

    def __init__(self) -> None:
        """Initialize this instance with fresh tx and rx channel metrics."""

        self.tx = ChannelMetrics()
        self.rx = ChannelMetrics()

    def update(self, other: "ConnectionMetrics") -> None:
        """
        Update values in this instance from values in another instance.

        Each channel is updated from the matching channel of ``other``.
        """

        self.tx.update(other.tx)
        self.rx.update(other.rx)

    def reset(self) -> None:
        """Reset metrics on both channels."""

        self.tx.reset()
        self.rx.reset()

    def poll(self, time_ns: Optional[int] = None) -> None:
        """
        Poll both channels.

        ``time_ns`` is forwarded unchanged (as a keyword argument) to each
        channel's ``poll``; ``None`` is passed through as-is, so handling of
        a missing timestamp is left to the channel implementation.
        """

        self.tx.poll(time_ns=time_ns)
        self.rx.poll(time_ns=time_ns)