Source code for runtimepy.control.step

"""
A module implementing a manual-stepping interface (e.g. program/human
controlled clock).
"""

# built-in
from typing import cast

# third-party
from vcorelib.math import (
    RateTracker,
    SimulatedTime,
    default_time_ns,
    from_nanos,
    metrics_time_ns,
    restore_time_source,
    set_simulated_source,
)

# internal
from runtimepy.net.arbiter import AppInfo
from runtimepy.net.arbiter.info import RuntimeStruct
from runtimepy.net.arbiter.task import ArbiterTask, TaskFactory
from runtimepy.net.manager import ConnectionManager
from runtimepy.net.server.websocket import RuntimepyWebsocketConnection
from runtimepy.primitives import Bool, Double, Uint32

DONT_POLL_SET = {
    "ToggleStepper",
    "UiState",
    "SampleStruct",
}


[docs] def should_poll(kind: type[RuntimeStruct]) -> bool: """ Determine if a toggle stepper should poll the provided type of struct. """ return kind.__name__ not in DONT_POLL_SET and not kind.__name__.startswith( "Rt" )
[docs] def refresh_all_plots(manager: ConnectionManager) -> None: """Signal to clients to refresh all data.""" for conn in manager.by_type(RuntimepyWebsocketConnection): for interface in conn.send_interfaces.values(): interface({"actions": ["clear_points"]})
[docs] class ToggleStepper(RuntimeStruct): """A simple struct that ties a clock toggle to various runtime entities.""" step: Bool simulate_time: Bool time_s: Double count: Uint32 counts: Uint32 count_rate: Double count_rate_tracker: RateTracker to_poll: set[RuntimeStruct] timer: SimulatedTime def _poll_time(self) -> None: """Update current time.""" self.time_s.value = from_nanos(default_time_ns())
[docs] def init_env(self) -> None: """Initialize this toggle stepper environment.""" self.step = Bool() self.simulate_time = Bool() self.time_s = Double() self.env.channel( "time_s", self.time_s, description="Current time (based on default time source).", ) self._poll_time() self.count = Uint32( value=cast(int, self.config.get("count", 1)), time_source=metrics_time_ns, ) self.counts = Uint32(time_source=metrics_time_ns) self.count_rate = Double(time_source=metrics_time_ns) self.count_rate_tracker = RateTracker(source=metrics_time_ns) self.to_poll = set() self.timer = SimulatedTime(cast(int, self.config.get("step_dt_ns", 1))) def do_step(_: bool, __: bool) -> None: """Poll every step edge.""" for _ in range(self.count.raw.value): # type: ignore self.poll() self.step.register_callback(do_step) self.env.channel( "step", self.step, description="Toggle to drive 'count' iterations forward.", commandable=True, ) self.env.channel( "count", self.count, controls="steps_1_1000", description="The number of iterations to step.", commandable=True, ) self.env.channel( "counts", self.counts, description="Total number of counts." ) self.env.channel( "count_rate", self.count_rate, description="Counts per second (based on realtime clock).", ) self.env.channel( "simulate_time", self.simulate_time, description=( "Whether or not time is controlled by the simulated source." ), commandable=True, ) def do_simulate_time(_: bool, curr: bool) -> None: """Toggle the time source selection.""" if curr: set_simulated_source(self.timer) else: restore_time_source() # Signal to the UI to clear plot data. refresh_all_plots(self.app.conn_manager) self._poll_time() self.simulate_time.register_callback(do_simulate_time) # Register other structs if configured to do so. if self.config.get("global", True): for struct in self.app.structs.values(): if should_poll(type(struct)): # type: ignore self.to_poll.add(struct) # type: ignore
[docs] def poll(self) -> None: """Poll all other entities.""" self._poll_time() for item in self.to_poll: item.poll() # Bug? # pylint: disable=no-member self.counts.value += 1 # pylint: enable=no-member self.count_rate.value = self.count_rate_tracker() self.timer.step()
[docs] class ToggleStepperTask(ArbiterTask): """A task for automatically stepping a toggle-stepper clock.""" steppers: list[ToggleStepper]
[docs] async def init(self, app: AppInfo) -> None: """Initialize this task with application information.""" await super().init(app) self.steppers = list(app.search_structs(ToggleStepper)) # Start paused. self.paused.value = True
[docs] async def dispatch(self) -> bool: """Dispatch an iteration of this task.""" for stepper in self.steppers: stepper.step.toggle() return True
[docs] class StepperToggler(TaskFactory[ToggleStepperTask]): """A task factory for toggle stepper tasks.""" kind = ToggleStepperTask