Source code for runtimepy.telemetry.sample
"""
A module implementing a simple telemetry sample interface.
"""
# internal
from runtimepy.channel.environment.sample import (
long_name_int_enum,
sample_bool_enum,
sample_fields,
sample_float,
sample_int_enum,
)
from runtimepy.mixins.async_command import AsyncCommandProcessingMixin
from runtimepy.net.arbiter import AppInfo
from runtimepy.net.arbiter.struct import (
TimestampedStruct,
UdpStructTransceiver,
)
from runtimepy.net.arbiter.task import ArbiterTask, TaskFactory
from runtimepy.net.arbiter.udp import UdpConnectionFactory
from runtimepy.primitives import Uint16
from runtimepy.primitives.byte_order import ByteOrder
[docs]
class SampleTelemetryStruct(TimestampedStruct):
"""A sample telemetry struct."""
# Demonstrate simple byte-order control.
byte_order = ByteOrder.LITTLE_ENDIAN
[docs]
def init_env(self) -> None:
"""Initialize this sample environment."""
super().init_env()
sample_int_enum(self.env)
sample_bool_enum(self.env)
long_name_int_enum(self.env)
sample_float(self.env)
sample_fields(self.env)
[docs]
class SampleTelemetryTransceiver(UdpStructTransceiver[SampleTelemetryStruct]):
"""A sample telemetry sending connection."""
struct_kind = SampleTelemetryStruct
[docs]
def handle_update(
self,
timestamp_ns: int,
instance: SampleTelemetryStruct,
addr: tuple[str, int],
) -> None:
"""Handle individual struct updates."""
[docs]
class SampleTelemetry(UdpConnectionFactory[SampleTelemetryTransceiver]):
"""A sample telemetry sending-and-receiving connection factory."""
kind = SampleTelemetryTransceiver
[docs]
class SampleTelemetryTask(ArbiterTask, AsyncCommandProcessingMixin):
"""A sample telemetry task."""
rx_conn: SampleTelemetryTransceiver
tx_conn: SampleTelemetryTransceiver
flush_count: Uint16
[docs]
async def init(self, app: AppInfo) -> None:
"""Initialize this task with application information."""
await super().init(app)
self._setup_async_commands()
self.flush_count = Uint16(value=10)
self.env.channel(
"flush_count",
self.flush_count,
commandable=True,
description="Maximum number of samples before flushing telemetry.",
default=self.flush_count.value,
controls="steps_1_1000",
)
kind = SampleTelemetryTransceiver
# Setup Receive.
pattern = "rx"
self.rx_conn = app.single(pattern=pattern, kind=kind)
assert self.rx_conn.assign_app_rx(pattern, app) is not None
# Setup transmit.
pattern = "tx"
self.tx_conn = app.single(pattern=pattern, kind=kind)
assert self.tx_conn.assign_app_tx(pattern, app) is not None
[docs]
async def dispatch(self) -> bool:
"""Dispatch an iteration of this task."""
self.tx_conn.capture()
if self.metrics.dispatches.value % self.flush_count.value == 0:
self.tx_conn.capture(sample=False, flush=True)
return True
[docs]
class SampleTelemetryPeriodic(TaskFactory[SampleTelemetryTask]):
"""A sample-task application factory."""
kind = SampleTelemetryTask