Source code for runtimepy.net.arbiter.config

"""
A module implementing a configuration-file interface for registering client
connections or servers.
"""

# built-in
from importlib import import_module as _import_module
from site import addsitedir as _addsitedir
import sys
from typing import Any as _Any
from typing import Callable as _Callable
from typing import Iterable as _Iterable

# third-party
from vcorelib.dict import merge as _merge
from vcorelib.dict.env import dict_resolve_env_vars, list_resolve_env_vars
from vcorelib.io import ARBITER as _ARBITER
from vcorelib.io.types import JsonObject as _JsonObject
from vcorelib.logging import LoggerMixin as _LoggerMixin
from vcorelib.names import import_str_and_item
from vcorelib.paths import Pathlike as _Pathlike
from vcorelib.paths import find_file
from vcorelib.paths import normalize as _normalize

# internal
from runtimepy import DEFAULT_EXT, PKG_NAME
from runtimepy.net.arbiter.config.codec import ConnectionArbiterConfig
from runtimepy.net.arbiter.config.util import fix_args, fix_kwargs, list_adder
from runtimepy.net.arbiter.imports import (
    ImportConnectionArbiter as _ImportConnectionArbiter,
)
from runtimepy.net.arbiter.imports.util import get_apps

ConfigObject = dict[str, _Any]
ConfigBuilder = _Callable[[ConfigObject], None]


[docs] def handle_config_builders(data: ConfigObject, logger: _LoggerMixin) -> None: """Run any configured configuration-data building methods.""" for builder in data.get("config_builders", []): module, method = import_str_and_item(str(builder)) with logger.log_time("Running config-builder '%s'", builder): getattr(_import_module(module), method)(data)
[docs] class ConfigConnectionArbiter(_ImportConnectionArbiter): """ A class implementing a configuration loading interface for the connection arbiter. """ search_packages: list[str] = []
[docs] @classmethod def add_search_package(cls, name: str, front: bool = True) -> None: """Add a package to the search path.""" name = name.replace("-", "_") if name not in cls.search_packages: list_adder(cls.search_packages, name, front=front)
[docs] async def load_configs( self, paths: _Iterable[_Pathlike], wait_for_stop: bool = False ) -> None: """Load a client and server configuration to the arbiter.""" loaded = set() # Load and meld configuration data. config_data: _JsonObject = {} for path in paths: # Try the path itself. found = find_file(path, logger=self.logger, include_cwd=True) # Try package search path next. if found is None: for pkg in self.search_packages: found = find_file( f"{path}.{DEFAULT_EXT}", logger=self.logger, package=pkg, ) if found is not None: break assert found is not None, f"Couldn't find '{path}'!" # Only load files once. absolute = found.resolve() if absolute not in loaded: _merge( config_data, _ARBITER.decode( found, includes_key="includes", require_success=True, logger=self.logger, ).data, logger=self.logger, ) loaded.add(absolute) # Add the working directory and parent directories for module loading # / package discovery. directories = set(str(_normalize(x).parent) for x in paths) directories.add( str( config_data.setdefault( "directory", str(_normalize(".").resolve()) ) ) ) for directory in directories: # Add the site directory to facilitate module discovery. _addsitedir(directory) # Add directory to Python path. if directory not in sys.path: sys.path.append(directory) # Run any JIT config methods. handle_config_builders(config_data, self) assert "root" not in self._config, self._config self._config["root"] = config_data # type: ignore await self.process_config( ConnectionArbiterConfig(data=config_data), wait_for_stop=wait_for_stop, )
[docs] async def process_config( self, config: ConnectionArbiterConfig, wait_for_stop: bool = False ) -> None: """Register clients and servers from a configuration object.""" names = set() self._ports = config.ports # Registier factories. for factory in config.factories: name = factory["name"] # Double specifying a factory (because of include shenanigans) # should be fine. if name not in names: assert self.register_module_factory( name, *factory.get("namespaces", []), **dict_resolve_env_vars( factory.get("kwargs", {}), env=self._ports, # type: ignore ), ), f"Couldn't register factory '{factory}'!" names.add(name) # Register clients. for client in config.clients: factory = client["factory"] name = client["name"] # Resolve any port variables that may have been used. args = list_resolve_env_vars( client.get("args", []), env=self._ports # type: ignore ) kwargs = dict_resolve_env_vars( client.get("kwargs", {}), env=self._ports # type: ignore ) assert await self.factory_client( factory, name, *fix_args(args, self._ports), defer=client["defer"], # Perform some known fixes for common keyword arguments. **fix_kwargs(kwargs), views=client.get("views"), markdown=client.get("markdown"), buttons=client.get("buttons", []), ), f"Couldn't register client '{name}' ({factory})!" # Register servers. for server in config.servers: factory = server["factory"] assert await self.factory_server( factory, *list_resolve_env_vars( server.get("args", []), env=self._ports # type: ignore ), **dict_resolve_env_vars( server.get("kwargs", {}), env=self._ports # type: ignore ), ), f"Couldn't register a '{factory}' server!" # Register tasks. for task in config.tasks: name = task["name"] factory = task["factory"] assert self.factory_task( factory, name, period_s=task["period_s"], average_depth=task["average_depth"], markdown=task.get("markdown"), config=task.get("config"), ), f"Couldn't register task '{name}' ({factory})!" # Register structs. for struct in config.structs: name = struct["name"] factory = struct["factory"] assert self.factory_struct( struct["factory"], struct["name"], struct.get("config", {}) ), f"Couldn't register struct '{name}' ({factory})!" # Register processes. for process in config.processes: name = process["name"] factory = process["factory"] assert self.factory_process( factory, name, # Allow port information to pass through. dict_resolve_env_vars( process, env=self._ports, # type: ignore ), ), f"Couldn't register process '{name}' ({factory})!" # Load initialization methods. self._inits = get_apps(config.inits) # Set the new application entry if it's set. apps = get_apps(config.app, wait_for_stop=wait_for_stop) if apps: self._apps = apps # Update application configuration data if necessary. if config.config is not None: root = self._config["root"] self._config = config.config assert "root" not in config.config, config.config config.config["root"] = root # Register commands. self._commands = config.commands
ConfigConnectionArbiter.add_search_package(PKG_NAME)