# Source code for runtimepy.net.arbiter.imports
"""
A module extending capability of the connection arbiter using Python import
machinery.
"""
# built-in
from importlib import import_module as _import_module
# third-party
from vcorelib.io.types import JsonObject as _JsonObject
from vcorelib.names import import_str_and_item, to_snake
# internal
from runtimepy.net.arbiter.factory import (
ConnectionFactory as _ConnectionFactory,
)
from runtimepy.net.arbiter.factory import (
FactoryConnectionArbiter as _FactoryConnectionArbiter,
)
from runtimepy.net.arbiter.factory.task import (
TaskConnectionArbiter as _TaskConnectionArbiter,
)
from runtimepy.net.arbiter.info import RuntimeStruct as _RuntimeStruct
from runtimepy.net.arbiter.task import TaskFactory as _TaskFactory
from runtimepy.subprocess.peer import RuntimepyPeer as _RuntimepyPeer
# [docs]
class ImportConnectionArbiter(
    _FactoryConnectionArbiter, _TaskConnectionArbiter
):
    """
    A class implementing extensions to the connection arbiter for working with
    arbitrary Python modules.

    Struct and subprocess-peer factories are registered as classes and are
    looked up by either their class name or its snake-case form. Connection
    and task factories are registered as instances via the parent classes.
    """

    def _init(self) -> None:
        """Additional initialization tasks."""

        super()._init()

        # Struct factory classes keyed by class name and snake-case name,
        # plus the namespaces each class was registered with.
        self._struct_factories: dict[str, type[_RuntimeStruct]] = {}
        self._struct_names: dict[type[_RuntimeStruct], list[str]] = {}

        # Same bookkeeping for subprocess peer factory classes.
        self._peer_factories: dict[str, type[_RuntimepyPeer]] = {}
        self._peer_names: dict[type[_RuntimepyPeer], list[str]] = {}

    def register_peer_factory(
        self, factory: type[_RuntimepyPeer], *namespaces: str
    ) -> bool:
        """
        Attempt to register a subprocess peer factory.

        The class is stored under both its name and the snake-case form of
        its name. Returns False (and changes nothing) if either key is
        already registered.
        """

        result = False

        name = factory.__name__
        snake_name = to_snake(name)

        if (
            name not in self._peer_factories
            and snake_name not in self._peer_factories
        ):
            self._peer_factories[name] = factory
            self._peer_factories[snake_name] = factory
            self._peer_names[factory] = [*namespaces]

            result = True

            self.logger.debug(
                "Registered '%s' (%s) subprocess peer factory.",
                name,
                snake_name,
            )

        return result

    def register_struct_factory(
        self, factory: type[_RuntimeStruct], *namespaces: str
    ) -> bool:
        """
        Attempt to register a struct factory.

        The class is stored under both its name and the snake-case form of
        its name. Returns False (and changes nothing) if either key is
        already registered.
        """

        result = False

        name = factory.__name__
        snake_name = to_snake(name)

        if (
            name not in self._struct_factories
            and snake_name not in self._struct_factories
        ):
            self._struct_factories[name] = factory
            self._struct_factories[snake_name] = factory
            self._struct_names[factory] = [*namespaces]

            result = True

            self.logger.debug(
                "Registered '%s' (%s) struct factory.", name, snake_name
            )

        return result

    def factory_struct(
        self, factory: str, name: str, config: _JsonObject
    ) -> bool:
        """
        Register a runtime structure from factory and name.

        Returns False if 'factory' is not a registered struct factory key or
        if a struct called 'name' already exists.
        """

        result = False

        # 'self._structs' is not created in this class (presumably provided
        # by a parent class).
        if factory in self._struct_factories and name not in self._structs:
            self._structs[name] = self._struct_factories[factory](name, config)
            result = True

        return result

    def factory_process(
        self, factory: str, name: str, top_level: _JsonObject
    ) -> bool:
        """
        Register a runtime process.

        Nothing is instantiated here: the peer class and the arguments read
        from 'top_level' are stored as a tuple under 'name'. Returns False if
        'factory' is not a registered peer factory key or if a peer called
        'name' already exists. Raises KeyError if 'top_level' has no
        "program" entry.
        """

        result = False

        # 'self._peers' is not created in this class (presumably provided by
        # a parent class).
        if factory in self._peer_factories and name not in self._peers:
            self._peers[name] = (
                self._peer_factories[factory],
                name,
                top_level.get("config", {}),  # type: ignore
                str(top_level["program"]),
                top_level.get("markdown"),
            )
            result = True

        return result

    def register_module_factory(
        self, module_path: str, *namespaces: str, **kwargs
    ) -> bool:
        """
        Attempt to register a factory class based on its module path.

        'module_path' is split into a module and an attribute name, the
        module is imported and the attribute is looked up on it. Struct and
        subprocess-peer classes are registered as classes. Anything else is
        called with 'kwargs' and the resulting instance is registered as a
        connection or task factory, if it is one.

        Returns True only if a registration actually took place.
        """

        module, factory_class = import_str_and_item(module_path)

        raw_import = getattr(_import_module(module), factory_class)

        # Handle factories that don't need factory-class proxying. The
        # registration outcome is returned as-is: a rejected (e.g. duplicate)
        # struct or peer class must not fall through to the instantiation
        # below, because these classes are not constructed from 'kwargs'
        # (struct factories are called with a name and a config).
        if isinstance(raw_import, type):
            if issubclass(raw_import, _RuntimeStruct):
                return self.register_struct_factory(raw_import, *namespaces)
            if issubclass(raw_import, _RuntimepyPeer):
                return self.register_peer_factory(raw_import, *namespaces)

        # We need to call the factory class to create an instance.
        inst = raw_import(**kwargs)

        # Determine what kind of factory to register.
        result = False
        if isinstance(inst, _ConnectionFactory):
            result = self.register_connection_factory(inst, *namespaces)
        elif isinstance(inst, _TaskFactory):
            result = self.register_task_factory(inst, *namespaces)

        return result