Source code for runtimepy.channel.environment.base

"""
A module implementing a base channel environment.
"""

# built-in
import math
from typing import AsyncIterator as _AsyncIterator
from typing import Iterable as _Iterable
from typing import Optional as _Optional
from typing import Union as _Union
from typing import cast as _cast

# third-party
from vcorelib.namespace import DEFAULT_DELIM, Namespace
from vcorelib.namespace import NamespaceMixin as _NamespaceMixin

# internal
from runtimepy.channel import AnyChannel as _AnyChannel
from runtimepy.channel import BoolChannel as _BoolChannel
from runtimepy.channel import FloatChannel as _FloatChannel
from runtimepy.channel import IntChannel as _IntChannel
from runtimepy.channel.registry import ChannelRegistry as _ChannelRegistry
from runtimepy.enum import RuntimeEnum as _RuntimeEnum
from runtimepy.enum.registry import EnumRegistry as _EnumRegistry
from runtimepy.mixins.finalize import FinalizeMixin
from runtimepy.primitives import BaseIntPrimitive, StrToBool
from runtimepy.primitives.evaluation import EvalResult, Operator, sample_for
from runtimepy.primitives.field import BitField as _BitField
from runtimepy.primitives.field.fields import BitFields as _BitFields
from runtimepy.primitives.field.manager import (
    BitFieldsManager as _BitFieldsManager,
)
from runtimepy.registry.name import RegistryKey as _RegistryKey
from runtimepy.util import Identifier

# Any value that can be assigned to (or read back from) a channel. Strings are
# accepted so enumeration names and textual numbers/booleans can be resolved
# by BaseChannelEnvironment.set.
ChannelValue = _Union[bool, int, float, str]

# A mapping of registry keys to channel values (the shape consumed by
# BaseChannelEnvironment.apply and produced by BaseChannelEnvironment.values).
ValueMap = dict[_RegistryKey, ChannelValue]

# Lookup results: the channel (or bit-field) paired with its enumeration, or
# None when no enumeration is associated.
ChannelResult = tuple[_AnyChannel, _Optional[_RuntimeEnum]]
BitfieldResult = tuple[_BitField, _Optional[_RuntimeEnum]]
BoolChannelResult = tuple[_BoolChannel, _Optional[_RuntimeEnum]]
IntChannelResult = tuple[_IntChannel, _Optional[_RuntimeEnum]]

# Either kind of entity that a registry key can resolve to.
FieldOrChannel = _Union[_BitField, _AnyChannel]

# Module-level Identifier instance; BaseChannelEnvironment.__init__ calls it
# (IDENTITY()) to obtain a default 'identity' when the caller provides none.
# NOTE(review): presumably each call yields a new unique integer - confirm
# against runtimepy.util.Identifier.
IDENTITY = Identifier(scale=1)


class BaseChannelEnvironment(_NamespaceMixin, FinalizeMixin):
    """A class integrating channel and enumeration registries."""

    def __init__(
        self,
        channels: _ChannelRegistry = None,
        enums: _EnumRegistry = None,
        values: ValueMap = None,
        fields: _Iterable[_BitFields] = None,
        namespace: Namespace = None,
        namespace_delim: str = DEFAULT_DELIM,
        views: dict[str, str] = None,
        identity: int = None,
    ) -> None:
        """
        Initialize this channel environment.

        :param channels: Channel registry to use (a new one is created if
                         not provided).
        :param enums: Enumeration registry to use (a new one is created if
                      not provided).
        :param values: Initial values to apply once channels are organized.
        :param fields: Bit-field groups to register immediately.
        :param namespace: Forwarded to the namespace mixin.
        :param namespace_delim: Forwarded to the namespace mixin.
        :param views: Stored as 'self.views' (a new dictionary is used when
                      this is missing or empty).
        :param identity: Stored as 'self.id' (obtained from IDENTITY when
                         not provided).
        """

        _NamespaceMixin.__init__(
            self, namespace=namespace, namespace_delim=namespace_delim
        )
        FinalizeMixin.__init__(self)

        if channels is None:
            channels = _ChannelRegistry()
        if enums is None:
            enums = _EnumRegistry()

        self.channels = channels
        self.enums = enums

        # Register fields.
        self.fields = _BitFieldsManager(channels.names, self.enums)
        if fields:
            for field in fields:
                self.fields.add(field)

        # Bit-field groups created by 'add_field' that are registered with
        # the fields manager when this instance is finalized.
        self.to_add: list[_BitFields] = []

        # Map each enumeration-backed channel to its underlying enumeration.
        self.channel_enums: dict[_AnyChannel, _RuntimeEnum] = {
            chan: self.enums[chan.enum]
            for chan in self.channels.items.values()
            if chan.is_enum
        }

        # Organize channel by Python type.
        self.bools: set[_BoolChannel] = {
            chan
            for chan in self.channels.items.values()
            if chan.type.is_boolean
        }
        self.ints: set[_IntChannel] = {
            chan
            for chan in self.channels.items.values()
            if chan.type.is_integer
        }
        self.floats: set[_FloatChannel] = {
            chan
            for chan in self.channels.items.values()
            if chan.type.is_float
        }

        # Apply initial values if they were provided.
        if values is not None:
            self.apply(values)

        if not views:
            views = {}
        self.views = views

        # For UI telemetry.
        if identity is None:
            identity = IDENTITY()
        self.id = identity

    def __setitem__(self, key: _RegistryKey, value: ChannelValue) -> None:
        """Mapping-set interface."""
        return self.set(key, value)

    def set(
        self, key: _RegistryKey, value: ChannelValue, scaled: bool = True
    ) -> None:
        """
        Attempt to set an arbitrary channel value.

        String values are resolved through the channel's enumeration when it
        has one, otherwise they are parsed as a number or boolean depending
        on the channel's underlying primitive.

        :param key: Registry key of a bit-field or channel.
        :param value: The value to assign.
        :param scaled: Whether to assign the scaled value (True) or the raw
                       value (False).
        :raises KeyError: If the key is neither a bit-field nor a channel.
        :raises ValueError: If a string can't be converted for a channel
                            that has no enumeration.
        """

        # Set a field value if this key maps to a bit-field.
        if self.fields.has_field(key):
            assert not isinstance(value, float)
            self.fields.set(key, value, scaled=scaled)
            return

        chan, enum = self[key]

        # Resolve enum values.
        if isinstance(value, str):
            # Ensure that the channel has an associated enumeration.
            if enum is None:
                resolved = False

                is_int = chan.raw.kind.is_integer
                if is_int or chan.raw.kind.is_float:
                    # Integer primitives with scaling accept fractional
                    # input, so only parse as 'int' when no scaling is set.
                    kind = int if is_int and not chan.raw.scaling else float
                    try:
                        value = kind(value)
                        resolved = True
                    except ValueError:
                        pass

                # Handle booleans.
                else:
                    parsed = StrToBool.parse(value)
                    resolved = parsed.valid

                    # Only replace the value on success so the error below
                    # reports the string the caller actually passed in.
                    if resolved:
                        value = parsed.result

                if not resolved:
                    raise ValueError(
                        (
                            f"Can't assign '{value}' to channel "
                            f"'{self.channels.names.name(key)}'!"
                        )
                    )
            else:
                value = (
                    enum.get_bool(value)
                    if chan.type.is_boolean
                    else enum.get_int(value)
                )

        # Assign the value to the channel.
        if scaled:
            chan.raw.scaled = value  # type: ignore
        else:
            chan.raw.value = value  # type: ignore

    def apply(self, values: ValueMap) -> None:
        """Apply a map of values to the environment."""

        for key, value in values.items():
            self.set(key, value)

    def values(self, resolve_enum: bool = True) -> ValueMap:
        """
        Get a new dictionary of current channel values.

        :param resolve_enum: Whether enumeration channels report their
                             string value.
        :returns: Values keyed by channel name (NaN values are omitted).
        """

        result: ValueMap = {}

        for name in self.channels.names.names:
            value = self.value(name, resolve_enum=resolve_enum)

            # Don't store NaN. Python will allow JSON encoding but e.g.
            # browsers (and the JSON specification) don't support decoding
            # NaN.
            if isinstance(value, str) or not math.isnan(value):
                result[name] = value

        return result

    def value(
        self,
        key: _RegistryKey,
        resolve_enum: bool = True,
        scaled: bool = True,
        value: ChannelValue = None,
    ) -> ChannelValue:
        """
        Attempt to get a channel's current value.

        :param key: Registry key of a bit-field or channel.
        :param resolve_enum: Whether to convert enumeration values to
                             strings.
        :param scaled: Whether to read the scaled or raw value.
        :param value: A value to use instead of the channel's current one
                      (ignored for bit-fields).
        :raises KeyError: If the key is neither a bit-field nor a channel.
        """

        # Get the value from a field if this key points to a bit-field.
        if self.fields.has_field(key):
            return self.fields.get(
                key, resolve_enum=resolve_enum, scaled=scaled
            )

        chan, enum = self[key]

        if value is None:
            value = chan.raw.scaled if scaled else chan.raw.value

        # Resolve enumeration values to strings.
        if enum is not None and resolve_enum:
            value = enum.get_str(_cast(int, value))

        return value

    def exists(self, val: _RegistryKey) -> bool:
        """Determine if a channel exists."""
        return self.fields.has_field(val) or self.get(val) is not None

    def field_or_channel(self, val: _RegistryKey) -> _Optional[FieldOrChannel]:
        """
        Attempt to look up a field or channel for a given registry key.

        Channels take precedence; bit-fields are only consulted when no
        channel matches. Returns None if neither is found.
        """

        channel: _Optional[FieldOrChannel] = None

        chan = self.get(val)
        if chan is None:
            # Check if the name is a field.
            field = self.fields.get_field(val)
            if field is not None:
                channel = field
        else:
            channel, _ = chan

        return channel

    def get(self, val: _RegistryKey) -> _Optional[ChannelResult]:
        """
        Attempt to get a channel and its enumeration (if it has one).

        :returns: None if no channel is registered for the key, otherwise
                  the channel and its enumeration (or None).
        """

        chan = self.channels.get(val)
        if chan is None:
            return None

        return chan, self.channel_enums.get(chan)

    def __getitem__(self, key: _RegistryKey) -> ChannelResult:
        """
        Get a channel and its enumeration.

        :raises KeyError: If no channel is registered for the key.
        """

        result = self.get(key)
        if result is None:
            raise KeyError(f"No channel '{key}'!")
        return result

    def get_int(self, key: _RegistryKey) -> IntChannelResult:
        """
        Get an integer channel.

        :raises KeyError: If the channel doesn't exist or isn't an integer
                          channel.
        """

        result = self[key]

        # Ensure that this is an integer channel.
        if result[0] not in self.ints:
            raise KeyError(f"Channel '{key}' is not integer!")

        return _cast(_IntChannel, result[0]), result[1]

    def add_int(self, key: _RegistryKey, amount: int) -> int:
        """
        Modify an integer channel.

        :param amount: Added to the channel's raw value.
        :returns: The channel's raw value after the addition.
        """

        chan = self.get_int(key)[0]
        chan.raw.value += amount
        return chan.raw.value

    def get_bool(self, key: _RegistryKey) -> BoolChannelResult:
        """
        Get a boolean channel.

        :raises KeyError: If the channel doesn't exist or isn't a boolean
                          channel.
        """

        result = self[key]

        # Ensure that this is a boolean channel.
        if result[0] not in self.bools:
            raise KeyError(f"Channel '{key}' is not boolean!")

        return _cast(_BoolChannel, result[0]), result[1]

    async def wait_for_bool(
        self, key: _RegistryKey, state: bool, timeout: float
    ) -> EvalResult:
        """
        Wait for a boolean channel to reach a provided state within a
        timeout.
        """
        return await self.get_bool(key)[0].raw.wait_for_state(state, timeout)

    async def wait_for_numeric(
        self,
        key: _RegistryKey,
        value: int | float,
        timeout: float,
        operation: Operator = Operator.EQUAL,
    ) -> EvalResult:
        """
        Wait for a numeric event.

        The comparison between the channel and 'value' is selected by
        'operation'.
        """

        # The cast only informs the type checker; the channel isn't
        # verified to be floating-point here.
        return await _cast(_FloatChannel, self[key][0]).raw.wait_for_value(
            value, timeout, operation=operation
        )

    async def wait_for_numeric_isclose(
        self,
        key: _RegistryKey,
        value: float,
        timeout: float,
        rel_tol: float = 1e-09,
        abs_tol: float = 0.0,
    ) -> EvalResult:
        """
        Wait for a numeric channel to be close to a value, using the
        provided relative and absolute tolerances.
        """

        # The cast only informs the type checker; the channel isn't
        # verified to be floating-point here.
        return await _cast(_FloatChannel, self[key][0]).raw.wait_for_isclose(
            value, timeout, rel_tol=rel_tol, abs_tol=abs_tol
        )

    async def sample_int_for(
        self,
        key: _RegistryKey,
        timeout: float,
        count: int = -1,
        current: bool = True,
        scale: bool = True,
    ) -> _AsyncIterator[tuple[int | float, int]]:
        """
        Sample an integer channel.

        Yields (value, timestamp) pairs. Values are only scaled when
        'scale' is set and the primitive has scaling configured.
        """

        prim = self.get_int(key)[0].raw

        scale = scale and bool(prim.scaling)

        async for value, timestamp in sample_for(
            prim, timeout, count=count, current=current
        ):
            yield prim.scale(value) if scale else value, timestamp

    async def sample_float_for(
        self,
        key: _RegistryKey,
        timeout: float,
        count: int = -1,
        current: bool = True,
        scale: bool = True,
    ) -> _AsyncIterator[tuple[float, int]]:
        """
        Sample a floating-point channel.

        Yields (value, timestamp) pairs. Values are only scaled when
        'scale' is set and the primitive has scaling configured.
        """

        prim = self.get_float(key).raw

        scale = scale and bool(prim.scaling)

        async for value, timestamp in sample_for(
            prim, timeout, count=count, current=current
        ):
            yield prim.scale(value) if scale else value, timestamp

    async def sample_bool_for(
        self,
        key: _RegistryKey,
        timeout: float,
        count: int = -1,
        current: bool = True,
    ) -> _AsyncIterator[tuple[bool, int]]:
        """Sample a boolean channel, yielding (value, timestamp) pairs."""

        async for sample in sample_for(
            self.get_bool(key)[0].raw, timeout, count=count, current=current
        ):
            yield sample

    async def wait_for_enum(
        self, key: _RegistryKey, value: str, timeout: float
    ) -> EvalResult:
        """
        Wait for an enumeration channel to reach a specific value.

        :param value: The enumeration's string value to wait for.
        """

        chan, enum = self[key]
        assert enum is not None, f"'{key}' has no enum! ({chan})"

        # Handle boolean enumerations.
        if enum.is_boolean:
            _translated = enum.as_bool(value)
            assert _translated is not None, key
            return await self.wait_for_bool(key, _translated, timeout)

        # Translate an integer enumeration.
        translated = enum.as_int(value)
        assert translated is not None, key
        return await _cast(BaseIntPrimitive, chan.raw).wait_for_value(
            translated, timeout
        )

    async def sample_enum_for(
        self,
        key: _RegistryKey,
        timeout: float,
        count: int = -1,
        current: bool = True,
    ) -> _AsyncIterator[tuple[str, int]]:
        """
        Sample an enumeration channel, yielding (string value, timestamp)
        pairs.
        """

        chan, enum = self[key]
        assert enum is not None, f"'{key}' has no enum! ({chan})"

        async for sample in sample_for(
            _cast(_IntChannel, chan).raw, timeout, count=count, current=current
        ):
            yield enum.get_str(sample[0]), sample[1]

    def get_float(self, key: _RegistryKey) -> _FloatChannel:
        """
        Get a floating-point channel.

        :raises KeyError: If the channel doesn't exist or isn't a
                          floating-point channel.
        """

        result = self[key]

        if result[0] not in self.floats:
            raise KeyError(f"Channel '{key}' is not a float!")

        return _cast(_FloatChannel, result[0])

    def __eq__(self, other) -> bool:
        """
        Determine if two channel environments are equivalent.

        Only the channel and enumeration registries are compared.
        """
        return bool(
            self.channels == other.channels and self.enums == other.enums
        )

    def age_ns(self, key: _RegistryKey) -> int:
        """
        Get the age of an entity based on registry key.

        The key is looked up as a channel first and as a bit-field
        otherwise.
        """

        chan = self.get(key)
        if chan is not None:
            prim = chan[0].raw
        else:
            prim = self.fields[key].raw

        return prim.age_ns()

    def add_field(self, field: _BitField, namespace: Namespace = None) -> str:
        """
        Add a bit field to the environment.

        :returns: The field's name resolved through this instance's
                  namespace.
        """

        result = self.fields.add_field(field)

        # Defer registering any resulting field group until 'finalize'.
        if result is not None:
            self.to_add.append(result)

        return self.namespace(name=field.name, namespace=namespace)

    def finalize(self, strict: bool = True) -> None:
        """
        Finalize this instance.

        Registers every pending bit-field group collected by 'add_field'
        before finalizing.
        """

        for fields in self.to_add:
            self.fields.add(fields)
        self.to_add = []

        super().finalize(strict=strict)