Source code for runtimepy.net.stream.string
"""
A module implementing a string-message stream interface.
"""
# built-in
from typing import BinaryIO as _BinaryIO
# internal
from runtimepy.net.stream.base import PrefixedMessageConnection
[docs]
class StringMessageConnection(PrefixedMessageConnection):
    """A connection that sends and handles messages as plain strings."""

    async def process_message(
        self, data: str, addr: tuple[str, int] = None
    ) -> bool:
        """
        Handle one string message by writing it to this connection's logger
        at info level.

        The 'addr' argument is accepted but intentionally unused. This
        implementation always returns True.
        """

        # The address is not needed here; discard it explicitly.
        del addr

        self.logger.info(data)
        return True

    async def process_single(
        self, stream: _BinaryIO, addr: tuple[str, int] = None
    ) -> bool:
        """
        Handle a single message contained in a binary stream.

        The remaining contents of 'stream' are read and decoded (using the
        default arguments of 'decode'), then handed to 'process_message'
        together with 'addr'. That call's result is returned unchanged.
        """

        raw = stream.read()
        text = raw.decode()
        return await self.process_message(text, addr=addr)