Coverage for arrakis/mux.py: 71.2%
80 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-13 15:09 -0700
1# Copyright (c) 2022, California Institute of Technology and contributors
2#
3# You should have received a copy of the licensing terms for this
4# software included in the file "LICENSE" located in the top-level
5# directory of this package. If you did not, you can view a copy at
6# https://git.ligo.org/ngdd/arrakis-server/-/raw/main/LICENSE
8import heapq
9import logging
10import warnings
11from collections import defaultdict
12from collections.abc import Iterable, Iterator, Mapping
13from dataclasses import dataclass
14from datetime import timedelta
15from enum import Enum, auto
16from typing import Generic, TypeVar
18import gpstime
19import numpy
21from .block import Time
# package-level logger shared by arrakis modules
logger = logging.getLogger("arrakis")

# generic item type held by the muxer and its containers
T = TypeVar("T")

# default maximum wait for items from all streams before multiplexing
DEFAULT_TIMEOUT = timedelta(seconds=1)
class OnDrop(Enum):
    """Behavior when an item would be dropped by the muxer.

    Mirrors the ``on_drop`` string options accepted by ``Muxer.push``,
    which resolves them case-insensitively via ``OnDrop[name.upper()]``.
    """

    IGNORE = auto()  # silently discard the item
    RAISE = auto()   # raise ValueError
    WARN = auto()    # log and warn, then discard the item
@dataclass
class MuxedData(Mapping, Generic[T]):
    """A read-only mapping of keyed data sharing a single timestamp.

    Parameters
    ----------
    time : int
        The timestamp associated with this data, in nanoseconds.
    data : dict[str, T]
        The keyed data.

    """

    time: int
    data: dict[str, T]

    def __getitem__(self, key: str) -> T:
        return self.data[key]

    def __iter__(self) -> Iterator[str]:
        yield from self.data

    def __len__(self) -> int:
        return len(self.data)
62class Muxer(Generic[T]):
63 """A data structure that multiplexes items from multiple named streams.
65 Given items from multiple named streams with monotonically increasing
66 integer timestamps, this data structure can be used to pull out sets of
67 synchronized items (items all with the same timestamp).
69 The oldest items will be held until either all named streams are
70 available or until the timeout has been reached. If a start time has been
71 set, any items with an older timestamp will be rejected.
73 Parameters
74 ----------
75 keys : Iterable[str]
76 Identifiers for the named streams to expect when adding items.
77 start : int, optional
78 The GPS time to start muxing items for.
79 If not set, accept items from any time.
80 timeout : timedelta or None, optional
81 The maximum time to wait for messages from named streams, in seconds,
82 before multiplexing. If None is specified, wait indefinitely. Default
83 is 1 second.
85 """
87 def __init__(
88 self,
89 keys: Iterable[str],
90 start: int | None = None,
91 timeout: timedelta | None = DEFAULT_TIMEOUT,
92 ) -> None:
93 self._keys = set(keys)
94 self._items: dict[int, dict[str, T]] = defaultdict(lambda: defaultdict())
95 self._times: list[int] = []
96 self._last_time = start if start is not None else numpy.iinfo(numpy.int64).min
97 self._start = start
98 self._timeout = timeout
100 # track when processing started to handle lookback properly
101 self._processing_start_time = int(gpstime.gpsnow() * Time.SECONDS)
103 def push(self, time: int, key: str, item: T, on_drop: str = "warn") -> None:
104 """Push an item into the muxer.
106 Parameters
107 ----------
108 time : int
109 The timestamp associated with this item.
110 key : str
111 The key stream associated with this item. Must match a key provided
112 at initialization.
113 item : T
114 The item to add.
115 on_drop : str, optional
116 Specifies behavior when the item would be dropped from the muxer,
117 in the case that it was not provided to the muxer before the
118 specified timeout. Options are 'ignore', 'raise', or 'warn'.
119 Default is 'warn'.
121 """
122 if key not in self._keys:
123 msg = f"{key} doesn't match keys provided at initialization"
124 raise KeyError(msg)
126 # skip over items that have already been pulled
127 if time <= self._last_time:
128 if self._start is not None and time < self._start:
129 return
130 msg = f"item's timestamp is too old: ({time} <= {self._last_time})"
131 match OnDrop[on_drop.upper()]:
132 case OnDrop.IGNORE:
133 return
134 case OnDrop.RAISE:
135 raise ValueError(msg)
136 case OnDrop.WARN:
137 logger.warning(msg)
138 warnings.warn(msg, stacklevel=2)
139 return
141 # add item
142 if time in self._items:
143 if key not in self._items[time]:
144 self._items[time][key] = item
145 else:
146 heapq.heappush(self._times, time)
147 self._items[time][key] = item
149 def pull(self) -> Iterator[MuxedData[T]]:
150 """Pull monotonically increasing synchronized items from the muxer.
152 Yields
153 ------
154 MuxedData[T]
155 Synchronized items with a common timestamp, keyed by stream keys.
157 """
158 if not self._times:
159 return
161 # yield items in monotonically increasing order as long
162 # as conditions are met
163 time = self._times[0]
164 while self._has_all_items(time) or self._are_items_stale(time):
165 yield MuxedData(time, self._items.pop(time))
166 self._last_time = heapq.heappop(self._times)
167 if not self._times:
168 break
169 time = self._times[0]
171 def _has_all_items(self, time: int):
172 """Check if a timestamp has all items requested."""
173 return len(self._items[time]) == len(self._keys)
175 def _are_items_stale(self, time):
176 """Check if a timestamp is older than the latency cutoff."""
177 if self._timeout is None:
178 return False
180 time_now = gpstime.gpsnow()
181 dt_lookback = max(self._processing_start_time - time, 0) / float(Time.SECONDS)
182 dt_timeout = self._timeout.total_seconds()
183 oldest_time_allowed = time_now - dt_lookback - dt_timeout
184 return time <= int(oldest_time_allowed * Time.SECONDS)