Source code for runtimepy.primitives.base

"""
A module implementing a base, primitive-type storage entity.
"""

# built-in
from contextlib import contextmanager as _contextmanager
from copy import copy as _copy
from math import isclose as _isclose
from typing import BinaryIO as _BinaryIO
from typing import Callable as _Callable
from typing import Generic as _Generic
from typing import Iterator as _Iterator
from typing import TypeVar as _TypeVar

# third-party
from vcorelib.io import BinaryMessage
from vcorelib.math import default_time_ns, nano_str
from vcorelib.math.keeper import TimeSource

# internal
from runtimepy.primitives.byte_order import (
    DEFAULT_BYTE_ORDER as _DEFAULT_BYTE_ORDER,
)
from runtimepy.primitives.byte_order import ByteOrder as _ByteOrder
from runtimepy.primitives.scaling import ChannelScaling, Numeric, apply, invert
from runtimepy.primitives.types import AnyPrimitiveType as _AnyPrimitiveType
from runtimepy.primitives.types.base import PythonPrimitive as _PythonPrimitive
from runtimepy.util import Identifier

T = _TypeVar("T", bool, int, float)

# Current value first, new value next.
PrimitiveChangeCallaback = _Callable[[T, T], None]

IDENT = Identifier()
IDENT.curr_id = 0
IDENT.scale = 1


[docs] class Primitive(_Generic[T]): """A simple class for storing an underlying primitive value.""" # Use network byte-order by default. byte_order: _ByteOrder = _DEFAULT_BYTE_ORDER # Nominally set the primitive type at the class level. kind: _AnyPrimitiveType def __hash__(self) -> int: """A hash for this instance.""" return self._hash def __init__( self, value: T = None, scaling: ChannelScaling = None, time_source: TimeSource = default_time_ns, ) -> None: """Initialize this primitive.""" self.raw = self.kind.instance() self.curr_callback: int = 0 self.callbacks: dict[int, tuple[PrimitiveChangeCallaback[T], bool]] = ( {} ) self.time_source = time_source self.set_streak: int = 0 self.last_updated_ns: int = self.time_source() self.prev_updated_ns = self.last_updated_ns self(value=value) self.scaling = scaling self._hash = IDENT()
[docs] @classmethod def valid_primitive(cls, primitive: _PythonPrimitive) -> bool: """Determine if a Python primitive is valid for this class.""" return cls.kind.valid_primitive(primitive)
[docs] def age_ns(self, now: int = None) -> int: """Get the age of this primitive's value in nanoseconds.""" if now is None: now = self.time_source() return now - self.last_updated_ns
[docs] def age_str(self, now: int = None) -> str: """Get the age of this primitive's value as a string.""" return nano_str(self.age_ns(now=now))
@property def size(self) -> int: """Get the size of this primitive.""" return self.kind.size def __copy__(self) -> "Primitive[T]": """Make a copy of this primitive.""" return type(self)( value=self.value, scaling=self.scaling, time_source=self.time_source, )
[docs] def copy(self) -> "Primitive[T]": """A simple wrapper for copy.""" return _copy(self)
[docs] def register_callback( self, callback: PrimitiveChangeCallaback[T], once: bool = False ) -> int: """Register a callback and return an identifier for it.""" callback_id = self.curr_callback self.curr_callback += 1 self.callbacks[callback_id] = callback, once return callback_id
[docs] def remove_callback(self, callback_id: int) -> bool: """Remove a callback if one is registered with this identifier.""" result = callback_id in self.callbacks if result: del self.callbacks[callback_id] return result
[docs] @_contextmanager def callback( self, callback: PrimitiveChangeCallaback[T] ) -> _Iterator[None]: """Register a callback as a managed context.""" ident = self.register_callback(callback) try: yield finally: self.remove_callback(ident)
@property def value(self) -> T: """Obtain the underlying value.""" return self.raw.value # type: ignore @value.setter def value(self, value: T) -> None: """Set a new underlying value.""" self.set_value(value) def _check_callbacks(self, curr: T, new: T) -> None: """Determine if any callbacks should be serviced.""" if curr != new: if self.callbacks: to_remove = [] for ident, (callback, once) in self.callbacks.items(): callback(curr, new) if once: to_remove.append(ident) # Remove one-time callbacks. for item in to_remove: self.remove_callback(item) self.set_streak = 0 else: self.set_streak += 1
[docs] def set_value(self, value: T, timestamp_ns: int = None) -> None: """Set a new underlying value.""" # Set new timestamp. if timestamp_ns is None: timestamp_ns = self.time_source() self.last_updated_ns = timestamp_ns curr: T = self.raw.value # type: ignore self.raw.value = value self._check_callbacks(curr, value) self.prev_updated_ns = self.last_updated_ns
@property def scaled(self) -> Numeric: """Get this primitive as a scaled value.""" return self.scale(self.value) @scaled.setter def scaled(self, value: T) -> None: """Set this value but invert scaling information.""" self.value = self.invert(value) # type: ignore
[docs] def scale(self, value: T) -> Numeric: """Scale a value using this primitive's scaling.""" return apply(value, scaling=self.scaling)
[docs] def invert(self, value: T) -> Numeric: """Invert a value using this primitive's scaling.""" val = invert( value, scaling=self.scaling, should_round=self.kind.is_integer ) if self.kind.int_bounds is not None: val = self.kind.int_bounds.clamp(val) # type: ignore return val
def __call__(self, value: T = None) -> T: """ A callable interface for setting and getting the underlying value. """ if value is not None: self.value = value return self.value def __str__(self) -> str: """Get this primitive's value as a string.""" return f"{self.value} ({self.kind})" def __eq__(self, other) -> bool: """ Determine if this instance is equivalent to the provided argument. """ if isinstance(other, Primitive): other = other.raw.value if self.kind.is_float: return _isclose(self.raw.value, other) return bool(self.raw.value == other) def __bool__(self) -> bool: """Use the underlying value for boolean evaluation.""" return bool(self.raw)
[docs] def binary(self, byte_order: _ByteOrder = None) -> bytes: """Convert this instance to a byte array.""" if byte_order is None: byte_order = self.byte_order return self.kind.encode(self.value, byte_order=byte_order)
def __bytes__(self) -> bytes: """Convert this instance to a byte array.""" return self.binary()
[docs] def to_stream( self, stream: _BinaryIO, byte_order: _ByteOrder = None ) -> int: """Write this primitive to a stream.""" stream.write(self.binary(byte_order=byte_order)) return self.kind.size
[docs] def update( self, data: BinaryMessage, byte_order: _ByteOrder = None, timestamp_ns: int = None, ) -> T: """Update this primitive from a bytes object.""" if byte_order is None: byte_order = self.byte_order self.set_value( self.kind.decode(data, byte_order=byte_order), # type: ignore timestamp_ns=timestamp_ns, ) return self.value
[docs] def from_stream( self, stream: _BinaryIO, byte_order: _ByteOrder = None, timestamp_ns: int = None, ) -> T: """Update this primitive from a stream and return the new value.""" return self.update( stream.read(self.kind.size), byte_order=byte_order, timestamp_ns=timestamp_ns, )
[docs] @classmethod def encode(cls, value: T, byte_order: _ByteOrder = None) -> bytes: """Create a bytes instance based on this primitive type.""" if byte_order is None: byte_order = cls.byte_order return cls.kind.encode(value, byte_order=byte_order)
[docs] @classmethod def decode(cls, data: BinaryMessage, byte_order: _ByteOrder = None) -> T: """Decode a primitive of this type from provided data.""" if byte_order is None: byte_order = cls.byte_order return cls.kind.decode(data, byte_order=byte_order) # type: ignore
[docs] @classmethod def read(cls, stream: _BinaryIO, byte_order: _ByteOrder = None) -> T: """ Read a primitive from the provided stream based on this primitive type. """ if byte_order is None: byte_order = cls.byte_order return cls.kind.read(stream, byte_order=byte_order) # type: ignore
[docs] @classmethod def write( cls, value: T, stream: _BinaryIO, byte_order: _ByteOrder = None ) -> int: """Write a primitive to the stream based on this type.""" if byte_order is None: byte_order = cls.byte_order return cls.kind.write(value, stream, byte_order=byte_order)