Source code for runtimepy.metrics.connection
"""
A module implementing a connection-metrics interface.
"""
# built-in
from typing import Optional

# internal
from runtimepy.metrics.channel import ChannelMetrics
[docs]
class ConnectionMetrics:
    """Metrics for a network connection.

    Holds two independent :class:`ChannelMetrics` instances, ``tx`` and
    ``rx``; every method here applies the same operation to both of them.
    """

    def __init__(self) -> None:
        """Initialize this instance with a fresh ``tx`` and ``rx`` channel."""

        self.tx = ChannelMetrics()
        self.rx = ChannelMetrics()

    def update(self, other: "ConnectionMetrics") -> None:
        """Update values in this instance from values in another instance.

        Each channel is updated from the same-named channel of ``other``
        (``tx`` from ``other.tx``, ``rx`` from ``other.rx``).
        """

        self.tx.update(other.tx)
        self.rx.update(other.rx)

    def reset(self) -> None:
        """Reset metrics on both the ``tx`` and ``rx`` channels."""

        self.tx.reset()
        self.rx.reset()

    def poll(self, time_ns: Optional[int] = None) -> None:
        """Poll both channels.

        ``time_ns`` is forwarded unchanged, as a keyword argument, to the
        ``poll`` method of each channel, so both channels are polled with
        the same value. How ``None`` (the default) is interpreted is decided
        by ``ChannelMetrics.poll``.
        """

        self.tx.poll(time_ns=time_ns)
        self.rx.poll(time_ns=time_ns)