Source code for runtimepy.channel.environment.telemetry

"""
A module implementing channel-environment telemetry registration.
"""

# built-in
from contextlib import ExitStack, contextmanager
from typing import BinaryIO, Iterator, Optional, cast

# third-party
from vcorelib.names import name_search

# internal
from runtimepy.channel.environment.base import (
    BaseChannelEnvironment as _BaseChannelEnvironment,
)
from runtimepy.channel.event import PrimitiveEvent
from runtimepy.channel.registry import ParsedEvent
from runtimepy.mapping import DEFAULT_PATTERN
from runtimepy.metrics.channel import ChannelMetrics


[docs] class TelemetryChannelEnvironment(_BaseChannelEnvironment): """A class integrating telemetry streaming.""" events: list[str]
[docs] @contextmanager def registered( self, stream: BinaryIO, pattern: str = DEFAULT_PATTERN, exact: bool = False, flush: bool = False, channel: ChannelMetrics = None, ) -> Iterator[list[str]]: """ Register a stream as a managed context. Returns a list of all channels registered. """ names: list[str] = [] events: list[PrimitiveEvent] = [] # Gather event telemetry emitters for bit-fields. for fields in self.fields.fields: for name in name_search(fields.fields, pattern, exact=exact): names.append(name) field = fields.fields[name] ident = self.channels.names.identifier(name) assert ident is not None, name events.append(PrimitiveEvent(field.raw, ident)) with ExitStack() as stack: # Register bit-field event telemetry. for event in events: stack.enter_context( event.registered( stream, flush=flush, channel=channel, force=True ) ) # Register channel telemetry. names += stack.enter_context( self.channels.registered( stream, pattern=pattern, exact=exact, flush=flush, channel=channel, ) ) yield names
[docs] def ingest(self, point: ParsedEvent) -> None: """ Update internal state based on an event. Note that the event timestamp is not respected. """ if self.fields.has_field(point.name): self.fields[point.name].raw.value = point.value # type: ignore else: self.set(point.name, point.value)
def _parse_channel_event(self, name: str) -> Optional[ParsedEvent]: """Attempt to parse a channel event from the fifo.""" result = None channels = self.channels kind = channels[name].type data = channels.event_fifo.pop(kind.size) if data is not None: result = ParsedEvent( name, cast(int, channels.event_header["timestamp"]), kind.decode( data, byte_order=channels.event_header.byte_order, ), ) return result def _parse_field_event(self, name: str) -> Optional[ParsedEvent]: """Attempt to parse a bit-field event from the fifo.""" result = None field = self.fields[name] kind = field.raw.kind data = self.channels.event_fifo.pop(kind.size) if data is not None: result = ParsedEvent( name, cast(int, self.channels.event_header["timestamp"]), kind.decode( data, byte_order=self.channels.event_header.byte_order, ), ) return result
[docs] def parse_event_stream(self, stream: BinaryIO) -> Iterator[ParsedEvent]: """Parse individual events from a stream.""" channels = self.channels # Ingest stream. channels.event_fifo.ingest(stream.read()) ident = -1 name = "" keep_going = True while keep_going: keep_going = False # Read header. if not channels.header_ready: read_size = channels.event_header.size data = channels.event_fifo.pop(read_size) if data is not None: channels.event_header.update(data) # Update local variables. ident = cast(int, channels.event_header["identifier"]) name = channels.names.name(ident) # type: ignore assert name is not None, ident # Update state. channels.header_ready = True keep_going = True else: event = None # Handle bit-field. if self.fields.has_field(name): event = self._parse_field_event(name) # Handle channel. elif name in channels.items: event = self._parse_channel_event(name) if event is not None: yield event # Update state. channels.header_ready = False keep_going = True