Source code for runtimepy.net.stream.base

"""
A module implementing a base, stream-oriented connection interface.
"""

# built-in
from io import BytesIO as _BytesIO
from typing import BinaryIO as _BinaryIO

# third-party
from vcorelib.io import BinaryMessage

# internal
from runtimepy.message import MessageProcessor
from runtimepy.net.connection import Connection as _Connection


[docs] class PrefixedMessageConnection(_Connection): """ A connection for handling inter-frame message size prefixes for some stream-oriented protocols. """ processor: MessageProcessor
[docs] def init(self) -> None: """Initialize this instance.""" # Header parsing. self.processor = MessageProcessor(byte_order=self.byte_order)
def _send_message( self, data: BinaryMessage, addr: tuple[str, int] = None ) -> None: """Underlying data send.""" del addr self.send_binary(data)
[docs] def send_message( self, data: BinaryMessage, addr: tuple[str, int] = None ) -> None: """Handle inter-message prefixes for outgoing messages.""" with _BytesIO() as stream: self.processor.encode(stream, data) self._send_message(stream.getvalue(), addr=addr)
[docs] def send_message_str( self, data: str, addr: tuple[str, int] = None ) -> None: """Convert a message to bytes before sending.""" self.send_message(data.encode(), addr=addr)
[docs] async def process_single( self, stream: _BinaryIO, addr: tuple[str, int] = None ) -> bool: """Process a single message.""" del stream del addr return True
[docs] async def process_binary( self, data: BinaryMessage, addr: tuple[str, int] = None ) -> bool: """Process an incoming message.""" result = True for message in self.processor.process(data): with _BytesIO(message) as stream: result &= await self.process_single(stream, addr=addr) return result