"""
A module implementing an interface to build communication protocols.
"""
# built-in
from contextlib import contextmanager, suppress
from copy import copy as _copy
from io import StringIO
from typing import BinaryIO as _BinaryIO
from typing import Iterator as _Iterator
from typing import NamedTuple
from typing import Optional as _Optional
from typing import TypeVar as _TypeVar
from typing import Union as _Union
# third-party
from vcorelib.io.types import JsonObject as _JsonObject
from vcorelib.logging import LoggerType
# internal
from runtimepy.enum import RuntimeEnum as _RuntimeEnum
from runtimepy.enum.registry import DEFAULT_ENUM_PRIMITIVE, EnumRegistry
from runtimepy.primitives import AnyPrimitive as _AnyPrimitive
from runtimepy.primitives import PrimitiveInstancelike
from runtimepy.primitives import Primitivelike as _Primitivelike
from runtimepy.primitives import normalize_instance as _normalize_instance
from runtimepy.primitives.array import PrimitiveArray
from runtimepy.primitives.byte_order import (
DEFAULT_BYTE_ORDER as _DEFAULT_BYTE_ORDER,
)
from runtimepy.primitives.byte_order import ByteOrder as _ByteOrder
from runtimepy.primitives.field.fields import BitFields as _BitFields
from runtimepy.primitives.field.manager import BitFieldsManager
from runtimepy.primitives.int import UnsignedInt
from runtimepy.primitives.serializable import Serializable, SerializableMap
from runtimepy.registry.name import NameRegistry as _NameRegistry
from runtimepy.registry.name import RegistryKey as _RegistryKey
from runtimepy.ui.controls import Default
# Python value types accepted by 'ProtocolBase.set' and returned by
# 'ProtocolBase.value' for protocol fields.
ProtocolPrimitive = _Union[int, float, bool, str]
class FieldSpec(NamedTuple):
    """Information specifying a protocol field."""

    # Name of the field.
    name: str

    # Name of the field's primitive kind (e.g. the 'kind.name' of a
    # primitive instance).
    kind: str

    # Registry key of an enumeration associated with the field, if any.
    enum: _Optional[_RegistryKey] = None

    # Number of elements requested for the field, None when unspecified.
    array_length: _Optional[int] = None

    # Default value assigned to the field, if any.
    default: Default = None

    # Optional human-readable description.
    description: _Optional[str] = None

    # Optional additional configuration data.
    config: _Optional[dict[str, ProtocolPrimitive]] = None

    def is_array(self) -> bool:
        """
        Determine if this instance is an array.

        An unset length or a length of one is not considered an array.
        """
        return self.array_length is not None and self.array_length > 1

    def asdict(self) -> _JsonObject:
        """
        Obtain a dictionary representing this instance.

        'name', 'kind' and 'array_length' are always present. 'enum' and
        'default' are only included when they aren't None, 'description'
        and 'config' are only included when they aren't empty.
        """

        result: _JsonObject = {
            "name": self.name,
            "kind": self.kind,
            "array_length": self.array_length,
        }

        # A None check (not truthiness) so that values like 0 are kept.
        if self.enum is not None:
            result["enum"] = self.enum
        if self.default is not None:
            result["default"] = self.default

        # Empty strings and empty dictionaries are omitted.
        if self.description:
            result["description"] = self.description
        if self.config:
            result["config"] = self.config  # type: ignore

        return result
# Bound type variable so that 'ProtocolBase._copy_impl' is annotated as
# returning the same class it's invoked on.
T = _TypeVar("T", bound="ProtocolBase")
# Ordered record of how a protocol was assembled. Entries are one of:
#   * (bit-fields index, name) for a bit-fields primitive,
#   * a FieldSpec for a regular (optionally enum-backed) field,
#   * (name, instance count) for a named serializable.
ProtocolBuild = list[_Union[tuple[int, str], FieldSpec, tuple[str, int]]]
class ProtocolBase(PrimitiveArray):
    """A class for defining runtime communication protocols."""

    def __init__(
        self,
        enum_registry: EnumRegistry,
        names: _Optional[_NameRegistry] = None,
        fields: _Optional[BitFieldsManager] = None,
        build: _Optional[ProtocolBuild] = None,
        identifier: int = 1,
        identifier_primitive: PrimitiveInstancelike = DEFAULT_ENUM_PRIMITIVE,
        byte_order: _Union[_ByteOrder, _RegistryKey] = _DEFAULT_BYTE_ORDER,
        serializables: _Optional[SerializableMap] = None,
        alias: _Optional[str] = None,
    ) -> None:
        """
        Initialize this protocol.

        :param enum_registry: Registry used to look up enumerations. The
            'ByteOrder' enumeration is registered into it when absent.
        :param names: Registry for field names, a new one is created when
            not provided.
        :param fields: Manager for bit-fields, a new one (using this
            instance's names and enum registry) is created when not
            provided.
        :param build: Ordered build entries to replay: a FieldSpec adds a
            regular field, a (name, count) tuple adds a serializable and
            an (index, name) tuple adds already-registered bit-fields.
        :param identifier: Stored as 'id' and assigned to 'id_primitive'.
        :param identifier_primitive: Primitive used to hold the
            identifier, must be an integer kind.
        :param byte_order: Either a byte-order instance or a key that's
            resolved via the registered 'ByteOrder' enumeration.
        :param serializables: Named serializable instances referenced by
            'build'. Every entry must be consumed by a build entry. The
            caller's mapping is not modified.
        :param alias: Optional alias used when rendering this instance.
        """

        self.id = identifier
        self.id_primitive: UnsignedInt = _normalize_instance(  # type: ignore
            identifier_primitive
        )
        assert self.id_primitive.kind.is_integer
        self.id_primitive.value = self.id

        self.alias = alias

        # Register the byte-order enumeration if it's not present.
        self.enum_registry = enum_registry
        if not self.enum_registry.get("ByteOrder"):
            _ByteOrder.register_enum(self.enum_registry)

        # Each instance gets its own array.
        if not isinstance(byte_order, _ByteOrder):
            byte_order = _ByteOrder(
                self.enum_registry["ByteOrder"].get_int(byte_order)
            )
        super().__init__(byte_order=byte_order)

        if names is None:
            names = _NameRegistry()
        self.names = names

        if fields is None:
            fields = BitFieldsManager(self.names, self.enum_registry)
        self._fields = fields

        self._regular_fields: dict[str, list[_AnyPrimitive]] = {}
        self._enum_fields: dict[str, _RuntimeEnum] = {}

        # Keep track of the order that the protocol was created.
        self.build: ProtocolBuild = []

        # Keep track of named serializables.
        self.serializables: SerializableMap = {}

        # Entries are removed as they're consumed below, so operate on a
        # shallow copy instead of emptying the caller's mapping.
        if serializables is not None:
            serializables = dict(serializables)

        # Add fields if necessary.
        if build is None:
            build = []
        for item in build:
            if isinstance(item, FieldSpec):
                self.add_field(
                    item.name,
                    item.kind,
                    enum=item.enum,
                    array_length=item.array_length,
                    default=item.default,
                    description=item.description,
                    config=item.config,
                )
            elif isinstance(item[0], str):
                # (name, instance count) entry for a serializable.
                assert serializables, (item, serializables)
                name = item[0]
                self.add_serializable(
                    name,
                    serializables[name][0],
                    array_length=None if item[1] == 1 else item[1],
                )
                del serializables[name]
            elif isinstance(item[0], int):
                # (index, name) entry for bit-fields that the manager
                # already knows about.
                self._add_bit_fields(
                    item[1], self._fields.fields[item[0]], index=item[0]
                )

        # Ensure all serializables were handled via build.
        assert not serializables, serializables

    def _copy_impl(self: T) -> T:
        """
        Create another protocol instance from this one.

        The name registry is shared, the bit-fields manager is copied and
        the first instance of each serializable is copied (without its
        chain) so that the new instance can replay this one's build.
        """

        return self.__class__(
            self.enum_registry,
            names=self.names,
            fields=_copy(self._fields),
            build=self.build,
            byte_order=self.byte_order,
            identifier=self.id,
            identifier_primitive=self.id_primitive.kind.name,
            serializables={
                key: [val[0].copy_without_chain()]
                for key, val in self.serializables.items()
            },
            alias=self.alias,
        )

    def register_name(self, name: str) -> int:
        """
        Register the field name.

        :raises AssertionError: If the name registry doesn't provide an
            identifier for the name.
        """

        ident = self.names.register_name(name)
        assert ident is not None, f"Couldn't register field '{name}'!"
        return ident

    def add_serializable(
        self,
        name: str,
        serializable: Serializable,
        array_length: _Optional[int] = None,
    ) -> None:
        """
        Add a serializable instance.

        The instance is added to the end of this protocol, and the build
        records the name along with the number of resulting instances.
        """

        self.register_name(name)

        instances = self.add_to_end(serializable, array_length=array_length)
        self.build.append((name, len(instances)))

        assert name not in self.serializables, name
        self.serializables[name] = instances

    def add_field(
        self,
        name: str,
        kind: _Primitivelike | _Optional[_AnyPrimitive] = None,
        enum: _Optional[_RegistryKey] = None,
        serializable: _Optional[Serializable] = None,
        array_length: _Optional[int] = None,
        track: bool = True,
        default: Default = None,
        description: _Optional[str] = None,
        config: _Optional[dict[str, ProtocolPrimitive]] = None,
    ) -> None:
        """
        Add a new field to the protocol.

        :param name: Name of the field to register.
        :param kind: Primitive type for the field. May be omitted when
            'enum' is provided, in which case the enumeration's primitive
            is used.
        :param enum: Registry key of an enumeration for this field.
        :param serializable: When provided, the field is added as a
            serializable instead ('kind' and 'enum' must not be set).
        :param array_length: Optional element count for the field.
        :param track: Whether or not to record this field in the build.
        :param default: Optional default value to assign.
        :param description: Optional description, recorded in the build.
        :param config: Optional configuration, recorded in the build.
        """

        # Add the serializable to the end of this protocol.
        if serializable is not None:
            assert kind is None and enum is None
            self.add_serializable(
                name, serializable, array_length=array_length
            )
            return

        self.register_name(name)

        if enum is not None:
            runtime_enum = self.enum_registry[enum]
            self._enum_fields[name] = runtime_enum

            # Allow the primitive type to be overridden when passed as a
            # method argument.
            if kind is None:
                kind = runtime_enum.primitive

            # Translate possible enum default. A falsy 'default' falls
            # back to the enumeration's own default, and only a truthy
            # result is converted to its integer value.
            raw = default or runtime_enum.default
            if raw:
                assert not isinstance(raw, float)
                default = runtime_enum.as_int(raw)

        assert kind is not None
        inst = _normalize_instance(kind)

        # Assign default.
        if default is not None:
            inst(default)  # type: ignore

        assert name not in self._regular_fields, name
        self._regular_fields[name] = self.add(inst, array_length=array_length)

        if track:
            self.build.append(
                FieldSpec(
                    name,
                    inst.kind.name,
                    enum,
                    array_length=array_length,
                    default=default,
                    description=description,
                    config=config,
                )
            )

    def _add_bit_fields(
        self, name: str, fields: _BitFields, index: _Optional[int] = None
    ) -> None:
        """
        Add a bit-fields instance.

        :param name: Name of the field backing the bit-fields.
        :param fields: The bit-fields instance, its raw primitive becomes
            a regular field of this protocol.
        :param index: Index of the fields in the bit-fields manager, when
            not provided the fields are added to the manager first.
        """

        # If the index is known, these fields are already registered.
        if index is None:
            index = self._fields.add(fields)

        self.build.append((index, name))

        # The build entry above already records this field, so don't
        # track it a second time as a FieldSpec.
        self.add_field(name, kind=fields.raw, track=False)

    @contextmanager
    def add_bit_fields(
        self, name: str, kind: _Primitivelike | _AnyPrimitive = "uint8"
    ) -> _Iterator[_BitFields]:
        """
        Add a bit-fields primitive to the protocol.

        The new bit-fields instance is yielded so that it can be populated
        and it's added to this protocol once the managed block completes.
        If the managed block raises, the fields are not added.
        """

        new = _BitFields.new(value=kind)
        yield new
        self._add_bit_fields(name, new)

    def get_primitive(self, name: str, index: int = 0) -> _AnyPrimitive:
        """
        Get an underlying primitive instance.

        :raises KeyError: If 'name' isn't a regular field.
        """
        return self._regular_fields[name][index]

    def get_fields(self, index: int) -> _BitFields:
        """Get a bit-fields instance from its build identifier."""
        return self._fields.fields[index]

    def value(
        self, name: str, resolve_enum: bool = True, index: int = 0
    ) -> ProtocolPrimitive:
        """
        Get the value of a field belonging to the protocol.

        :param name: Name of a regular field, otherwise the lookup is
            delegated to the bit-fields manager.
        :param resolve_enum: Whether or not to return an enumeration's
            string in place of the underlying integer.
        :param index: Element to read for regular fields.
        """

        if name not in self._regular_fields:
            return self._fields.get(name, resolve_enum=resolve_enum)

        val: ProtocolPrimitive = self.get_primitive(name, index=index).value

        # Resolve the enum value. The raw value is kept when the lookup
        # raises a KeyError.
        if resolve_enum and name in self._enum_fields:
            with suppress(KeyError):
                val = self._enum_fields[name].get_str(val)  # type: ignore

        return val

    def trace_size(self, logger: LoggerType) -> None:
        """Log a size trace."""
        logger.info("%s: %s", self, self.length_trace(alias=self.alias))

    def __str__(self) -> str:
        """
        Get this instance as a string.

        Renders the resolved alias and length followed by every name in
        the name registry (in registration order) with its type and
        value(s). Elements of serializable arrays are rendered as '...'.
        """

        with StringIO() as stream:
            stream.write(
                "{"
                + f"{self.resolve_alias(alias=self.alias)}({self.length()}): "
            )

            parts = []
            for name in self.names.registered_order:
                part = name
                count = 1

                if name in self.serializables:
                    count = len(self.serializables[name])
                    part += f"<{self.serializables[name][0].resolve_alias()}>"
                elif name in self._regular_fields:
                    count = len(self._regular_fields[name])
                    part += f"<{self._regular_fields[name][0].kind}>"

                if count > 1:
                    part += f"[{count}] = ["
                    if name in self._regular_fields:
                        part += ", ".join(
                            str(x.value) for x in self._regular_fields[name]
                        )
                    else:
                        part += ", ".join("..." for _ in range(count))
                    part += "]"
                else:
                    part += f" = {self[name]}"

                parts.append(part)

            stream.write(", ".join(parts))
            stream.write("}")

            return stream.getvalue()

    def __getitem__(self, name: str) -> ProtocolPrimitive:  # type: ignore
        """
        Get the value of a protocol field.

        Serializables are returned as the string form of their first
        instance, everything else is handled by 'value'.
        """

        if name in self.serializables:
            return str(self.serializables[name][0])
        return self.value(name)

    def set(self, name: str, val: ProtocolPrimitive, index: int = 0) -> None:
        """
        Set a value of a field belonging to the protocol.

        :param name: Name of the field to update.
        :param val: Value to assign. For regular fields a string is
            converted to an integer via the field's enumeration (a
            KeyError is raised if the field has none). Serializables are
            only updated from string values.
        :param index: Element to update for regular fields and
            serializables.
        """

        if name in self._regular_fields:
            # Resolve an enum value.
            if isinstance(val, str):
                val = self._enum_fields[name].get_int(val)
            self._regular_fields[name][index].value = val
        elif name in self.serializables and isinstance(val, str):
            self.serializables[name][index].update_str(val)
        else:
            # Anything else is treated as a bit-field.
            self._fields.set(name, val)  # type: ignore

    def __setitem__(self, name: str, val: ProtocolPrimitive) -> None:
        """Set a value of a field belonging to the protocol."""
        self.set(name, val)

    def write_with_id(self, stream: _BinaryIO) -> None:
        """Write this instance to a stream with its identifier as a prefix."""
        self.id_primitive.to_stream(stream, byte_order=self.byte_order)
        self.to_stream(stream)