Source code for runtimepy.control.source

"""
A module implementing signal source structs.
"""

# built-in
from abc import abstractmethod
from typing import Any, Generic, cast

# internal
from runtimepy.control.env import amplitude
from runtimepy.net.arbiter.info import RuntimeStruct
from runtimepy.primitives import Double, T


[docs] class PrimitiveSource(RuntimeStruct, Generic[T]): """A simple output-source struct.""" kind: type[T] outputs: list[T] amplitudes: list[Double] length: int
[docs] def init_source(self) -> None: """Initialize this value source."""
[docs] @abstractmethod def source(self, index: int) -> float | int | bool: """Provide the next value."""
[docs] def init_env(self) -> None: """Initialize this double-source environment.""" self.outputs = [] self.amplitudes = [] # Load 'count' from config. count: int = self.config.get("count", 1) # type: ignore for idx in range(count): # Output channel. output = self.kind() self.outputs.append(output) self.env.channel(f"{idx}.output", output) # Amplitude channel. self.amplitudes.append( amplitude(self.env, Double, name=f"{idx}.amplitude") ) self.init_source() self.length = len(self.outputs)
[docs] def poll(self) -> None: """Update the outputs.""" for idx in range(self.length): # Difficult to avoid cast. self.outputs[idx].value = cast( Any, self.amplitudes[idx].value * self.source(idx) )
[docs] class DoubleSource(PrimitiveSource[Double]): # pylint:disable=abstract-method """A simple double output source.""" kind = Double