Source code for runtimepy.task.trig

"""
A module implementing basic trigonometric tasks.
"""

# internal
from runtimepy.mixins.trig import TrigMixin
from runtimepy.net.arbiter.task import ArbiterTask as _ArbiterTask
from runtimepy.net.arbiter.task import TaskFactory as _TaskFactory


[docs] class SinusoidTask(_ArbiterTask, TrigMixin): """A task for logging metrics.""" auto_finalize = True def _init_state(self) -> None: """Add channels to this instance's channel environment.""" TrigMixin.__init__(self, self.env)
[docs] async def dispatch(self) -> bool: """Dispatch an iteration of this task.""" self.dispatch_trig(self.metrics.dispatches.value) return True
[docs] class Sinusoid(_TaskFactory[SinusoidTask]): """A factory for the sinusoid task.""" kind = SinusoidTask