Coverage for arrakis/mux.py: 71.2%

80 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-08-13 15:09 -0700

1# Copyright (c) 2022, California Institute of Technology and contributors 

2# 

3# You should have received a copy of the licensing terms for this 

4# software included in the file "LICENSE" located in the top-level 

5# directory of this package. If you did not, you can view a copy at 

6# https://git.ligo.org/ngdd/arrakis-server/-/raw/main/LICENSE 

7 

8import heapq 

9import logging 

10import warnings 

11from collections import defaultdict 

12from collections.abc import Iterable, Iterator, Mapping 

13from dataclasses import dataclass 

14from datetime import timedelta 

15from enum import Enum, auto 

16from typing import Generic, TypeVar 

17 

18import gpstime 

19import numpy 

20 

21from .block import Time 

22 

# module-wide logger shared across arrakis components
logger = logging.getLogger("arrakis")

# generic item type carried through Muxer and MuxedData
T = TypeVar("T")

# default maximum wait before held timestamps are considered stale (see Muxer)
DEFAULT_TIMEOUT = timedelta(seconds=1)

28 

29 

class OnDrop(Enum):
    """Policy options for items that arrive too late to be muxed.

    Members correspond to the ``on_drop`` argument accepted by
    ``Muxer.push``: silently discard, raise an error, or emit a warning.
    """

    # explicit values; identical to what auto() would assign (1, 2, 3)
    IGNORE = 1
    RAISE = 2
    WARN = 3

34 

35 

@dataclass
class MuxedData(Mapping, Generic[T]):
    """Container that holds timestamped data.

    Acts as a read-only mapping over ``data``; iteration, length, and
    item access all delegate to the underlying dictionary.

    Parameters
    ----------
    time : int
        The timestamp associated with this data, in nanoseconds.
    data : dict[str, T]
        The keyed data.

    """

    time: int
    data: dict[str, T]

    def __getitem__(self, index: str) -> T:
        # delegate lookups straight to the wrapped dict
        return self.data[index]

    def __iter__(self) -> Iterator[str]:
        # generator form; yields the same keys as iter(self.data)
        yield from self.data

    def __len__(self) -> int:
        return len(self.data)

60 

61 

62class Muxer(Generic[T]): 

63 """A data structure that multiplexes items from multiple named streams. 

64 

65 Given items from multiple named streams with monotonically increasing 

66 integer timestamps, this data structure can be used to pull out sets of 

67 synchronized items (items all with the same timestamp). 

68 

69 The oldest items will be held until either all named streams are 

70 available or until the timeout has been reached. If a start time has been 

71 set, any items with an older timestamp will be rejected. 

72 

73 Parameters 

74 ---------- 

75 keys : Iterable[str] 

76 Identifiers for the named streams to expect when adding items. 

77 start : int, optional 

78 The GPS time to start muxing items for. 

79 If not set, accept items from any time. 

80 timeout : timedelta or None, optional 

81 The maximum time to wait for messages from named streams, in seconds, 

82 before multiplexing. If None is specified, wait indefinitely. Default 

83 is 1 second. 

84 

85 """ 

86 

87 def __init__( 

88 self, 

89 keys: Iterable[str], 

90 start: int | None = None, 

91 timeout: timedelta | None = DEFAULT_TIMEOUT, 

92 ) -> None: 

93 self._keys = set(keys) 

94 self._items: dict[int, dict[str, T]] = defaultdict(lambda: defaultdict()) 

95 self._times: list[int] = [] 

96 self._last_time = start if start is not None else numpy.iinfo(numpy.int64).min 

97 self._start = start 

98 self._timeout = timeout 

99 

100 # track when processing started to handle lookback properly 

101 self._processing_start_time = int(gpstime.gpsnow() * Time.SECONDS) 

102 

103 def push(self, time: int, key: str, item: T, on_drop: str = "warn") -> None: 

104 """Push an item into the muxer. 

105 

106 Parameters 

107 ---------- 

108 time : int 

109 The timestamp associated with this item. 

110 key : str 

111 The key stream associated with this item. Must match a key provided 

112 at initialization. 

113 item : T 

114 The item to add. 

115 on_drop : str, optional 

116 Specifies behavior when the item would be dropped from the muxer, 

117 in the case that it was not provided to the muxer before the 

118 specified timeout. Options are 'ignore', 'raise', or 'warn'. 

119 Default is 'warn'. 

120 

121 """ 

122 if key not in self._keys: 

123 msg = f"{key} doesn't match keys provided at initialization" 

124 raise KeyError(msg) 

125 

126 # skip over items that have already been pulled 

127 if time <= self._last_time: 

128 if self._start is not None and time < self._start: 

129 return 

130 msg = f"item's timestamp is too old: ({time} <= {self._last_time})" 

131 match OnDrop[on_drop.upper()]: 

132 case OnDrop.IGNORE: 

133 return 

134 case OnDrop.RAISE: 

135 raise ValueError(msg) 

136 case OnDrop.WARN: 

137 logger.warning(msg) 

138 warnings.warn(msg, stacklevel=2) 

139 return 

140 

141 # add item 

142 if time in self._items: 

143 if key not in self._items[time]: 

144 self._items[time][key] = item 

145 else: 

146 heapq.heappush(self._times, time) 

147 self._items[time][key] = item 

148 

149 def pull(self) -> Iterator[MuxedData[T]]: 

150 """Pull monotonically increasing synchronized items from the muxer. 

151 

152 Yields 

153 ------ 

154 MuxedData[T] 

155 Synchronized items with a common timestamp, keyed by stream keys. 

156 

157 """ 

158 if not self._times: 

159 return 

160 

161 # yield items in monotonically increasing order as long 

162 # as conditions are met 

163 time = self._times[0] 

164 while self._has_all_items(time) or self._are_items_stale(time): 

165 yield MuxedData(time, self._items.pop(time)) 

166 self._last_time = heapq.heappop(self._times) 

167 if not self._times: 

168 break 

169 time = self._times[0] 

170 

171 def _has_all_items(self, time: int): 

172 """Check if a timestamp has all items requested.""" 

173 return len(self._items[time]) == len(self._keys) 

174 

175 def _are_items_stale(self, time): 

176 """Check if a timestamp is older than the latency cutoff.""" 

177 if self._timeout is None: 

178 return False 

179 

180 time_now = gpstime.gpsnow() 

181 dt_lookback = max(self._processing_start_time - time, 0) / float(Time.SECONDS) 

182 dt_timeout = self._timeout.total_seconds() 

183 oldest_time_allowed = time_now - dt_lookback - dt_timeout 

184 return time <= int(oldest_time_allowed * Time.SECONDS)