Source code for runtimepy.net.server.websocket.data

"""
A module implementing a binary data connection for the package's WebSocket
server interface.
"""

# built-in
import asyncio
from json import loads

# internal
from runtimepy.net.stream.string import StringMessageConnection
from runtimepy.net.websocket import WebsocketConnection

DATA_CONNECTIONS: dict[
    str, asyncio.Future["RuntimepyDataWebsocketConnection"]
] = {}


[docs] def data_connection_future( guid: str, ) -> asyncio.Future["RuntimepyDataWebsocketConnection"]: """Get a future for a data connection guid.""" if guid not in DATA_CONNECTIONS: DATA_CONNECTIONS[guid] = asyncio.get_running_loop().create_future() return DATA_CONNECTIONS[guid]
[docs] class RuntimepyDataWebsocketConnection( StringMessageConnection, WebsocketConnection ): """A class implementing a WebSocket connection for streaming raw data."""
[docs] async def process_message( self, data: str, addr: tuple[str, int] = None ) -> bool: """Process a string message.""" del addr message = loads(data) if "ui" in message and "guid" in message["ui"]: data_connection_future(message["ui"]["guid"]).set_result(self) else: self.logger.info(message) return True