Source code for runtimepy.channel.event

"""
A module implementing a channel-event protocol.
"""

# built-in
from contextlib import contextmanager
from typing import BinaryIO, Iterator, Optional

# third-party
from vcorelib.math import to_nanos

# internal
from runtimepy.channel.event.header import PrimitiveEventHeader
from runtimepy.metrics.channel import ChannelMetrics
from runtimepy.primitives import AnyPrimitive


class PrimitiveEvent:
    """A class implementing a simple channel-event interface."""

    def __init__(
        self,
        primitive: AnyPrimitive,
        identifier: int,
    ) -> None:
        """
        Initialize this instance.

        :param primitive: The primitive whose changes are emitted as events.
        :param identifier: Passed to 'PrimitiveEventHeader.init_header' to
                           initialize this event's header.
        """

        self.primitive = primitive

        self.header = PrimitiveEventHeader.instance()
        PrimitiveEventHeader.init_header(self.header, identifier)

        # Timestamp (from the primitive) of the last event written.
        self.prev_ns: int = 0

        # Minimum timestamp delta between events (zero: no rate limiting).
        self.min_period_ns: int = 0

        # Set while a stream is registered via 'registered'.
        self.streaming = False

    def set_min_period(self, min_period_s: float) -> None:
        """
        Set a minimum period.

        :param min_period_s: A positive period (converted with 'to_nanos'
                             and stored as 'min_period_ns').
        """

        assert min_period_s > 0, min_period_s
        self.min_period_ns = to_nanos(min_period_s)

    @contextmanager
    def registered(
        self,
        stream: BinaryIO,
        flush: bool = False,
        channel: Optional[ChannelMetrics] = None,
        force: bool = False,
    ) -> Iterator[None]:
        """
        Register a stream as a managed context.

        An event is polled immediately on entry, and then a callback
        registered on the primitive polls again each time it's invoked. The
        callback is always removed (and 'streaming' reset) on exit, even if
        the managed body raises.

        :param stream: The stream that events are written to.
        :param flush: Whether to flush the stream after each written event.
        :param channel: Optional metrics, incremented by the number of bytes
                        written.
        :param force: Whether callback-driven polls ignore the minimum
                      period (the initial poll is never forced).
        """

        assert not self.streaming, "Already streaming!"

        def callback(_, __) -> None:
            """Emit a change event to the stream."""
            self._poll(stream, flush=flush, channel=channel, force=force)

        # Poll immediately.
        self.prev_ns = 0
        self._poll(stream, flush=flush, channel=channel)

        raw = self.primitive
        ident = raw.register_callback(callback)
        self.streaming = True
        try:
            yield
        finally:
            # Remove the callback outside of the 'assert' statement, so it's
            # still removed when assertions are disabled (python -O).
            removed = raw.remove_callback(ident)
            self.streaming = False
            assert removed, "Couldn't remove callback!"

    def _poll(
        self,
        stream: BinaryIO,
        flush: bool = False,
        channel: Optional[ChannelMetrics] = None,
        force: bool = False,
    ) -> int:
        """
        Poll this event so that if the underlying channel has changed since
        the last write, we write another event.

        :param stream: The stream to write the header and value to.
        :param flush: Whether to flush the stream after writing.
        :param channel: Optional metrics, incremented by the number of bytes
                        written.
        :param force: Write even if the minimum period hasn't elapsed.
        :return: The sum of the values returned by the header and primitive
                 'to_stream' calls (zero if nothing was written).
        """

        written = 0

        # Check timestamp and update header if necessary.
        raw = self.primitive
        curr_ns = raw.last_updated_ns

        if force or curr_ns - self.prev_ns >= self.min_period_ns:
            self.prev_ns = curr_ns
            self.header["timestamp"] = curr_ns

            # Write header then value.
            array = self.header
            written += array.to_stream(stream)
            written += raw.to_stream(stream, byte_order=array.byte_order)

            if flush:
                stream.flush()

            if channel is not None:
                channel.increment(written)

        return written