Source code for runtimepy.codec.protocol.receiver

"""
A module implementing an interface for receiving struct messages.
"""

# built-in
from io import BytesIO
import os
from typing import Callable, Optional

# third-party
from vcorelib.logging import LoggerMixin
from vcorelib.math import RateTracker, default_time_ns
from vcorelib.names import to_snake

# internal
from runtimepy.codec.protocol import Protocol, ProtocolFactory
from runtimepy.net.arbiter.housekeeping import ConnectionMetricsPoller
from runtimepy.primitives.byte_order import ByteOrder
from runtimepy.primitives.float import Double
from runtimepy.primitives.int import UnsignedInt

StructHandler = Callable[[Protocol], None]
NonStructHandler = Callable[[BytesIO], bool]

NON_STRUCT_ID = 0


[docs] def null_struct_handler(_: Protocol) -> None: """Takes no action."""
[docs] class StructReceiver(LoggerMixin): """A class for sending and receiving struct messages.""" non_struct_message_prefix: bytes id_primitive: UnsignedInt byte_order: ByteOrder def __init__(self, *factories: type[ProtocolFactory]) -> None: """Initialize this instance.""" super().__init__() self.non_struct_handler: Optional[NonStructHandler] = None self.handlers: dict[int, StructHandler] = {} self.instances: dict[int, Protocol] = {} self.rates: dict[int, RateTracker] = {} self.rate_primitives: dict[int, Double] = {} for factory in factories: self.register(factory)
[docs] def add_non_struct_handler(self, handler: NonStructHandler) -> None: """Set the non-struct handler for this instance.""" assert self.non_struct_handler is None self.non_struct_handler = handler
[docs] def add_handler( self, identifier: int, handler: StructHandler = null_struct_handler ) -> None: """Add a struct message handler.""" assert identifier not in self.handlers assert identifier != NON_STRUCT_ID self.handlers[identifier] = handler
[docs] def register(self, factory: type[ProtocolFactory]) -> None: """Track a protocol factory's structure by identifier.""" inst = factory.singleton() assert inst.id != NON_STRUCT_ID if not hasattr(self, "id_primitive"): self.id_primitive = inst.id_primitive.copy() # type: ignore self.byte_order = inst.byte_order self.non_struct_message_prefix = self.id_primitive.kind.encode( NON_STRUCT_ID, byte_order=self.byte_order ) else: assert self.id_primitive.kind == inst.id_primitive.kind assert self.byte_order == inst.byte_order assert inst.id not in self.instances self.instances[inst.id] = inst # Rate tracking. self.rates[inst.id] = RateTracker() prim = Double() self.rate_primitives[inst.id] = prim name = f"{to_snake(factory.__name__)}.rx_hz" assert name not in ConnectionMetricsPoller.extra_channels, name ConnectionMetricsPoller.extra_channels[name] = prim
[docs] def process(self, data: bytes) -> None: """Attempt to process a struct message.""" timestamp_ns = default_time_ns() with BytesIO(data) as stream: stream.seek(0, os.SEEK_END) end_pos = stream.tell() stream.seek(0, os.SEEK_SET) while stream.tell() < end_pos: ident = self.id_primitive.from_stream( stream, byte_order=self.byte_order, timestamp_ns=timestamp_ns, ) # Handle non-struct messages. if ident == NON_STRUCT_ID: if self.non_struct_handler is not None: if not self.non_struct_handler(stream): self.logger.error( "Parsing non-struct message failed." ) stream.seek(0, os.SEEK_END) else: self.logger.error( "No handler for non-struct messages." ) stream.seek(0, os.SEEK_END) # Handle struct messages. elif ident in self.instances: self.rate_primitives[ident](self.rates[ident]()) inst = self.instances[ident] inst.from_stream(stream, timestamp_ns=timestamp_ns) if ident in self.handlers: self.handlers[ident](inst) else: self.logger.warning( "No message handler for struct '%d' (%s).", ident, inst, ) # Can't continue reading if we don't know this identifier. else: self.logger.error("Unknown struct identifier '%d'.", ident) stream.seek(0, os.SEEK_END)