# Source code for runtimepy.net.stream
"""
A module aggregating stream-oriented connection interfaces.
"""
# built-in
from typing import BinaryIO as _BinaryIO
from typing import Optional as _Optional

# third-party
from vcorelib.io import BinaryMessage

# internal
from runtimepy.net.stream.base import PrefixedMessageConnection
from runtimepy.net.stream.string import StringMessageConnection
from runtimepy.net.tcp.connection import TcpConnection
from runtimepy.net.udp.connection import UdpConnection
# Public names exported by this module.
__all__ = [
    # Imported from the 'base' and 'string' sub-modules.
    "PrefixedMessageConnection",
    "StringMessageConnection",
    # Size-prefixed message connections bound to a transport.
    "TcpPrefixedMessageConnection",
    "UdpPrefixedMessageConnection",
    # Echo connections.
    "EchoMessageConnection",
    "EchoTcpMessageConnection",
    "EchoUdpMessageConnection",
    # String-message connections bound to a transport.
    "TcpStringMessageConnection",
    "UdpStringMessageConnection",
]
[docs]
class TcpPrefixedMessageConnection(
    PrefixedMessageConnection,
    TcpConnection,
):
    """Size-prefixed message handling carried over a TCP connection."""
[docs]
class UdpPrefixedMessageConnection(PrefixedMessageConnection, UdpConnection):
    """
    A UDP implementation for size-prefixed messages.

    Incoming datagrams are handed to 'process_binary' and outgoing messages
    are written with 'sendto'.
    """

    async def process_datagram(
        self, data: BinaryMessage, addr: tuple[str, int]
    ) -> bool:
        """
        Process a datagram.

        Forwards the payload and its accompanying address to
        'process_binary' and returns that call's result.
        """
        return await self.process_binary(data, addr=addr)

    def _send_message(
        self, data: BinaryMessage, addr: _Optional[tuple[str, int]] = None
    ) -> None:
        """
        Underlying data send.

        'addr' is optional (defaults to None) and is passed through to
        'sendto' unchanged.
        """
        # The default is None, so the annotation is explicitly optional
        # rather than relying on implicit-Optional behavior.
        self.sendto(data, addr=addr)
[docs]
class EchoMessageConnection(PrefixedMessageConnection):
    """A connection that just echoes what it was sent."""

    async def process_single(
        self, stream: _BinaryIO, addr: _Optional[tuple[str, int]] = None
    ) -> bool:
        """
        Process a single message.

        Reads the remaining contents of 'stream' and sends them back via
        'send_message', forwarding 'addr' (which may be None). Always
        returns True.
        """
        # The default is None, so the annotation is explicitly optional
        # rather than relying on implicit-Optional behavior.
        self.send_message(stream.read(), addr=addr)
        return True
[docs]
class EchoTcpMessageConnection(
    TcpPrefixedMessageConnection,
    EchoMessageConnection,
):
    """An echo connection for size-prefixed messages over TCP."""
[docs]
class EchoUdpMessageConnection(
    UdpPrefixedMessageConnection,
    EchoMessageConnection,
):
    """An echo connection for size-prefixed messages over UDP."""
[docs]
class TcpStringMessageConnection(
    StringMessageConnection,
    TcpConnection,
):
    """String-message connection that sends and processes over TCP."""
[docs]
class UdpStringMessageConnection(
    StringMessageConnection,
    UdpPrefixedMessageConnection,
):
    """String-message connection that sends and processes over UDP."""