Source code for runtimepy.primitives.serializable.base
"""
A module defining a base interface fore serializable objects.
"""
# built-in
from abc import ABC, abstractmethod
from copy import copy as _copy
from io import BytesIO as _BytesIO
from typing import BinaryIO as _BinaryIO
from typing import TypeVar
# third-party
from vcorelib import DEFAULT_ENCODING
from vcorelib.io import BinaryMessage
# internal
from runtimepy.primitives.byte_order import (
DEFAULT_BYTE_ORDER as _DEFAULT_BYTE_ORDER,
)
from runtimepy.primitives.byte_order import ByteOrder as _ByteOrder
T = TypeVar("T", bound="Serializable")
[docs]
class Serializable(ABC):
"""An interface for serializable objects."""
size: int
def __init__(
self,
byte_order: _ByteOrder = _DEFAULT_BYTE_ORDER,
chain: T = None,
) -> None:
"""Initialize this instance."""
if not hasattr(self, "size"):
self.size = 0
self.byte_order = byte_order
self.chain = chain
[docs]
def length(self) -> int:
"""Get the full length of this chain."""
result = self.size
if self.chain is not None:
result += self.chain.length()
return result
[docs]
def resolve_alias(self, alias: str = None) -> str:
"""Resolve a possible alias string."""
if not alias:
alias = getattr(self, "alias") or self.__class__.__name__
return alias
[docs]
def length_trace(self, alias: str = None) -> str:
"""Get a length-tracing string for this instance."""
current = f"{self.resolve_alias(alias=alias)}({self.size})"
if self.chain is not None:
current += " -> " + self.chain.length_trace()
return current
@property
def end(self) -> "Serializable":
"""Get the end of this chain."""
result = self
if self.chain is not None:
result = self.chain.end
return result
@abstractmethod
def _copy_impl(self: T) -> T:
"""Make a copy of this instance."""
[docs]
def copy_without_chain(self: T) -> T:
"""A method for copying instances without chain references."""
orig = self._copy_impl()
orig.byte_order = self.byte_order
return orig
def __copy__(self: T) -> T:
"""Make a copy of this serializable."""
result = self.copy_without_chain()
if self.chain is not None and result.chain is None:
result.assign(self.chain.copy())
return result
[docs]
def copy(self: T) -> T:
"""Create a copy of a serializable instance."""
return _copy(self)
@abstractmethod
def __bytes__(self) -> bytes:
"""Get this serializable as a bytes instance."""
[docs]
def to_stream(self, stream: _BinaryIO) -> int:
"""Write this serializable to a stream."""
stream.write(bytes(self))
result = self.size
if self.chain is not None:
result += self.chain.to_stream(stream)
return result
[docs]
def chain_bytes(self) -> bytes:
"""Get the fully encoded chain."""
with _BytesIO() as stream:
self.to_stream(stream)
return stream.getvalue()
def __eq__(self, other) -> bool:
"""Equivalent if full byte chains are equal."""
result = False
if isinstance(other, Serializable):
result = self.chain_bytes() == other.chain_bytes()
return result
[docs]
def update_with(self: T, other: T, timestamp_ns: int = None) -> int:
"""Update this instance from another of the same type."""
return self.update_chain(
other.chain_bytes(), timestamp_ns=timestamp_ns
)
[docs]
@abstractmethod
def update(self, data: BinaryMessage, timestamp_ns: int = None) -> int:
"""Update this serializable from a bytes instance."""
[docs]
def update_str(self, data: str, timestamp_ns: int = None) -> int:
"""Update this serializable from string data."""
return self.update(
data.encode(encoding=DEFAULT_ENCODING), timestamp_ns=timestamp_ns
)
def _from_stream(self, stream: _BinaryIO, timestamp_ns: int = None) -> int:
"""Update just this instance from a stream."""
return self.update(stream.read(self.size), timestamp_ns=timestamp_ns)
[docs]
def from_stream(self, stream: _BinaryIO, timestamp_ns: int = None) -> int:
"""Update this serializable from a stream."""
result = self._from_stream(stream, timestamp_ns=timestamp_ns)
if self.chain is not None:
result += self.chain.from_stream(stream, timestamp_ns=timestamp_ns)
return result
[docs]
def update_chain(
self, data: BinaryMessage, timestamp_ns: int = None
) -> int:
"""Update this serializable from a bytes instance."""
with _BytesIO(data) as stream:
return self.from_stream(stream, timestamp_ns=timestamp_ns)
[docs]
def assign(self, chain: T) -> None:
"""Assign a next serializable."""
assert self.chain is None, self.chain
# mypy regression?
self.chain = chain # type: ignore
[docs]
def add_to_end(self, chain: T, array_length: int = None) -> list[T]:
"""Add a new serializable to the end of this chain."""
result = []
# Copy the chain element before it becomes part of the current chain if
# an array is created.
copy_base = None
if array_length is not None:
copy_base = chain.copy()
self.end.assign(chain)
result.append(chain)
# Add additional array elements as copies.
if array_length is not None:
assert copy_base is not None
for _ in range(array_length - 1):
inst = copy_base.copy()
self.end.assign(inst)
result.append(inst)
return result