Source code for runtimepy.net.tcp.protocol
"""
A module implementing a Protocol for TcpConnection.
"""
# built-in
from asyncio import BaseTransport as _BaseTransport
from asyncio import Protocol as _Protocol
from logging import getLogger as _getLogger
from typing import Optional as _Optional
# third-party
from vcorelib.io import BinaryMessage
from vcorelib.logging import LoggerType as _LoggerType
# internal
from runtimepy.net.connection import Connection as _Connection
from runtimepy.net.mixin import (
BinaryMessageQueueMixin as _BinaryMessageQueueMixin,
)
from runtimepy.net.mixin import TransportMixin as _TransportMixin
[docs]
class QueueProtocol(_BinaryMessageQueueMixin, _Protocol):
"""A simple streaming protocol that populates a message queue."""
logger: _LoggerType
conn: _Connection
[docs]
def data_received(self, data: BinaryMessage) -> None:
"""Handle incoming data."""
self.queue.put_nowait(data)
[docs]
def connection_made(self, transport: _BaseTransport) -> None:
"""Log the connection establishment."""
self.logger = _getLogger(
_TransportMixin(transport).logger_name("TCP ")
)
self.logger.info("Connected.")
[docs]
def connection_lost(self, exc: _Optional[Exception]) -> None:
"""Log the disconnection."""
msg = "Disconnected." if exc is None else f"Disconnected: '{exc}'."
self.logger.info(msg)
self.conn.disable("disconnected")