Source code for runtimepy.channel.registry
"""
A module implementing a channel registry.
"""
# built-in
from contextlib import ExitStack, contextmanager
from typing import Any as _Any
from typing import BinaryIO, Iterable, Iterator, NamedTuple
from typing import Optional as _Optional
from typing import Union
# third-party
from vcorelib.io import ByteFifo
from vcorelib.io.types import JsonObject as _JsonObject
# internal
from runtimepy.channel import AnyChannel as _AnyChannel
from runtimepy.channel import Channel as _Channel
from runtimepy.channel.event.header import PrimitiveEventHeader
from runtimepy.codec.protocol import Protocol
from runtimepy.mapping import DEFAULT_PATTERN
from runtimepy.metrics.channel import ChannelMetrics
from runtimepy.mixins.regex import CHANNEL_PATTERN as _CHANNEL_PATTERN
from runtimepy.primitives import ChannelScaling, Primitive
from runtimepy.primitives import Primitivelike as _Primitivelike
from runtimepy.primitives import normalize
from runtimepy.primitives.types.base import PythonPrimitive
from runtimepy.registry import Registry as _Registry
from runtimepy.registry.name import NameRegistry as _NameRegistry
from runtimepy.registry.name import RegistryKey as _RegistryKey
from runtimepy.ui.controls import Controlslike, Default, normalize_controls
[docs]
class ChannelNameRegistry(_NameRegistry):
"""A name registry with a name-matching pattern for channel names."""
name_regex = _CHANNEL_PATTERN
ChannelEventMap = dict[str, list["ParsedEvent"]]
[docs]
class ParsedEvent(NamedTuple):
"""A raw channel event."""
name: str
timestamp: int
value: PythonPrimitive
[docs]
@staticmethod
def by_channel(
event_stream: Iterable["ParsedEvent"],
) -> ChannelEventMap:
"""
Get a dictionary of channel events broken down by individual channels.
"""
result: ChannelEventMap = {}
for event in event_stream:
result.setdefault(event.name, []).append(event)
return result
[docs]
class ChannelRegistry(_Registry[_Channel[_Any]]):
"""A runtime enumeration registry."""
name_registry = ChannelNameRegistry
event_header: Protocol
event_fifo: ByteFifo
header_ready: bool
@property
def kind(self) -> type[_Channel[_Any]]:
"""Determine what kind of registry this is."""
return _Channel
[docs]
def init(self, data: _JsonObject) -> None:
"""Perform implementation-specific initialization."""
super().init(data)
self.event_header = PrimitiveEventHeader.instance()
self.header_ready = False
self.event_fifo = ByteFifo()
[docs]
def channel(
self,
name: str,
kind: Union[Primitive[_Any], _Primitivelike],
commandable: bool = False,
enum: _RegistryKey = None,
scaling: ChannelScaling = None,
description: str = None,
default: Default = None,
controls: Controlslike = None,
**kwargs,
) -> _Optional[_AnyChannel]:
"""Create a new channel."""
if isinstance(kind, str):
kind = normalize(kind)
if isinstance(kind, Primitive):
primitive = kind
else:
primitive = kind()
if scaling:
assert not primitive.scaling or scaling == primitive.scaling, (
scaling,
primitive.scaling,
)
primitive.scaling = scaling
data: _JsonObject = {
"type": str(primitive.kind),
"commandable": commandable,
**kwargs,
}
if enum is not None:
data["enum"] = enum
if description:
data["description"] = description
if default is not None:
data["default"] = default
if controls:
data["controls"] = normalize_controls(controls) # type: ignore
result = self.register_dict(name, data)
# Replace the underlying primitive, in case it was direclty passed in.
if result is not None:
result.update_primitive(primitive)
return result
[docs]
@contextmanager
def registered(
self,
stream: BinaryIO,
pattern: str = DEFAULT_PATTERN,
exact: bool = False,
flush: bool = False,
channel: ChannelMetrics = None,
) -> Iterator[list[str]]:
"""
Register a stream as a managed context. Returns a list of all channels
registered.
"""
with ExitStack() as stack:
names = []
for name, chan in self.search(pattern, exact=exact):
stack.enter_context(
chan.event.registered(stream, flush=flush, channel=channel)
)
names.append(name)
yield names