Source code for runtimepy.message

"""
A module implementing a message-stream processing interface.
"""

# built-in
from io import BytesIO as _BytesIO
from json import JSONEncoder, dumps, loads
from typing import Any
from typing import Iterator as _Iterator

# third-party
from vcorelib.io import BinaryMessage, ByteFifo

# internal
from runtimepy.primitives import Uint32, UnsignedInt
from runtimepy.primitives.byte_order import DEFAULT_BYTE_ORDER, ByteOrder

JsonMessage = dict[str, Any]


class StrFallbackJSONEncoder(JSONEncoder):
    """A JSON encoder that stringifies objects it can't natively encode."""

    def default(self, o):
        """
        Encode an object, falling back to its 'str' form on a type error.

        The base-class implementation is always attempted first; only a
        TypeError raised by it triggers the string conversion.
        """

        try:
            encoded = super().default(o)
        except TypeError:
            encoded = str(o)
        return encoded
class MessageProcessor:
    """
    A class for parsing size-delimited messages.

    Every message is framed as a length prefix (an instance of
    'message_length_kind') followed by that many payload bytes. Parsing state
    lives on the instance, so data may be fed to 'process' incrementally.
    """

    # The unsigned-integer primitive used for the length prefix.
    message_length_kind: type[UnsignedInt] = Uint32

    def __init__(self, byte_order: ByteOrder = DEFAULT_BYTE_ORDER) -> None:
        """
        Initialize this instance.

        :param byte_order: Byte order used when writing and reading the
                           length prefix.
        """

        self.byte_order = byte_order

        # Header parsing.
        self.buffer = ByteFifo()
        self.reading_header = True
        self.message_length_in = self.message_length_kind()
        self.prefix_size = self.message_length_in.size

        # A separate primitive for outgoing prefixes, so encoding never
        # disturbs the length of a partially received incoming message.
        self.message_length_out = self.message_length_kind()

    def encode(self, stream: _BytesIO, data: BinaryMessage | str) -> None:
        """
        Encode a message to a stream.

        Writes the length prefix and then the payload. String payloads are
        converted to bytes with 'str.encode()' first, so the prefix reflects
        the encoded byte count rather than the character count.

        :param stream: Destination stream.
        :param data: Message payload.
        """

        if isinstance(data, str):
            data = data.encode()

        self.message_length_out.value = len(data)
        self.message_length_out.to_stream(stream, byte_order=self.byte_order)
        stream.write(data)

    def encode_json(self, stream: _BytesIO, data: JsonMessage) -> None:
        """
        Encode a message as JSON.

        The data is serialized compactly (no whitespace after separators) and
        values the standard encoder rejects are handled by
        'StrFallbackJSONEncoder'.

        :param stream: Destination stream.
        :param data: JSON-serializable message data.
        """

        self.encode(
            stream,
            dumps(data, cls=StrFallbackJSONEncoder, separators=(",", ":")),
        )

    def messages(self, data: bytes) -> _Iterator[JsonMessage]:
        """
        Iterate over incoming messages.

        Each complete payload produced by 'process' is decoded to text and
        parsed as JSON.

        :param data: Newly received bytes.
        """

        for message in self.process(data):
            yield loads(message.decode())

    def process(self, data: BinaryMessage) -> _Iterator[bytearray]:
        """
        Process an incoming message.

        Ingests the new data and yields every complete payload that can be
        extracted, alternating between reading a length prefix and reading a
        payload of that length. Iteration ends once the buffer can't satisfy
        the next read; the parsing state is kept on the instance for the next
        call.

        :param data: Newly received bytes.
        """

        self.buffer.ingest(data)

        can_continue = True
        while can_continue:
            # Read the message size.
            if self.reading_header:
                size = self.buffer.pop(self.prefix_size)
                if size is not None:
                    assert len(size) == self.prefix_size
                    self.message_length_in.update(
                        size, byte_order=self.byte_order
                    )
                    self.reading_header = False
                else:
                    # Presumably not enough bytes buffered for a prefix yet.
                    can_continue = False

            # Read the message payload.
            else:
                message = self.buffer.pop(self.message_length_in.value)
                if message is not None:
                    # Update state *before* yielding: if the consumer stops
                    # iterating (or closes this generator) after receiving
                    # the message, execution never resumes past the yield,
                    # and a stale 'reading_header' would cause the next call
                    # to interpret the following length prefix as payload.
                    self.reading_header = True
                    yield message
                else:
                    # Presumably the payload hasn't fully arrived yet.
                    can_continue = False