Source code for runtimepy.net.arbiter.factory.task
"""
A module implementing task-factory registration.
"""
# third-party
from vcorelib.names import obj_class_to_snake
# internal
from runtimepy.net.arbiter.base import (
BaseConnectionArbiter as _BaseConnectionArbiter,
)
from runtimepy.net.arbiter.task import ArbiterTask as _ArbiterTask
from runtimepy.net.arbiter.task import TaskFactory as _TaskFactory
Factory = _TaskFactory[_ArbiterTask]
[docs]
class TaskConnectionArbiter(_BaseConnectionArbiter):
"""A class for managing task factories."""
def _init(self) -> None:
"""Additional initialization tasks."""
super()._init()
self._task_factories: dict[str, Factory] = {}
self._task_names: dict[Factory, list[str]] = {}
[docs]
def factory_task(
self, factory: str, name: str, period_s: float = None, **kwargs
) -> bool:
"""
Register a periodic task from one of the registered task factories.
"""
result = False
if factory in self._task_factories:
result = self.task_manager.register(
self._task_factories[factory].kind(name, **kwargs),
period_s=period_s,
)
return result
[docs]
def register_task_factory(
self, factory: Factory, *namespaces: str
) -> bool:
"""Attempt to register a periodic task factory."""
result = False
assert isinstance(factory, _TaskFactory), factory
name = factory.__class__.__name__
snake_name = obj_class_to_snake(factory)
if (
name not in self._task_factories
and snake_name not in self._task_factories
):
self._task_factories[name] = factory
self._task_factories[snake_name] = factory
self._task_names[factory] = [*namespaces]
result = True
self.logger.debug(
"Registered '%s' (%s) task factory.", name, snake_name
)
return result