Source code for runtimepy.primitives.array

"""
A module for implementing arrays of arbitrary primitives.
"""

# built-in
from copy import copy as _copy
from struct import pack as _pack
from struct import unpack as _unpack
from typing import NamedTuple
from typing import cast as _cast

# third-party
from vcorelib.io import BinaryMessage

# internal
from runtimepy.primitives import AnyPrimitive as _AnyPrimitive
from runtimepy.primitives import Primitivelike as _Primitivelike
from runtimepy.primitives import create as _create
from runtimepy.primitives.byte_order import (
    DEFAULT_BYTE_ORDER as _DEFAULT_BYTE_ORDER,
)
from runtimepy.primitives.byte_order import ByteOrder as _ByteOrder
from runtimepy.primitives.serializable import Serializable


[docs] class ArrayFragmentSpec(NamedTuple): """Information that can be used to construct an array fragment.""" index_start: int index_end: int byte_start: int byte_end: int
[docs] class PrimitiveArray(Serializable): """A class for managing primitives as arrays.""" def __init__( self, *primitives: _AnyPrimitive, byte_order: _ByteOrder = _DEFAULT_BYTE_ORDER, fragments: list[ArrayFragmentSpec] = None, chain: Serializable = None, ) -> None: """Initialize this primitive array.""" self._primitives: list[_AnyPrimitive] = [] # Keep track of a quick lookup for converting between element indices # and byte indices. self._bytes_to_index: dict[int, int] = {0: 0} self._index_to_bytes: dict[int, int] = {0: 0} self.size = 0 self.chain = None super().__init__(byte_order=byte_order, chain=chain) self._format: str = self.byte_order.fmt # Add initial items. for item in primitives: self.add(item) self._fragments: list["PrimitiveArray"] = [] self._fragment_specs: list[ArrayFragmentSpec] = [] # Create array fragments from the specifications. if fragments is None: fragments = [] for spec in fragments: self._add_fragment(spec)
[docs] def reset(self) -> None: """Reset this array so it's empty.""" self._primitives = [] self._format = self.byte_order.fmt self.size = 0 self._bytes_to_index = {0: 0} self._index_to_bytes = {0: 0} self._fragments = [] self._fragment_specs = []
@property def num_fragments(self) -> int: """Get the number of fragments belonging to this array.""" return len(self._fragments)
[docs] def fragment(self, index: int) -> "PrimitiveArray": """A simple accessor for fragments.""" return self._fragments[index]
def _create_fragment(self, spec: ArrayFragmentSpec) -> "PrimitiveArray": """Create a new array fragment from a fragment specification.""" return PrimitiveArray( *self._primitives[spec.index_start : spec.index_end], byte_order=self.byte_order, ) def _index_fragment_spec( self, start: int, end: int = -1 ) -> ArrayFragmentSpec: """Create an array-fragment specification from array indices.""" # Allow '-1' to include all elements to the right. if end == -1: end = len(self._primitives) assert end > start # The 'byte_at_index' calls sufficiently validate the inputs. return ArrayFragmentSpec( start, end, self.byte_at_index(start), self.byte_at_index(end) ) def _byte_fragment_spec( self, start: int, end: int = -1 ) -> ArrayFragmentSpec: """Create an array-fragment specification from byte indices.""" # Allow '-1' to include all bytes to the right. if end == -1: end = self.byte_at_index(len(self._primitives)) assert end > start # The 'index_at_byte' calls sufficiently validate the inputs. return ArrayFragmentSpec( self.index_at_byte(start), self.index_at_byte(end), start, end ) def _add_fragment(self, spec: ArrayFragmentSpec) -> int: """Add a new array fragment from a fragment specification.""" # The index of the new fragment will be equivalent to the current # length. result = len(self._fragments) self._fragment_specs.append(spec) self._fragments.append(self._create_fragment(spec)) return result
[docs] def fragment_from_indices(self, start: int, end: int = -1) -> int: """ Create a new array fragment from primitive-member indices and return the fragment index. """ return self._add_fragment(self._index_fragment_spec(start, end))
[docs] def fragment_from_byte_indices(self, start: int, end: int = -1) -> int: """ Create a new array fragment from byte indices and return the fragment index. """ return self._add_fragment(self._byte_fragment_spec(start, end))
[docs] def byte_at_index(self, index: int) -> int: """ Get the byte index that a primitive at the provided index starts at. This can also be thought of as the size of the array leading up to the element at this index. """ return self._index_to_bytes[index]
[docs] def index_at_byte(self, count: int) -> int: """Determine the array index that a byte index lands on.""" return self._bytes_to_index[count]
def _copy_impl(self) -> "PrimitiveArray": """Make a copy of this primitive array.""" return PrimitiveArray( *[_cast(_AnyPrimitive, x.copy()) for x in self._primitives], byte_order=self.byte_order, fragments=_copy(self._fragment_specs), ) def __getitem__(self, index: int) -> _AnyPrimitive: """Access underlying primitives by index.""" return self._primitives[index]
[docs] def add_primitive( self, kind: _Primitivelike, array_length: int = None ) -> list[_AnyPrimitive]: """Add to the array by specifying the type of element to add.""" return self.add(_create(kind), array_length=array_length)
[docs] def add( self, primitive: _AnyPrimitive, array_length: int = None ) -> list[_AnyPrimitive]: """Add another primitive to manage.""" result = [] end = self.end if isinstance(end, PrimitiveArray): if end is self: self._primitives.append(primitive) self._format += primitive.kind.format self.size += primitive.size result.append(primitive) # Handle array length. if array_length is not None: for _ in range(array_length - 1): inst = primitive.copy() self._primitives.append( inst, # type: ignore ) self._format += inst.kind.format self.size += inst.size result.append(inst) # type: ignore # Add tracking information for the current tail. curr_idx = len(self._primitives) self._bytes_to_index[self.size] = curr_idx self._index_to_bytes[curr_idx] = self.size else: result.extend(end.add(primitive, array_length=array_length)) # Add a new primitive array to the end of this chain for this # primitive. else: new_array = PrimitiveArray(byte_order=self.byte_order) end.assign(new_array) result.extend(new_array.add(primitive, array_length=array_length)) return result
def __bytes__(self) -> bytes: """Get this primitive array as a bytes instance.""" return _pack(self._format, *(x.value for x in self._primitives))
[docs] def fragment_bytes(self, index: int) -> bytes: """Get bytes from a fragment.""" return bytes(self._fragments[index])
[docs] def update(self, data: BinaryMessage, timestamp_ns: int = None) -> int: """Update primitive values from a bytes instance.""" for primitive, item in zip( self._primitives, _unpack(self._format, data) ): primitive.set_value(item, timestamp_ns=timestamp_ns) return self.size
[docs] def update_fragment( self, index: int, data: bytes, timestamp_ns: int = None ) -> None: """Update a fragment by index.""" self._fragments[index].update(data, timestamp_ns=timestamp_ns)
[docs] def randomize(self, timestamp_ns: int = None, chain: bool = True) -> None: """Randomize array contents.""" for prim in self._primitives: prim.randomize(timestamp_ns=timestamp_ns) if ( chain and self.chain is not None and isinstance(self.chain, PrimitiveArray) ): self.chain.randomize(timestamp_ns=timestamp_ns, chain=chain)