Source code for runtimepy.net.arbiter.factory.connection

"""
A module implementing an interface extension to the base connection-arbiter so
that methods that create connections can be registered by name.
"""

# built-in
import asyncio as _asyncio

# third-party
from vcorelib.names import obj_class_to_snake

# internal
from runtimepy.net.arbiter.base import (
    BaseConnectionArbiter as _BaseConnectionArbiter,
)
from runtimepy.net.arbiter.base import ServerTask as _ServerTask
from runtimepy.net.connection import Connection as _Connection
from runtimepy.net.manager import ConnectionManager as _ConnectionManager
from runtimepy.ui.button import ActionButton


[docs] class ConnectionFactory: """An interface for creating client connections."""
[docs] async def client(self, name: str, *args, **kwargs) -> _Connection: """Create a client connection.""" raise NotImplementedError
[docs] async def server_task( self, stop_sig: _asyncio.Event, manager: _ConnectionManager, started_sem: _asyncio.Semaphore, *args, **kwargs, ) -> _ServerTask: """Create a task that will run a connection server.""" raise NotImplementedError
[docs] class FactoryConnectionArbiter(_BaseConnectionArbiter): """ A class implementing an interface to allow connection factories to be used for creating new connections (that can then be registered). """ def _init(self) -> None: """Additional initialization tasks.""" super()._init() self._conn_factories: dict[str, ConnectionFactory] = {} self._conn_names: dict[ConnectionFactory, list[str]] = {}
[docs] def register_connection_factory( self, factory: ConnectionFactory, *namespaces: str ) -> bool: """Attempt to register a connection factory.""" result = False assert isinstance(factory, ConnectionFactory), factory name = factory.__class__.__name__ snake_name = obj_class_to_snake(factory) if ( name not in self._conn_factories and snake_name not in self._conn_factories ): self._conn_factories[name] = factory self._conn_factories[snake_name] = factory self._conn_names[factory] = [*namespaces] result = True self.logger.debug( "Registered '%s' (%s) connection factory.", name, snake_name ) return result
[docs] async def factory_client( self, factory: str, name: str, *args, defer: bool = False, **kwargs ) -> bool: """ Attempt to register a client connection using a registered factory. """ result = False if factory in self._conn_factories: factory_inst = self._conn_factories[factory] views = kwargs.pop("views", {}) buttons = ActionButton.from_top_level(kwargs.pop("buttons", [])) conn = factory_inst.client(name, *args, **kwargs) if not defer: conn = await conn # type: ignore result = self.register_connection( conn, *self._conn_names[factory_inst], name, views=views, buttons=buttons, ) return result
[docs] async def factory_server(self, factory: str, *args, **kwargs) -> bool: """Attempt to create a server task using a registered factory.""" result = False if factory in self._conn_factories: factory_inst = self._conn_factories[factory] self._servers.append( _asyncio.create_task( await factory_inst.server_task( # type: ignore self.stop_sig, self.manager, self._servers_started, *args, **kwargs, ) ) ) result = True return result